Merge branch 'develop' into client-notifs

This commit is contained in:
Rohit
2025-08-14 15:59:10 +05:30
committed by GitHub
98 changed files with 15877 additions and 2424 deletions

View File

@@ -1,50 +0,0 @@
FROM --platform=$BUILDPLATFORM mcr.microsoft.com/playwright:v1.46.0-noble

# Declare the build arg consumed by EXPOSE below; without this ARG the
# ${BACKEND_PORT:-8080} expansion can never receive a build-time value.
ARG BACKEND_PORT

# Set working directory
WORKDIR /app

# Copy dependency manifests first so the npm-install layer stays cached
# until package*.json actually changes.
COPY package*.json ./

# Install node dependencies (--legacy-peer-deps: tolerate upstream peer conflicts)
RUN npm install --legacy-peer-deps

# Install Playwright browsers and their OS dependencies
RUN npx playwright install --with-deps chromium

# Create the Chromium data directory with necessary permissions
RUN mkdir -p /tmp/chromium-data-dir && \
    chmod -R 777 /tmp/chromium-data-dir

# Install extra runtime libraries required by headless Chromium
RUN apt-get update && apt-get install -y \
    libgbm1 \
    libnss3 \
    libatk1.0-0 \
    libatk-bridge2.0-0 \
    libdrm2 \
    libxkbcommon0 \
    libglib2.0-0 \
    libdbus-1-3 \
    libx11-xcb1 \
    libxcb1 \
    libxcomposite1 \
    libxcursor1 \
    libxdamage1 \
    libxext6 \
    libxi6 \
    libxtst6 \
    && rm -rf /var/lib/apt/lists/* \
    && mkdir -p /tmp/.X11-unix && chmod 1777 /tmp/.X11-unix

# Copy application sources AFTER all installs so source edits do not
# invalidate the expensive npm/playwright/apt layers.
COPY src ./src
COPY public ./public
COPY server ./server
COPY tsconfig.json ./
COPY server/tsconfig.json ./server/
# COPY server/start.sh ./

# Expose the backend port (defaults to 8080 when BACKEND_PORT is unset)
EXPOSE ${BACKEND_PORT:-8080}

# Start the backend using the start script
CMD ["npm", "run", "server"]

View File

@@ -7,7 +7,7 @@ import Robot from "../models/Robot";
import Run from "../models/Run";
const router = Router();
import { getDecryptedProxyConfig } from "../routes/proxy";
import { uuid } from "uuidv4";
import { v4 as uuid } from "uuid";
import { createRemoteBrowserForRun, destroyRemoteBrowser } from "../browser-management/controller";
import logger from "../logger";
import { browserPool } from "../server";
@@ -19,6 +19,7 @@ import { Page } from "playwright";
import { WorkflowFile } from "maxun-core";
import { googleSheetUpdateTasks, processGoogleSheetUpdates } from "../workflow-management/integrations/gsheet";
import { airtableUpdateTasks, processAirtableUpdates } from "../workflow-management/integrations/airtable";
import { sendWebhook } from "../routes/webhook";
chromium.use(stealthPlugin());
const formatRecording = (recordingData: any) => {
@@ -667,6 +668,35 @@ async function executeRun(id: string, userId: string) {
}
)
// Trigger webhooks for run completion
const webhookPayload = {
robot_id: plainRun.robotMetaId,
run_id: plainRun.runId,
robot_name: recording.recording_meta.name,
status: 'success',
started_at: plainRun.startedAt,
finished_at: new Date().toLocaleString(),
extracted_data: {
captured_texts: Object.values(categorizedOutput.scrapeSchema).flat() || [],
captured_lists: categorizedOutput.scrapeList,
total_rows: totalRowsExtracted,
captured_texts_count: totalSchemaItemsExtracted,
captured_lists_count: totalListItemsExtracted,
screenshots_count: extractedScreenshotsCount
},
metadata: {
browser_id: plainRun.browserId,
user_id: userId,
}
};
try {
await sendWebhook(plainRun.robotMetaId, 'run_completed', webhookPayload);
logger.log('info', `Webhooks sent successfully for completed run ${plainRun.runId}`);
} catch (webhookError: any) {
logger.log('error', `Failed to send webhooks for run ${plainRun.runId}: ${webhookError.message}`);
}
try {
googleSheetUpdateTasks[id] = {
robotId: plainRun.robotMetaId,
@@ -701,6 +731,34 @@ async function executeRun(id: string, userId: string) {
status: 'failed',
finishedAt: new Date().toLocaleString(),
});
const recording = await Robot.findOne({ where: { 'recording_meta.id': run.robotMetaId }, raw: true });
// Trigger webhooks for run failure
const failedWebhookPayload = {
robot_id: run.robotMetaId,
run_id: run.runId,
robot_name: recording ? recording.recording_meta.name : 'Unknown Robot',
status: 'failed',
started_at: run.startedAt,
finished_at: new Date().toLocaleString(),
error: {
message: error.message,
stack: error.stack,
type: error.name || 'ExecutionError'
},
metadata: {
browser_id: run.browserId,
user_id: userId,
}
};
try {
await sendWebhook(run.robotMetaId, 'run_failed', failedWebhookPayload);
logger.log('info', `Failure webhooks sent successfully for run ${run.runId}`);
} catch (webhookError: any) {
logger.log('error', `Failed to send failure webhooks for run ${run.runId}: ${webhookError.message}`);
}
}
capture(
'maxun-oss-run-created-api',

View File

@@ -14,7 +14,7 @@ interface BrowserPoolInfo {
/**
* The instance of remote browser.
*/
browser: RemoteBrowser,
browser: RemoteBrowser | null,
/**
* States if the browser's instance is being actively used.
* Helps to persist the progress on the frontend when the application has been reloaded.
@@ -31,6 +31,11 @@ interface BrowserPoolInfo {
* @default "recording"
*/
state: BrowserState,
/**
* The status of the browser instance.
* Can be "reserved", "initializing", "ready" or "failed".
*/
status?: "reserved" | "initializing" | "ready" | "failed",
}
/**
@@ -205,8 +210,18 @@ export class BrowserPool {
* @returns remote browser instance or undefined if it does not exist in the pool
*/
public getRemoteBrowser = (id: string): RemoteBrowser | undefined => {
logger.log('debug', `Remote browser with id: ${id} retrieved from the pool`);
return this.pool[id]?.browser;
const poolInfo = this.pool[id];
if (!poolInfo) {
return undefined;
}
// Return undefined for reserved slots (browser is null)
if (poolInfo.status === "reserved") {
logger.log('debug', `Browser ${id} is reserved but not yet ready`);
return undefined;
}
return poolInfo.browser || undefined;
};
/**
@@ -506,6 +521,29 @@ export class BrowserPool {
return browserIds.length > 0 ? browserIds[0] : null;
};
/**
 * Checks whether the user may be handed another browser slot.
 * A user may hold at most two browsers; when `state` is "recording",
 * the user must additionally have no browser already in the
 * "recording" state.
 * @param userId the user ID to check browser slots for
 * @param state optional state constraint (only "recording" is special-cased)
 * @returns {boolean} true if a new slot may be granted, false otherwise
 */
public hasAvailableBrowserSlots = (userId: string, state?: BrowserState): boolean => {
    const ownedIds = this.userToBrowserMap.get(userId) ?? [];

    // Hard cap: two concurrent browsers per user.
    if (ownedIds.length >= 2) {
        return false;
    }

    if (state !== "recording") {
        return true;
    }

    // At most one recording browser per user: reject if any owned
    // browser (missing pool entries are ignored) is already recording.
    return ownedIds.every(
        (browserId) => this.pool[browserId]?.state !== "recording"
    );
};
/**
* Returns the first active browser's instance id from the pool.
* If there is no active browser, it returns null.
@@ -524,4 +562,71 @@ export class BrowserPool {
// logger.log('warn', `No active browser in the pool`);
return null;
};
/**
 * Reserves a browser slot immediately, before the actual browser exists,
 * so slot accounting stays accurate under rapid successive requests.
 * The placeholder is later filled in by {@link upgradeBrowserSlot}.
 *
 * @param id browser ID to reserve
 * @param userId user ID that owns this reservation
 * @param state browser state ("recording" or "run")
 * @returns true if slot was reserved, false if user has reached limit
 */
public reserveBrowserSlot = (id: string, userId: string, state: BrowserState = "run"): boolean => {
    // Refuse the reservation when the user is already at capacity.
    if (!this.hasAvailableBrowserSlots(userId, state)) {
        logger.log('debug', `Cannot reserve slot for user ${userId}: no available slots`);
        return false;
    }

    // Placeholder pool entry: no browser yet, marked "reserved".
    this.pool[id] = {
        browser: null,
        active: false,
        userId,
        state,
        status: "reserved"
    };

    // Record ownership, guarding against duplicate ids for the same user.
    const ownedIds = this.userToBrowserMap.get(userId) ?? [];
    if (!ownedIds.includes(id)) {
        ownedIds.push(id);
        this.userToBrowserMap.set(userId, ownedIds);
    }

    logger.log('info', `Reserved browser slot ${id} for user ${userId} in state ${state}`);
    return true;
};
/**
 * Upgrades a previously reserved slot to hold a live browser instance.
 * Only slots created by {@link reserveBrowserSlot} (status "reserved")
 * can be upgraded.
 *
 * @param id browser ID that was previously reserved
 * @param browser the actual RemoteBrowser instance
 * @returns true if successful, false if slot wasn't reserved
 */
public upgradeBrowserSlot = (id: string, browser: RemoteBrowser): boolean => {
    const slot = this.pool[id];

    // A missing entry or any status other than "reserved" is not upgradable.
    if (slot?.status !== "reserved") {
        logger.log('warn', `Cannot upgrade browser ${id}: slot not reserved`);
        return false;
    }

    slot.browser = browser;
    slot.status = "ready";
    logger.log('info', `Upgraded browser slot ${id} to ready state`);
    return true;
};
/**
 * Marks a reserved slot as failed and removes it from the pool.
 * No-op when the slot does not exist.
 *
 * @param id browser ID to mark as failed
 */
public failBrowserSlot = (id: string): void => {
    if (!this.pool[id]) {
        return;
    }
    logger.log('info', `Marking browser slot ${id} as failed`);
    // Delegates cleanup (pool entry + user mapping) to deleteRemoteBrowser.
    this.deleteRemoteBrowser(id);
};
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,10 @@
// Bundles rrweb-entry.js into a single minified IIFE so the rrweb
// snapshot helper can be injected into pages as a standalone script.
const esbuild = require('esbuild');

esbuild
  .build({
    entryPoints: ['rrweb-entry.js'],
    bundle: true,
    minify: true,
    outfile: 'rrweb-bundle.js',
    format: 'iife', // so that rrwebSnapshot is available on window
    globalName: 'rrwebSnapshotBundle',
  })
  .catch((err) => {
    // Surface the build failure instead of exiting silently — the
    // original swallowed the error, making failed builds hard to debug.
    console.error(err);
    process.exit(1);
  });

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,2 @@
// Entry point bundled by the esbuild config: re-exports rrweb-snapshot's
// `snapshot` function on `window` so injected page scripts can call it.
import { snapshot } from 'rrweb-snapshot';
window.rrwebSnapshot = { snapshot };

View File

@@ -3,7 +3,7 @@
* Holds the singleton instances of browser pool and socket.io server.
*/
import { Socket } from "socket.io";
import { uuid } from 'uuidv4';
import { v4 as uuid } from "uuid";
import { createSocketConnection, createSocketConnectionForRun } from "../socket-connection/connection";
import { io, browserPool } from "../server";
@@ -20,7 +20,7 @@ import logger from "../logger";
* @returns string
* @category BrowserManagement-Controller
*/
export const initializeRemoteBrowserForRecording = (userId: string): string => {
export const initializeRemoteBrowserForRecording = (userId: string, mode: string = "dom"): string => {
const id = getActiveBrowserIdByState(userId, "recording") || uuid();
createSocketConnection(
io.of(id),
@@ -37,7 +37,15 @@ export const initializeRemoteBrowserForRecording = (userId: string): string => {
browserSession.interpreter.subscribeToPausing();
await browserSession.initialize(userId);
await browserSession.registerEditorEvents();
await browserSession.subscribeToScreencast();
if (mode === "dom") {
await browserSession.subscribeToDOM();
logger.info('DOM streaming started for scraping browser in recording mode');
} else {
await browserSession.subscribeToScreencast();
logger.info('Screenshot streaming started for local browser in recording mode');
}
browserPool.addRemoteBrowser(id, browserSession, userId, false, "recording");
}
socket.emit('loaded');
@@ -54,20 +62,23 @@ export const initializeRemoteBrowserForRecording = (userId: string): string => {
* @category BrowserManagement-Controller
*/
export const createRemoteBrowserForRun = (userId: string): string => {
const id = uuid();
if (!userId) {
logger.log('error', 'createRemoteBrowserForRun: Missing required parameter userId');
throw new Error('userId is required');
}
const id = uuid();
const slotReserved = browserPool.reserveBrowserSlot(id, userId, "run");
if (!slotReserved) {
logger.log('warn', `Cannot create browser for user ${userId}: no available slots`);
throw new Error('User has reached maximum browser limit');
}
logger.log('info', `createRemoteBrowserForRun: Reserved slot ${id} for user ${userId}`);
initializeBrowserAsync(id, userId);
createSocketConnectionForRun(
io.of(id),
async (socket: Socket) => {
try {
const browserSession = new RemoteBrowser(socket, userId, id);
await browserSession.initialize(userId);
browserPool.addRemoteBrowser(id, browserSession, userId, false, "run");
socket.emit('ready-for-run');
} catch (error: any) {
logger.error(`Error initializing browser: ${error.message}`);
}
});
return id;
};
@@ -135,6 +146,19 @@ export const getActiveBrowserIdByState = (userId: string, state: "recording" | "
return browserPool.getActiveBrowserId(userId, state);
};
/**
 * Reports whether a new browser may be created for the user.
 * Thin wrapper around {@link browserPool.hasAvailableBrowserSlots()}.
 * If state is provided, also verifies none of the user's active browsers
 * occupy that state.
 * @param userId the user ID to check browser slots for
 * @param state optional state to check - if provided, ensures no browser is in this state
 * @returns {boolean} true if user has available slots (and no browsers in specified state if state is provided)
 * @category BrowserManagement-Controller
 */
export const canCreateBrowserInState = (
    userId: string,
    state?: "recording" | "run"
): boolean => browserPool.hasAvailableBrowserSlots(userId, state);
/**
* Returns the url string from a remote browser if exists in the browser pool.
* @param id instance id of the remote browser
@@ -198,3 +222,87 @@ export const stopRunningInterpretation = async (userId: string) => {
logger.log('error', 'Cannot stop interpretation: No active browser or generator.');
}
};
/**
 * Asynchronously initializes a browser for a previously reserved slot.
 * Waits up to 10s for a frontend client to connect to the browser's
 * socket.io namespace; if none connects, proceeds with a no-op "dummy"
 * socket so the run can still execute headlessly. On success the
 * reserved slot is upgraded to "ready"; on any failure the slot is
 * released via failBrowserSlot.
 *
 * @param id the reserved browser/namespace id
 * @param userId the user owning the reservation
 */
const initializeBrowserAsync = async (id: string, userId: string) => {
    try {
        const namespace = io.of(id);
        let clientConnected = false;
        let connectionTimeout: NodeJS.Timeout;

        // Resolves with the first connecting socket, or null after 10s.
        // NOTE(review): the 'connection' listener stays registered after
        // resolution; a later connect is harmless (resolve is a no-op) but
        // the listener is never removed — confirm this is intended.
        const waitForConnection = new Promise<Socket | null>((resolve) => {
            namespace.on('connection', (socket: Socket) => {
                clientConnected = true;
                clearTimeout(connectionTimeout);
                logger.log('info', `Frontend connected to browser ${id} via socket ${socket.id}`);
                resolve(socket);
            });

            connectionTimeout = setTimeout(() => {
                if (!clientConnected) {
                    logger.log('warn', `No client connected to browser ${id} within timeout, proceeding with dummy socket`);
                    resolve(null);
                }
            }, 10000);
        });

        // Namespace-level errors abort the reservation entirely.
        namespace.on('error', (error: any) => {
            logger.log('error', `Socket namespace error for browser ${id}: ${error.message}`);
            clearTimeout(connectionTimeout);
            browserPool.failBrowserSlot(id);
        });

        const socket = await waitForConnection;

        try {
            let browserSession: RemoteBrowser;

            if (socket) {
                logger.log('info', `Using real socket for browser ${id}`);
                browserSession = new RemoteBrowser(socket, userId, id);
            } else {
                logger.log('info', `Using dummy socket for browser ${id}`);
                // Minimal stand-in implementing just the socket surface
                // RemoteBrowser touches (emit/on/id); cast via `any` since
                // it is not a real Socket.
                const dummySocket = {
                    emit: (event: string, data?: any) => {
                        // NOTE(review): logger.log is passed a third arg here —
                        // presumably winston-style metadata; verify it is logged.
                        logger.log('debug', `Browser ${id} dummy socket emitted ${event}:`, data);
                    },
                    on: () => {},
                    id: `dummy-${id}`,
                } as any;
                browserSession = new RemoteBrowser(dummySocket, userId, id);
            }

            await browserSession.initialize(userId);

            // Promote the reserved slot; failure here means the slot was
            // already removed or was never reserved.
            const upgraded = browserPool.upgradeBrowserSlot(id, browserSession);
            if (!upgraded) {
                throw new Error('Failed to upgrade reserved browser slot');
            }

            if (socket) {
                socket.emit('ready-for-run');
            } else {
                // NOTE(review): this deferred block only logs — the actual
                // dummy-socket run kick-off appears unimplemented; confirm.
                setTimeout(async () => {
                    try {
                        logger.log('info', `Starting execution for browser ${id} with dummy socket`);
                    } catch (error: any) {
                        logger.log('error', `Error executing run for browser ${id}: ${error.message}`);
                    }
                }, 100);
            }

            logger.log('info', `Browser ${id} successfully initialized for run with ${socket ? 'real' : 'dummy'} socket`);
        } catch (error: any) {
            // Initialization failed: release the slot and tell the client.
            logger.log('error', `Error initializing browser ${id}: ${error.message}`);
            browserPool.failBrowserSlot(id);

            if (socket) {
                socket.emit('error', { message: error.message });
            }
        }
    } catch (error: any) {
        // Setup (namespace/promise wiring) failed before initialization.
        logger.log('error', `Error setting up browser ${id}: ${error.message}`);
        browserPool.failBrowserSlot(id);
    }
};

View File

@@ -11,6 +11,8 @@ import { WorkflowGenerator } from "../workflow-management/classes/Generator";
import { Page } from "playwright";
import { throttle } from "../../../src/helpers/inputHelpers";
import { CustomActions } from "../../../src/shared/types";
import { WhereWhatPair } from "maxun-core";
import { RemoteBrowser } from './classes/RemoteBrowser';
/**
* A wrapper function for handling user input.
@@ -27,7 +29,7 @@ import { CustomActions } from "../../../src/shared/types";
*/
const handleWrapper = async (
handleCallback: (
generator: WorkflowGenerator,
activeBrowser: RemoteBrowser,
page: Page,
args?: any
) => Promise<void>,
@@ -44,9 +46,9 @@ const handleWrapper = async (
const currentPage = activeBrowser?.getCurrentPage();
if (currentPage && activeBrowser) {
if (args) {
await handleCallback(activeBrowser.generator, currentPage, args);
await handleCallback(activeBrowser, currentPage, args);
} else {
await handleCallback(activeBrowser.generator, currentPage);
await handleCallback(activeBrowser, currentPage);
}
} else {
logger.log('warn', `No active page for browser ${id}`);
@@ -85,8 +87,19 @@ const onGenerateAction = async (customActionEventData: CustomActionEventData, us
* @category BrowserManagement
*/
const handleGenerateAction =
async (generator: WorkflowGenerator, page: Page, { action, settings }: CustomActionEventData) => {
await generator.customAction(action, settings, page);
async (activeBrowser: RemoteBrowser, page: Page, { action, settings }: CustomActionEventData) => {
try {
if (page.isClosed()) {
logger.log("debug", `Ignoring generate action event: page is closed`);
return;
}
const generator = activeBrowser.generator;
await generator.customAction(action, settings, page);
} catch (e) {
const { message } = e as Error;
logger.log("warn", `Error handling generate action event: ${message}`);
}
}
/**
@@ -104,40 +117,51 @@ const onMousedown = async (coordinates: Coordinates, userId: string) => {
* A mousedown event handler.
* Reproduces the click on the remote browser instance
* and generates pair data for the recorded workflow.
* @param generator - the workflow generator {@link Generator}
* @param activeBrowser - the active remote browser {@link RemoteBrowser}
* @param page - the active page of the remote browser
* @param x - the x coordinate of the mousedown event
* @param y - the y coordinate of the mousedown event
* @category BrowserManagement
*/
const handleMousedown = async (generator: WorkflowGenerator, page: Page, { x, y }: Coordinates) => {
const handleMousedown = async (activeBrowser: RemoteBrowser, page: Page, { x, y }: Coordinates) => {
try {
if (page.isClosed()) {
logger.log("debug", `Ignoring mousedown event: page is closed`);
return;
}
const generator = activeBrowser.generator;
await generator.onClick({ x, y }, page);
const previousUrl = page.url();
const tabsBeforeClick = page.context().pages().length;
await page.mouse.click(x, y);
// try if the click caused a navigation to a new url
try {
await page.waitForNavigation({ timeout: 2000 });
const currentUrl = page.url();
if (currentUrl !== previousUrl) {
generator.notifyUrlChange(currentUrl);
}
await page.waitForNavigation({ timeout: 2000 });
const currentUrl = page.url();
if (currentUrl !== previousUrl) {
generator.notifyUrlChange(currentUrl);
}
} catch (e) {
const { message } = e as Error;
const { message } = e as Error;
} //ignore possible timeouts
// check if any new page was opened by the click
const tabsAfterClick = page.context().pages().length;
const numOfNewPages = tabsAfterClick - tabsBeforeClick;
if (numOfNewPages > 0) {
for (let i = 1; i <= numOfNewPages; i++) {
const newPage = page.context().pages()[tabsAfterClick - i];
if (newPage) {
generator.notifyOnNewTab(newPage, tabsAfterClick - i);
}
for (let i = 1; i <= numOfNewPages; i++) {
const newPage = page.context().pages()[tabsAfterClick - i];
if (newPage) {
generator.notifyOnNewTab(newPage, tabsAfterClick - i);
}
}
}
logger.log('debug', `Clicked on position x:${x}, y:${y}`);
logger.log("debug", `Clicked on position x:${x}, y:${y}`);
} catch (e) {
const { message } = e as Error;
logger.log("warn", `Error handling mousedown event: ${message}`);
}
};
/**
@@ -156,15 +180,16 @@ const onWheel = async (scrollDeltas: ScrollDeltas, userId: string) => {
* Reproduces the wheel event on the remote browser instance.
* Scroll is not generated for the workflow pair. This is because
* Playwright scrolls elements into focus on any action.
* @param generator - the workflow generator {@link Generator}
* @param activeBrowser - the active remote browser {@link RemoteBrowser}
* @param page - the active page of the remote browser
* @param deltaX - the delta x of the wheel event
* @param deltaY - the delta y of the wheel event
* @category BrowserManagement
*/
const handleWheel = async (generator: WorkflowGenerator, page: Page, { deltaX, deltaY }: ScrollDeltas) => {
const handleWheel = async (activeBrowser: RemoteBrowser, page: Page, { deltaX, deltaY }: ScrollDeltas) => {
try {
if (page.isClosed()) {
logger.log("debug", `Ignoring wheel event: page is closed`);
return;
}
@@ -194,28 +219,30 @@ const onMousemove = async (coordinates: Coordinates, userId: string) => {
* Reproduces the mousemove event on the remote browser instance
* and generates data for the client's highlighter.
* Mousemove is also not reflected in the workflow.
* @param generator - the workflow generator {@link Generator}
* @param activeBrowser - the active remote browser {@link RemoteBrowser}
* @param page - the active page of the remote browser
* @param x - the x coordinate of the mousemove event
* @param y - the y coordinate of the mousemove event
* @category BrowserManagement
*/
const handleMousemove = async (generator: WorkflowGenerator, page: Page, { x, y }: Coordinates) => {
const handleMousemove = async (activeBrowser: RemoteBrowser, page: Page, { x, y }: Coordinates) => {
try {
if (page.isClosed()) {
logger.log('debug', `Ignoring mousemove event: page is closed`);
logger.log("debug", `Ignoring mousemove event: page is closed`);
return;
}
const generator = activeBrowser.generator;
await page.mouse.move(x, y);
throttle(async () => {
if (!page.isClosed()) {
await generator.generateDataForHighlighter(page, { x, y });
}
}, 100)();
logger.log('debug', `Moved over position x:${x}, y:${y}`);
logger.log("debug", `Moved over position x:${x}, y:${y}`);
} catch (e) {
const { message } = e as Error;
logger.log('error', message);
logger.log("error", message);
}
}
@@ -234,28 +261,50 @@ const onKeydown = async (keyboardInput: KeyboardInput, userId: string) => {
* A keydown event handler.
* Reproduces the keydown event on the remote browser instance
* and generates the workflow pair data.
* @param generator - the workflow generator {@link Generator}
* @param activeBrowser - the active remote browser {@link RemoteBrowser}
* @param page - the active page of the remote browser
* @param key - the pressed key
* @param coordinates - the coordinates, where the keydown event happened
* @category BrowserManagement
*/
const handleKeydown = async (generator: WorkflowGenerator, page: Page, { key, coordinates }: KeyboardInput) => {
await page.keyboard.down(key);
await generator.onKeyboardInput(key, coordinates, page);
logger.log('debug', `Key ${key} pressed`);
/** Replays a keydown on the remote page and records it in the workflow. */
const handleKeydown = async (activeBrowser: RemoteBrowser, page: Page, { key, coordinates }: KeyboardInput) => {
    try {
        // The page may have closed between the socket event and this handler.
        if (page.isClosed()) {
            logger.log("debug", `Ignoring keydown event: page is closed`);
            return;
        }
        await page.keyboard.down(key);
        await activeBrowser.generator.onKeyboardInput(key, coordinates, page);
        logger.log("debug", `Key ${key} pressed`);
    } catch (e) {
        logger.log("warn", `Error handling keydown event: ${(e as Error).message}`);
    }
};
/**
* Handles the date selection event.
* @param generator - the workflow generator {@link Generator}
* @param activeBrowser - the active remote browser {@link RemoteBrowser}
* @param page - the active page of the remote browser
* @param data - the data of the date selection event {@link DatePickerEventData}
* @category BrowserManagement
*/
const handleDateSelection = async (generator: WorkflowGenerator, page: Page, data: DatePickerEventData) => {
await generator.onDateSelection(page, data);
logger.log('debug', `Date ${data.value} selected`);
/** Records a date-picker selection into the workflow. */
const handleDateSelection = async (activeBrowser: RemoteBrowser, page: Page, data: DatePickerEventData) => {
    try {
        // Skip stale events targeting an already-closed page.
        if (page.isClosed()) {
            logger.log("debug", `Ignoring date selection event: page is closed`);
            return;
        }
        await activeBrowser.generator.onDateSelection(page, data);
        logger.log("debug", `Date ${data.value} selected`);
    } catch (e) {
        logger.log("warn", `Error handling date selection event: ${(e as Error).message}`);
    }
}
/**
@@ -271,14 +320,25 @@ const onDateSelection = async (data: DatePickerEventData, userId: string) => {
/**
* Handles the dropdown selection event.
* @param generator - the workflow generator {@link Generator}
* @param activeBrowser - the active remote browser {@link RemoteBrowser}
* @param page - the active page of the remote browser
* @param data - the data of the dropdown selection event
* @category BrowserManagement
*/
const handleDropdownSelection = async (generator: WorkflowGenerator, page: Page, data: { selector: string, value: string }) => {
await generator.onDropdownSelection(page, data);
logger.log('debug', `Dropdown value ${data.value} selected`);
/** Records a dropdown selection into the workflow. */
const handleDropdownSelection = async (activeBrowser: RemoteBrowser, page: Page, data: { selector: string, value: string }) => {
    try {
        // Skip stale events targeting an already-closed page.
        if (page.isClosed()) {
            logger.log("debug", `Ignoring dropdown selection event: page is closed`);
            return;
        }
        await activeBrowser.generator.onDropdownSelection(page, data);
        logger.log("debug", `Dropdown value ${data.value} selected`);
    } catch (e) {
        logger.log("warn", `Error handling dropdown selection event: ${(e as Error).message}`);
    }
}
/**
@@ -294,14 +354,25 @@ const onDropdownSelection = async (data: { selector: string, value: string }, us
/**
* Handles the time selection event.
* @param generator - the workflow generator {@link Generator}
* @param activeBrowser - the active remote browser {@link RemoteBrowser}
* @param page - the active page of the remote browser
* @param data - the data of the time selection event
* @category BrowserManagement
*/
const handleTimeSelection = async (generator: WorkflowGenerator, page: Page, data: { selector: string, value: string }) => {
await generator.onTimeSelection(page, data);
logger.log('debug', `Time value ${data.value} selected`);
/** Records a time-input selection into the workflow. */
const handleTimeSelection = async (activeBrowser: RemoteBrowser, page: Page, data: { selector: string, value: string }) => {
    try {
        // Skip stale events targeting an already-closed page.
        if (page.isClosed()) {
            logger.log("debug", `Ignoring time selection event: page is closed`);
            return;
        }
        await activeBrowser.generator.onTimeSelection(page, data);
        logger.log("debug", `Time value ${data.value} selected`);
    } catch (e) {
        logger.log("warn", `Error handling time selection event: ${(e as Error).message}`);
    }
}
/**
@@ -317,14 +388,31 @@ const onTimeSelection = async (data: { selector: string, value: string }, userId
/**
* Handles the datetime-local selection event.
* @param generator - the workflow generator {@link Generator}
* @param activeBrowser - the active remote browser {@link RemoteBrowser}
* @param page - the active page of the remote browser
* @param data - the data of the datetime-local selection event
* @category BrowserManagement
*/
const handleDateTimeLocalSelection = async (generator: WorkflowGenerator, page: Page, data: { selector: string, value: string }) => {
await generator.onDateTimeLocalSelection(page, data);
logger.log('debug', `DateTime Local value ${data.value} selected`);
/** Records a datetime-local input selection into the workflow. */
const handleDateTimeLocalSelection = async (activeBrowser: RemoteBrowser, page: Page, data: { selector: string, value: string }) => {
    try {
        // Skip stale events targeting an already-closed page.
        if (page.isClosed()) {
            logger.log("debug", `Ignoring datetime-local selection event: page is closed`);
            return;
        }
        await activeBrowser.generator.onDateTimeLocalSelection(page, data);
        logger.log("debug", `DateTime Local value ${data.value} selected`);
    } catch (e) {
        logger.log("warn", `Error handling datetime-local selection event: ${(e as Error).message}`);
    }
}
/**
@@ -353,14 +441,24 @@ const onKeyup = async (keyboardInput: KeyboardInput, userId: string) => {
* A keyup event handler.
* Reproduces the keyup event on the remote browser instance.
* Does not generate any data - keyup is not reflected in the workflow.
* @param generator - the workflow generator {@link Generator}
* @param activeBrowser - the active remote browser {@link RemoteBrowser}
* @param page - the active page of the remote browser
* @param key - the released key
* @category BrowserManagement
*/
const handleKeyup = async (generator: WorkflowGenerator, page: Page, key: string) => {
await page.keyboard.up(key);
logger.log('debug', `Key ${key} unpressed`);
/** Replays a keyup on the remote page; not reflected in the workflow. */
const handleKeyup = async (activeBrowser: RemoteBrowser, page: Page, key: string) => {
    try {
        // Skip stale events targeting an already-closed page.
        if (page.isClosed()) {
            logger.log("debug", `Ignoring keyup event: page is closed`);
            return;
        }
        await page.keyboard.up(key);
        logger.log("debug", `Key ${key} unpressed`);
    } catch (e) {
        logger.log("warn", `Error handling keyup event: ${(e as Error).message}`);
    }
};
/**
@@ -377,23 +475,36 @@ const onChangeUrl = async (url: string, userId: string) => {
/**
* An url change event handler.
* Navigates the page to the given url and generates data for the workflow.
* @param generator - the workflow generator {@link Generator}
* @param activeBrowser - the active remote browser {@link RemoteBrowser}
* @param page - the active page of the remote browser
* @param url - the new url of the page
* @category BrowserManagement
*/
const handleChangeUrl = async (generator: WorkflowGenerator, page: Page, url: string) => {
if (url) {
await generator.onChangeUrl(url, page);
try {
await page.goto(url, { waitUntil: 'networkidle', timeout: 10000 });
logger.log('debug', `Went to ${url}`);
} catch (e) {
const { message } = e as Error;
logger.log('error', message);
const handleChangeUrl = async (activeBrowser: RemoteBrowser, page: Page, url: string) => {
try {
if (page.isClosed()) {
logger.log("debug", `Ignoring change url event: page is closed`);
return;
}
} else {
logger.log('warn', `No url provided`);
if (url) {
const generator = activeBrowser.generator;
await generator.onChangeUrl(url, page);
try {
await page.goto(url, { waitUntil: "domcontentloaded", timeout: 30000 });
await page.waitForTimeout(2000);
logger.log("debug", `Went to ${url}`);
} catch (e) {
const { message } = e as Error;
logger.log("error", message);
}
} else {
logger.log("warn", `No url provided`);
}
} catch (e) {
const { message } = e as Error;
logger.log("warn", `Error handling change url event: ${message}`);
}
};
@@ -410,13 +521,23 @@ const onRefresh = async (userId: string) => {
/**
* A refresh event handler.
* Refreshes the page. This is not reflected in the workflow.
* @param generator - the workflow generator {@link Generator}
* @param activeBrowser - the active remote browser {@link RemoteBrowser}
* @param page - the active page of the remote browser
* @category BrowserManagement
*/
const handleRefresh = async (generator: WorkflowGenerator, page: Page) => {
await page.reload();
logger.log('debug', `Page refreshed.`);
/** Reloads the remote page; not reflected in the workflow. */
const handleRefresh = async (activeBrowser: RemoteBrowser, page: Page) => {
    try {
        // Skip stale events targeting an already-closed page.
        if (page.isClosed()) {
            logger.log("debug", `Ignoring refresh event: page is closed`);
            return;
        }
        await page.reload();
        logger.log("debug", `Page refreshed.`);
    } catch (e) {
        logger.log("warn", `Error handling refresh event: ${(e as Error).message}`);
    }
};
/**
@@ -432,14 +553,25 @@ const onGoBack = async (userId: string) => {
/**
* A go back event handler.
* Navigates the page back and generates data for the workflow.
* @param generator - the workflow generator {@link Generator}
* @param activeBrowser - the active remote browser {@link RemoteBrowser}
* @param page - the active page of the remote browser
* @category BrowserManagement
*/
const handleGoBack = async (generator: WorkflowGenerator, page: Page) => {
await page.goBack({ waitUntil: 'commit' });
generator.onGoBack(page.url());
logger.log('debug', 'Page went back')
/** Navigates the remote page back and records the url change. */
const handleGoBack = async (activeBrowser: RemoteBrowser, page: Page) => {
    try {
        // Skip stale events targeting an already-closed page.
        if (page.isClosed()) {
            logger.log("debug", `Ignoring go back event: page is closed`);
            return;
        }
        await page.goBack({ waitUntil: "commit" });
        activeBrowser.generator.onGoBack(page.url());
        logger.log("debug", "Page went back");
    } catch (e) {
        logger.log("warn", `Error handling go back event: ${(e as Error).message}`);
    }
};
/**
@@ -455,14 +587,209 @@ const onGoForward = async (userId: string) => {
/**
* A go forward event handler.
* Navigates the page forward and generates data for the workflow.
* @param generator - the workflow generator {@link Generator}
* @param activeBrowser - the active remote browser {@link RemoteBrowser}
* @param page - the active page of the remote browser
* @category BrowserManagement
*/
const handleGoForward = async (generator: WorkflowGenerator, page: Page) => {
await page.goForward({ waitUntil: 'commit' });
generator.onGoForward(page.url());
logger.log('debug', 'Page went forward');
/**
 * A go forward event handler.
 * Navigates the page forward and records the navigation in the workflow.
 * @param activeBrowser - the active remote browser {@link RemoteBrowser}
 * @param page - the active page of the remote browser
 * @category BrowserManagement
 */
const handleGoForward = async (activeBrowser: RemoteBrowser, page: Page) => {
  try {
    if (page.isClosed()) {
      logger.log("debug", `Ignoring go forward event: page is closed`);
      return;
    }
    // "commit" resolves as soon as the navigation is committed, before load.
    await page.goForward({ waitUntil: "commit" });
    activeBrowser.generator.onGoForward(page.url());
    logger.log("debug", "Page went forward");
  } catch (err) {
    logger.log("warn", `Error handling go forward event: ${(err as Error).message}`);
  }
};
/**
 * Handles the click action event.
 * Performs the click on the page, records it via the workflow generator,
 * detects full-page navigations (non-SPA case) and records those too, then
 * emits a fresh DOM snapshot to the client.
 * @param activeBrowser - the active remote browser {@link RemoteBrowser}
 * @param page - the active page of the remote browser
 * @param data - the data of the click action event
 * @category BrowserManagement
 */
const handleClickAction = async (
  activeBrowser: RemoteBrowser,
  page: Page,
  data: {
    selector: string;
    url: string;
    userId: string;
    elementInfo?: any;
    coordinates?: { x: number; y: number };
    isSPA?: boolean;
  }
) => {
  try {
    if (page.isClosed()) {
      logger.log("debug", `Ignoring click action event: page is closed`);
      return;
    }

    // Only `selector` and `isSPA` drive the logic here; the remaining
    // fields (url, elementInfo, coordinates) are forwarded untouched to the
    // workflow generator inside `data`.
    const { selector, isSPA = false } = data;

    // URL before the click, used below to detect a real navigation.
    const currentUrl = page.url();

    await page.click(selector);

    const generator = activeBrowser.generator;
    await generator.onDOMClickAction(page, data);
    logger.log("debug", `Click action processed: ${selector}`);

    if (isSPA) {
      // SPAs mutate the DOM without navigating; give the app time to settle.
      logger.log("debug", `SPA interaction detected for selector: ${selector}`);
      await new Promise((resolve) => setTimeout(resolve, 1500));
    } else {
      const newUrl = page.url();
      // Ignore pure hash-fragment changes ("/#"), which are not real navigations.
      const hasNavigated = newUrl !== currentUrl && !newUrl.endsWith("/#");
      if (hasNavigated) {
        logger.log("debug", `Navigation detected: ${currentUrl} -> ${newUrl}`);
        await generator.onDOMNavigation(page, {
          url: newUrl,
          currentUrl: currentUrl,
          userId: data.userId,
        });
      }
    }

    // Allow late DOM updates to land before snapshotting.
    await new Promise((resolve) => setTimeout(resolve, 2000));
    await activeBrowser.makeAndEmitDOMSnapshot();
  } catch (e) {
    const { message } = e as Error;
    logger.log(
      "warn",
      `Error handling enhanced click action event: ${message}`
    );
  }
};
/**
 * A wrapper function for handling the click action event.
 * Resolves the user's active browser/page via {@link handleWrapper} and
 * delegates to {@link handleClickAction}.
 * @param data - the data of the click action event
 * @param userId - id of the user whose browser should receive the click
 * @category HelperFunctions
 */
const onDOMClickAction = async (
  data: {
    selector: string;
    url: string;
    userId: string;
    elementInfo?: any;
    coordinates?: { x: number; y: number };
  },
  userId: string
) => {
  logger.log("debug", "Handling click action event emitted from client");
  await handleWrapper(handleClickAction, userId, data);
};
/**
 * Handles the keyboard action event.
 * Presses the requested key on the target element and records the action
 * in the workflow generator.
 * @param activeBrowser - the active remote browser {@link RemoteBrowser}
 * @param page - the active page of the remote browser
 * @param data - the data of the keyboard action event
 * @category BrowserManagement
 */
const handleKeyboardAction = async (
  activeBrowser: RemoteBrowser,
  page: Page,
  data: {
    selector: string;
    key: string;
    url: string;
    userId: string;
    inputType?: string;
  }
) => {
  try {
    if (page.isClosed()) {
      logger.log("debug", `Ignoring keyboard action event: page is closed`);
      return;
    }
    const { selector, key } = data;
    await page.press(selector, key);
    await activeBrowser.generator.onDOMKeyboardAction(page, data);
    logger.log(
      "debug",
      `Keyboard action processed: ${key} on ${selector}`
    );
  } catch (err) {
    logger.log("warn", `Error handling keyboard action event: ${(err as Error).message}`);
  }
};
/**
 * A wrapper function for handling the keyboard action event.
 * Resolves the user's active browser/page via {@link handleWrapper} and
 * delegates to {@link handleKeyboardAction}.
 * @param data - the data of the keyboard action event
 * @param userId - id of the user whose browser should receive the keypress
 * @category HelperFunctions
 */
const onDOMKeyboardAction = async (
  data: {
    selector: string;
    key: string;
    url: string;
    userId: string;
    inputType?: string;
  },
  userId: string
) => {
  logger.log("debug", "Handling keyboard action event emitted from client");
  await handleWrapper(handleKeyboardAction, userId, data);
};
/**
 * Handles the workflow pair event.
 * Forwards a where/what pair built on the client to the workflow generator.
 * @param activeBrowser - the active remote browser {@link RemoteBrowser}
 * @param page - the active page of the remote browser
 * @param data - the data of the workflow pair event
 * @category BrowserManagement
 */
const handleWorkflowPair = async (
  activeBrowser: RemoteBrowser,
  page: Page,
  data: { pair: WhereWhatPair; userId: string }
) => {
  try {
    if (!page.isClosed()) {
      await activeBrowser.generator.onDOMWorkflowPair(page, data);
      logger.log("debug", `Workflow pair processed from frontend`);
    } else {
      logger.log("debug", `Ignoring workflow pair event: page is closed`);
    }
  } catch (err) {
    logger.log("warn", `Error handling workflow pair event: ${(err as Error).message}`);
  }
};
/**
 * A wrapper function for handling the workflow pair event.
 * Resolves the user's active browser/page via {@link handleWrapper} and
 * delegates to {@link handleWorkflowPair}.
 * @param data - the data of the workflow pair event
 * @param userId - id of the user whose browser should process the pair
 * @category HelperFunctions
 */
const onDOMWorkflowPair = async (
  data: { pair: WhereWhatPair; userId: string },
  userId: string
) => {
  logger.log("debug", "Handling workflow pair event emitted from client");
  await handleWrapper(handleWorkflowPair, userId, data);
};
/**
@@ -493,6 +820,10 @@ const registerInputHandlers = (socket: Socket, userId: string) => {
socket.on("input:time", (data) => onTimeSelection(data, userId));
socket.on("input:datetime-local", (data) => onDateTimeLocalSelection(data, userId));
socket.on("action", (data) => onGenerateAction(data, userId));
socket.on("dom:click", (data) => onDOMClickAction(data, userId));
socket.on("dom:keypress", (data) => onDOMKeyboardAction(data, userId));
socket.on("dom:addpair", (data) => onDOMWorkflowPair(data, userId));
};
export default registerInputHandlers;

View File

@@ -1,4 +1,4 @@
import dotenv from 'dotenv';
const dotenv = require('dotenv');
dotenv.config({ path: './.env' });
// Validate required environment variables

View File

@@ -0,0 +1,27 @@
'use strict';
module.exports = {
async up(queryInterface, Sequelize) {
await queryInterface.addColumn('robot', 'webhooks', {
type: Sequelize.JSONB,
allowNull: true,
defaultValue: null,
comment: 'Webhook configurations for the robot'
});
// Optional: Add an index for better query performance if you plan to search within webhook data
await queryInterface.addIndex('robot', {
fields: ['webhooks'],
using: 'gin', // GIN index for JSONB columns
name: 'robot_webhooks_gin_idx'
});
},
async down(queryInterface, Sequelize) {
// Remove the index first
await queryInterface.removeIndex('robot', 'robot_webhooks_gin_idx');
// Then remove the column
await queryInterface.removeColumn('robot', 'webhooks');
}
};

373
server/src/mcp-worker.ts Normal file
View File

@@ -0,0 +1,373 @@
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";
import fetch from 'node-fetch';
import dotenv from 'dotenv';
dotenv.config();
/**
 * Dev-only logger for the MCP worker. Writes to stderr — stdout is reserved
 * for the MCP stdio transport — and is silenced entirely in production.
 */
const log = (message: string) => {
  const isProduction = process.env.NODE_ENV === 'production';
  if (!isProduction) {
    console.error(`[MCP Worker] ${message}`);
  }
};
/**
 * MCP (Model Context Protocol) worker that exposes the Maxun robot HTTP API
 * as MCP tools over a stdio transport. Each tool is a thin proxy: it issues
 * an authenticated request to the Maxun backend and renders the JSON
 * response as human-readable text for the MCP client.
 */
class MaxunMCPWorker {
  // MCP server instance that registers the tools and speaks the protocol.
  private mcpServer: McpServer;
  // API key sent as the `x-api-key` header on every backend request.
  private apiKey: string;
  // Base URL of the Maxun backend (no trailing slash expected).
  private apiUrl: string;

  /**
   * Reads configuration from the environment and registers all tools.
   * @throws Error when MCP_API_KEY is not set.
   */
  constructor() {
    this.apiKey = process.env.MCP_API_KEY || '';
    this.apiUrl = process.env.BACKEND_URL || 'http://localhost:8080';
    if (!this.apiKey) {
      throw new Error('MCP_API_KEY environment variable is required');
    }
    this.mcpServer = new McpServer({
      name: 'Maxun Web Scraping Server',
      version: '1.0.0'
    });
    this.setupTools();
  }

  /**
   * Performs an authenticated JSON request against the Maxun backend.
   * @param endpoint - path starting with '/', appended to the base URL
   * @param options - node-fetch options; `headers` are merged over defaults
   * @returns the parsed JSON response body
   * @throws Error when the response status is not 2xx
   */
  private async makeApiRequest(endpoint: string, options: any = {}) {
    const url = `${this.apiUrl}${endpoint}`;
    const headers = {
      'Content-Type': 'application/json',
      'x-api-key': this.apiKey,
      ...options.headers
    };
    const response = await fetch(url, {
      ...options,
      headers
    });
    if (!response.ok) {
      throw new Error(`API request failed: ${response.status} ${response.statusText}`);
    }
    return await response.json();
  }

  /**
   * Registers all MCP tools. Per the MCP tool contract each handler returns
   * a `{ content: [...] }` payload and reports failures through
   * `isError: true` rather than throwing.
   */
  private setupTools() {
    // Tool: List all robots
    this.mcpServer.tool(
      "list_robots",
      {},
      async () => {
        try {
          const data = await this.makeApiRequest('/api/robots');
          return {
            content: [{
              type: "text",
              text: `Found ${data.robots.totalCount} robots:\n\n${JSON.stringify(data.robots.items, null, 2)}`
            }]
          };
        } catch (error: any) {
          return {
            content: [{
              type: "text",
              text: `Error fetching robots: ${error.message}`
            }],
            isError: true
          };
        }
      }
    );

    // Tool: Get robot details by ID
    this.mcpServer.tool(
      "get_robot",
      {
        robot_id: z.string().describe("ID of the robot to get details for")
      },
      async ({ robot_id }: { robot_id: string }) => {
        try {
          const data = await this.makeApiRequest(`/api/robots/${robot_id}`);
          return {
            content: [{
              type: "text",
              text: `Robot Details:\n\n${JSON.stringify(data.robot, null, 2)}`
            }]
          };
        } catch (error: any) {
          return {
            content: [{
              type: "text",
              text: `Error fetching robot: ${error.message}`
            }],
            isError: true
          };
        }
      }
    );

    // Tool: Run a robot and get results
    this.mcpServer.tool(
      "run_robot",
      {
        robot_id: z.string().describe("ID of the robot to run"),
        wait_for_completion: z.boolean().default(true).describe("Whether to wait for the run to complete")
      },
      async ({ robot_id, wait_for_completion }: { robot_id: string; wait_for_completion: boolean }) => {
        try {
          const data = await this.makeApiRequest(`/api/robots/${robot_id}/runs`, {
            method: 'POST'
          });
          // NOTE(review): no polling happens here — this assumes the POST
          // /runs endpoint only responds once the run has finished when
          // results are read below. Confirm against the backend behavior.
          if (wait_for_completion) {
            const extractedData = data.run.data;
            const screenshots = data.run.screenshots;

            let resultText = `Robot run completed successfully!\n\n`;
            resultText += `Run ID: ${data.run.runId}\n`;
            resultText += `Status: ${data.run.status}\n`;
            resultText += `Started: ${data.run.startedAt}\n`;
            resultText += `Finished: ${data.run.finishedAt}\n\n`;

            if (extractedData.textData && extractedData.textData.length > 0) {
              resultText += `Extracted Text Data (${extractedData.textData.length} items):\n`;
              resultText += JSON.stringify(extractedData.textData, null, 2) + '\n\n';
            }

            if (extractedData.listData && extractedData.listData.length > 0) {
              resultText += `Extracted List Data (${extractedData.listData.length} items):\n`;
              resultText += JSON.stringify(extractedData.listData, null, 2) + '\n\n';
            }

            if (screenshots && screenshots.length > 0) {
              resultText += `Screenshots captured: ${screenshots.length}\n`;
              resultText += `Screenshot URLs:\n`;
              screenshots.forEach((screenshot: any, index: any) => {
                resultText += `${index + 1}. ${screenshot}\n`;
              });
            }

            return {
              content: [{
                type: "text",
                text: resultText
              }]
            };
          } else {
            // Fire-and-forget mode: report only the identifiers so the
            // caller can poll later with get_run_details.
            return {
              content: [{
                type: "text",
                text: `Robot run started! Run ID: ${data.run.runId}\nStatus: ${data.run.status}`
              }]
            };
          }
        } catch (error: any) {
          return {
            content: [{
              type: "text",
              text: `Error running robot: ${error.message}`
            }],
            isError: true
          };
        }
      }
    );

    // Tool: Get all runs for a robot
    this.mcpServer.tool(
      "get_robot_runs",
      {
        robot_id: z.string().describe("ID of the robot")
      },
      async ({ robot_id }: { robot_id: string }) => {
        try {
          const data = await this.makeApiRequest(`/api/robots/${robot_id}/runs`);
          return {
            content: [{
              type: "text",
              text: `Robot runs (${data.runs.totalCount} total):\n\n${JSON.stringify(data.runs.items, null, 2)}`
            }]
          };
        } catch (error: any) {
          return {
            content: [{
              type: "text",
              text: `Error fetching runs: ${error.message}`
            }],
            isError: true
          };
        }
      }
    );

    // Tool: Get specific run details
    this.mcpServer.tool(
      "get_run_details",
      {
        robot_id: z.string().describe("ID of the robot"),
        run_id: z.string().describe("ID of the specific run")
      },
      async ({ robot_id, run_id }: { robot_id: string; run_id: string }) => {
        try {
          const data = await this.makeApiRequest(`/api/robots/${robot_id}/runs/${run_id}`);
          const run = data.run;

          let resultText = `Run Details:\n\n`;
          resultText += `Run ID: ${run.runId}\n`;
          resultText += `Status: ${run.status}\n`;
          resultText += `Robot ID: ${run.robotId}\n`;
          resultText += `Started: ${run.startedAt}\n`;
          resultText += `Finished: ${run.finishedAt}\n\n`;

          if (run.data.textData && run.data.textData.length > 0) {
            resultText += `Extracted Text Data:\n${JSON.stringify(run.data.textData, null, 2)}\n\n`;
          }

          if (run.data.listData && run.data.listData.length > 0) {
            resultText += `Extracted List Data:\n${JSON.stringify(run.data.listData, null, 2)}\n\n`;
          }

          if (run.screenshots && run.screenshots.length > 0) {
            resultText += `Screenshots:\n`;
            run.screenshots.forEach((screenshot: any, index: any) => {
              resultText += `${index + 1}. ${screenshot}\n`;
            });
          }

          return {
            content: [{
              type: "text",
              text: resultText
            }]
          };
        } catch (error: any) {
          return {
            content: [{
              type: "text",
              text: `Error fetching run details: ${error.message}`
            }],
            isError: true
          };
        }
      }
    );

    // Tool: Get robot performance summary — aggregates run outcomes and
    // extraction counts across all runs of one robot.
    this.mcpServer.tool(
      "get_robot_summary",
      {
        robot_id: z.string().describe("ID of the robot")
      },
      async ({ robot_id }: { robot_id: string }) => {
        try {
          // Fetch robot metadata and its run history in parallel.
          const [robotData, runsData] = await Promise.all([
            this.makeApiRequest(`/api/robots/${robot_id}`),
            this.makeApiRequest(`/api/robots/${robot_id}/runs`)
          ]);

          const robot = robotData.robot;
          const runs = runsData.runs.items;

          const successfulRuns = runs.filter((run: any) => run.status === 'success');
          const failedRuns = runs.filter((run: any) => run.status === 'failed');

          // Tally extracted items over successful runs only.
          let totalTextItems = 0;
          let totalListItems = 0;
          let totalScreenshots = 0;

          successfulRuns.forEach((run: any) => {
            if (run.data.textData) totalTextItems += run.data.textData.length;
            if (run.data.listData) totalListItems += run.data.listData.length;
            if (run.screenshots) totalScreenshots += run.screenshots.length;
          });

          // Template lines are intentionally flush-left: they are the
          // literal content of the summary string.
          const summary = `Robot Performance Summary:
Robot Name: ${robot.name}
Robot ID: ${robot.id}
Created: ${robot.createdAt ? new Date(robot.createdAt).toLocaleString() : 'N/A'}
Performance Metrics:
- Total Runs: ${runs.length}
- Successful Runs: ${successfulRuns.length}
- Failed Runs: ${failedRuns.length}
- Success Rate: ${runs.length > 0 ? ((successfulRuns.length / runs.length) * 100).toFixed(1) : 0}%
Data Extracted:
- Total Text Items: ${totalTextItems}
- Total List Items: ${totalListItems}
- Total Screenshots: ${totalScreenshots}
- Total Data Points: ${totalTextItems + totalListItems}
Input Parameters:
${JSON.stringify(robot.inputParameters, null, 2)}`;

          return {
            content: [{
              type: "text",
              text: summary
            }]
          };
        } catch (error: any) {
          return {
            content: [{
              type: "text",
              text: `Error generating robot summary: ${error.message}`
            }],
            isError: true
          };
        }
      }
    );
  }

  /**
   * Connects the MCP server to a stdio transport, making the tools available
   * to the parent process. Rethrows on failure so the caller can exit.
   */
  async start() {
    try {
      const transport = new StdioServerTransport();
      await this.mcpServer.connect(transport);
      log('Maxun MCP Worker connected and ready');
    } catch (error: any) {
      log(`Failed to start MCP Worker: ${error.message}`);
      throw error;
    }
  }

  /**
   * Closes the MCP server. Errors are logged and swallowed — shutdown is
   * best-effort.
   */
  async stop() {
    try {
      await this.mcpServer.close();
      log('Maxun MCP Worker stopped');
    } catch (error: any) {
      log(`Error stopping MCP Worker: ${error.message}`);
    }
  }
}
/**
 * Entry point: constructs the worker, starts it, and wires SIGTERM/SIGINT
 * for graceful shutdown. Exits with code 1 if startup fails.
 */
async function main() {
  try {
    const worker = new MaxunMCPWorker();
    await worker.start();

    // Handle graceful shutdown — one shared handler for both signals.
    const shutdown = async () => {
      await worker.stop();
      process.exit(0);
    };
    process.on('SIGTERM', shutdown);
    process.on('SIGINT', shutdown);
  } catch (error) {
    console.error('Failed to start MCP Worker:', error);
    process.exit(1);
  }
}
// Only start if this is run as a worker or directly: either spawned with
// MCP_WORKER=true, or executed as the main module (rather than being
// require()d by another script).
if (process.env.MCP_WORKER === 'true' || require.main === module) {
  main();
}

View File

@@ -15,6 +15,19 @@ interface RobotWorkflow {
workflow: WhereWhatPair[];
}
interface WebhookConfig {
id: string;
url: string;
events: string[];
active: boolean;
createdAt: string;
updatedAt: string;
lastCalledAt?: string | null;
retryAttempts?: number;
retryDelay?: number;
timeout?: number;
}
interface RobotAttributes {
id: string;
userId?: number;
@@ -32,6 +45,7 @@ interface RobotAttributes {
airtable_refresh_token?: string | null;
schedule?: ScheduleConfig | null;
airtable_table_id?: string | null;
webhooks?: WebhookConfig[] | null;
}
interface ScheduleConfig {
@@ -66,6 +80,7 @@ class Robot extends Model<RobotAttributes, RobotCreationAttributes> implements R
public airtable_refresh_token!: string | null;
public airtable_table_id!: string | null;
public schedule!: ScheduleConfig | null;
public webhooks!: WebhookConfig[] | null;
}
Robot.init(
@@ -135,6 +150,11 @@ Robot.init(
type: DataTypes.JSONB,
allowNull: true,
},
webhooks: {
type: DataTypes.JSONB,
allowNull: true,
defaultValue: null,
},
},
{
sequelize,

View File

@@ -20,6 +20,7 @@ import { googleSheetUpdateTasks, processGoogleSheetUpdates } from './workflow-ma
import { airtableUpdateTasks, processAirtableUpdates } from './workflow-management/integrations/airtable';
import { RemoteBrowser } from './browser-management/classes/RemoteBrowser';
import { io as serverIo } from "./server";
import { sendWebhook } from './routes/webhook';
if (!process.env.DB_USER || !process.env.DB_PASSWORD || !process.env.DB_HOST || !process.env.DB_PORT || !process.env.DB_NAME) {
throw new Error('Failed to start pgboss worker: one or more required environment variables are missing.');
@@ -55,7 +56,10 @@ interface AbortRunData {
runId: string;
}
const pgBoss = new PgBoss({connectionString: pgBossConnectionString });
const pgBoss = new PgBoss({
connectionString: pgBossConnectionString,
expireInHours: 23
});
/**
* Extract data safely from a job (single job or job array)
@@ -82,90 +86,128 @@ function AddGeneratedFlags(workflow: WorkflowFile) {
};
/**
* Function to reset browser state without creating a new browser
* Helper function to extract and process scraped data from browser interpreter
*/
async function resetBrowserState(browser: RemoteBrowser): Promise<boolean> {
try {
const currentPage = browser.getCurrentPage();
if (!currentPage) {
logger.log('error', 'No current page available to reset browser state');
return false;
}
// Navigate to blank page to reset state
await currentPage.goto('about:blank', { waitUntil: 'networkidle', timeout: 10000 });
// Clear browser storage
await currentPage.evaluate(() => {
try {
localStorage.clear();
sessionStorage.clear();
} catch (e) {
// Ignore errors in cleanup
async function extractAndProcessScrapedData(
browser: RemoteBrowser,
run: any
): Promise<{
categorizedOutput: any;
uploadedBinaryOutput: any;
totalDataPointsExtracted: number;
totalSchemaItemsExtracted: number;
totalListItemsExtracted: number;
extractedScreenshotsCount: number;
}> {
let categorizedOutput: {
scrapeSchema: Record<string, any>;
scrapeList: Record<string, any>;
} = {
scrapeSchema: {},
scrapeList: {}
};
if ((browser?.interpreter?.serializableDataByType?.scrapeSchema ?? []).length > 0) {
browser?.interpreter?.serializableDataByType?.scrapeSchema?.forEach((schemaItem: any, index: any) => {
categorizedOutput.scrapeSchema[`schema-${index}`] = schemaItem;
});
}
if ((browser?.interpreter?.serializableDataByType?.scrapeList ?? []).length > 0) {
browser?.interpreter?.serializableDataByType?.scrapeList?.forEach((listItem: any, index: any) => {
categorizedOutput.scrapeList[`list-${index}`] = listItem;
});
}
const binaryOutput = browser?.interpreter?.binaryData?.reduce(
(reducedObject: Record<string, any>, item: any, index: number): Record<string, any> => {
return {
[`item-${index}`]: item,
...reducedObject,
};
},
{}
) || {};
let totalDataPointsExtracted = 0;
let totalSchemaItemsExtracted = 0;
let totalListItemsExtracted = 0;
let extractedScreenshotsCount = 0;
if (categorizedOutput.scrapeSchema) {
Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => {
if (Array.isArray(schemaResult)) {
schemaResult.forEach(obj => {
if (obj && typeof obj === 'object') {
totalDataPointsExtracted += Object.keys(obj).length;
}
});
totalSchemaItemsExtracted += schemaResult.length;
} else if (schemaResult && typeof schemaResult === 'object') {
totalDataPointsExtracted += Object.keys(schemaResult).length;
totalSchemaItemsExtracted += 1;
}
});
// Clear cookies
const context = currentPage.context();
await context.clearCookies();
return true;
} catch (error) {
logger.log('error', `Failed to reset browser state`);
return false;
}
if (categorizedOutput.scrapeList) {
Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => {
if (Array.isArray(listResult)) {
listResult.forEach(obj => {
if (obj && typeof obj === 'object') {
totalDataPointsExtracted += Object.keys(obj).length;
}
});
totalListItemsExtracted += listResult.length;
}
});
}
if (binaryOutput) {
extractedScreenshotsCount = Object.keys(binaryOutput).length;
totalDataPointsExtracted += extractedScreenshotsCount;
}
const binaryOutputService = new BinaryOutputService('maxun-run-screenshots');
const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(
run,
binaryOutput
);
return {
categorizedOutput: {
scrapeSchema: categorizedOutput.scrapeSchema || {},
scrapeList: categorizedOutput.scrapeList || {}
},
uploadedBinaryOutput,
totalDataPointsExtracted,
totalSchemaItemsExtracted,
totalListItemsExtracted,
extractedScreenshotsCount
};
}
/**
* Modified checkAndProcessQueuedRun function - only changes browser reset logic
*/
async function checkAndProcessQueuedRun(userId: string, browserId: string): Promise<boolean> {
// Helper function to handle integration updates
async function triggerIntegrationUpdates(runId: string, robotMetaId: string): Promise<void> {
try {
// Find the oldest queued run for this specific browser
const queuedRun = await Run.findOne({
where: {
browserId: browserId,
runByUserId: userId,
status: 'queued'
},
order: [['startedAt', 'ASC']]
});
if (!queuedRun) {
logger.log('info', `No queued runs found for browser ${browserId}`);
return false;
}
// Reset the browser state before next run
const browser = browserPool.getRemoteBrowser(browserId);
if (browser) {
logger.log('info', `Resetting browser state for browser ${browserId} before next run`);
await resetBrowserState(browser);
}
// Update the queued run to running status
await queuedRun.update({
status: 'running',
log: 'Run started - using browser from previous run'
});
// Use user-specific queue
const userQueueName = `execute-run-user-${userId}`;
// Schedule the run execution
await pgBoss.createQueue(userQueueName);
const executeJobId = await pgBoss.send(userQueueName, {
userId: userId,
runId: queuedRun.runId,
browserId: browserId
});
logger.log('info', `Scheduled queued run ${queuedRun.runId} to use browser ${browserId}, job ID: ${executeJobId}`);
return true;
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.log('error', `Error checking for queued runs: ${errorMessage}`);
return false;
googleSheetUpdateTasks[runId] = {
robotId: robotMetaId,
runId: runId,
status: 'pending',
retries: 5,
};
airtableUpdateTasks[runId] = {
robotId: robotMetaId,
runId: runId,
status: 'pending',
retries: 5,
};
processAirtableUpdates();
processGoogleSheetUpdates();
} catch (err: any) {
logger.log('error', `Failed to update integrations for run: ${runId}: ${err.message}`);
}
}
@@ -173,10 +215,12 @@ async function checkAndProcessQueuedRun(userId: string, browserId: string): Prom
* Modified processRunExecution function - only add browser reset
*/
async function processRunExecution(job: Job<ExecuteRunData>) {
try {
const data = job.data;
logger.log('info', `Processing run execution job for runId: ${data.runId}, browserId: ${data.browserId}`);
const BROWSER_INIT_TIMEOUT = 30000;
const data = job.data;
logger.log('info', `Processing run execution job for runId: ${data.runId}, browserId: ${data.browserId}`);
try {
// Find the run
const run = await Run.findOne({ where: { runId: data.runId } });
if (!run) {
@@ -190,48 +234,56 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
}
const plainRun = run.toJSON();
const browserId = data.browserId || plainRun.browserId;
// Find the recording
const recording = await Robot.findOne({ where: { 'recording_meta.id': plainRun.robotMetaId }, raw: true });
if (!recording) {
logger.log('error', `Recording for run ${data.runId} not found`);
const currentRun = await Run.findOne({ where: { runId: data.runId } });
if (currentRun && (currentRun.status !== 'aborted' && currentRun.status !== 'aborting')) {
await run.update({
status: 'failed',
finishedAt: new Date().toLocaleString(),
log: 'Failed: Recording not found',
});
}
// Check for queued runs even if this one failed
await checkAndProcessQueuedRun(data.userId, data.browserId);
return { success: false };
if (!browserId) {
throw new Error(`No browser ID available for run ${data.runId}`);
}
// Get the browser and execute the run
const browser = browserPool.getRemoteBrowser(plainRun.browserId);
let currentPage = browser?.getCurrentPage();
logger.log('info', `Looking for browser ${browserId} for run ${data.runId}`);
let browser = browserPool.getRemoteBrowser(browserId);
const browserWaitStart = Date.now();
if (!browser || !currentPage) {
logger.log('error', `Browser or page not available for run ${data.runId}`);
// Even if this run failed, check for queued runs
await checkAndProcessQueuedRun(data.userId, data.browserId);
return { success: false };
while (!browser && (Date.now() - browserWaitStart) < BROWSER_INIT_TIMEOUT) {
logger.log('debug', `Browser ${browserId} not ready yet, waiting...`);
await new Promise(resolve => setTimeout(resolve, 1000));
browser = browserPool.getRemoteBrowser(browserId);
}
try {
// Reset the browser state before executing this run
await resetBrowserState(browser);
if (!browser) {
throw new Error(`Browser ${browserId} not found in pool after timeout`);
}
logger.log('info', `Browser ${browserId} found and ready for execution`);
try {
// Find the recording
const recording = await Robot.findOne({ where: { 'recording_meta.id': plainRun.robotMetaId }, raw: true });
if (!recording) {
throw new Error(`Recording for run ${data.runId} not found`);
}
const isRunAborted = async (): Promise<boolean> => {
const currentRun = await Run.findOne({ where: { runId: data.runId } });
return currentRun ? (currentRun.status === 'aborted' || currentRun.status === 'aborting') : false;
};
let currentPage = browser.getCurrentPage();
const pageWaitStart = Date.now();
while (!currentPage && (Date.now() - pageWaitStart) < 30000) {
logger.log('debug', `Page not ready for browser ${browserId}, waiting...`);
await new Promise(resolve => setTimeout(resolve, 1000));
currentPage = browser.getCurrentPage();
}
if (!currentPage) {
throw new Error(`No current page available for browser ${browserId} after timeout`);
}
logger.log('info', `Starting workflow execution for run ${data.runId}`);
// Execute the workflow
const workflow = AddGeneratedFlags(recording.recording);
@@ -244,30 +296,27 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
if (await isRunAborted()) {
logger.log('info', `Run ${data.runId} was aborted during execution, not updating status`);
const queuedRunProcessed = await checkAndProcessQueuedRun(data.userId, plainRun.browserId);
if (!queuedRunProcessed) {
await destroyRemoteBrowser(plainRun.browserId, data.userId);
logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`);
}
await destroyRemoteBrowser(plainRun.browserId, data.userId);
return { success: true };
}
logger.log('info', `Workflow execution completed for run ${data.runId}`);
const binaryOutputService = new BinaryOutputService('maxun-run-screenshots');
const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput);
if (await isRunAborted()) {
logger.log('info', `Run ${data.runId} was aborted while processing results, not updating status`);
return { success: true };
}
const categorizedOutput = {
scrapeSchema: interpretationInfo.scrapeSchemaOutput || {},
scrapeList: interpretationInfo.scrapeListOutput || {}
};
if (await isRunAborted()) {
logger.log('info', `Run ${data.runId} was aborted while processing results, not updating status`);
return { success: true };
}
await run.update({
...run,
status: 'success',
@@ -282,6 +331,7 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
});
// Track extraction metrics
let totalDataPointsExtracted = 0;
let totalSchemaItemsExtracted = 0;
let totalListItemsExtracted = 0;
let extractedScreenshotsCount = 0;
@@ -289,23 +339,35 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
if (categorizedOutput.scrapeSchema) {
Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => {
if (Array.isArray(schemaResult)) {
schemaResult.forEach(obj => {
if (obj && typeof obj === 'object') {
totalDataPointsExtracted += Object.keys(obj).length;
}
});
totalSchemaItemsExtracted += schemaResult.length;
} else if (schemaResult && typeof schemaResult === 'object') {
totalDataPointsExtracted += Object.keys(schemaResult).length;
totalSchemaItemsExtracted += 1;
}
});
}
if (categorizedOutput.scrapeList) {
Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => {
if (Array.isArray(listResult)) {
listResult.forEach(obj => {
if (obj && typeof obj === 'object') {
totalDataPointsExtracted += Object.keys(obj).length;
}
});
totalListItemsExtracted += listResult.length;
}
});
}
if (uploadedBinaryOutput) {
extractedScreenshotsCount = Object.keys(uploadedBinaryOutput).length;
totalDataPointsExtracted += extractedScreenshotsCount;
}
const totalRowsExtracted = totalSchemaItemsExtracted + totalListItemsExtracted;
@@ -314,7 +376,8 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
console.log(`Extracted List Items Count: ${totalListItemsExtracted}`);
console.log(`Extracted Screenshots Count: ${extractedScreenshotsCount}`);
console.log(`Total Rows Extracted: ${totalRowsExtracted}`);
console.log(`Total Data Points Extracted: ${totalDataPointsExtracted}`);
// Capture metrics
capture(
'maxun-oss-run-created-manual',
@@ -330,103 +393,244 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
}
);
// Schedule updates for Google Sheets and Airtable
// Trigger webhooks for run completion
const webhookPayload = {
robot_id: plainRun.robotMetaId,
run_id: data.runId,
robot_name: recording.recording_meta.name,
status: 'success',
started_at: plainRun.startedAt,
finished_at: new Date().toLocaleString(),
extracted_data: {
captured_texts: Object.values(categorizedOutput.scrapeSchema).flat() || [],
captured_lists: categorizedOutput.scrapeList,
total_rows: totalRowsExtracted,
captured_texts_count: totalSchemaItemsExtracted,
captured_lists_count: totalListItemsExtracted,
screenshots_count: extractedScreenshotsCount,
total_data_points_extracted: totalDataPointsExtracted,
},
metadata: {
browser_id: plainRun.browserId,
user_id: data.userId,
}
};
try {
googleSheetUpdateTasks[plainRun.runId] = {
robotId: plainRun.robotMetaId,
runId: plainRun.runId,
status: 'pending',
retries: 5,
};
airtableUpdateTasks[plainRun.runId] = {
robotId: plainRun.robotMetaId,
runId: plainRun.runId,
status: 'pending',
retries: 5,
};
processAirtableUpdates();
processGoogleSheetUpdates();
} catch (err: any) {
logger.log('error', `Failed to update Google Sheet for run: ${plainRun.runId}: ${err.message}`);
await sendWebhook(plainRun.robotMetaId, 'run_completed', webhookPayload);
logger.log('info', `Webhooks sent successfully for completed run ${data.runId}`);
} catch (webhookError: any) {
logger.log('error', `Failed to send webhooks for run ${data.runId}: ${webhookError.message}`);
}
serverIo.of(plainRun.browserId).emit('run-completed', {
// Schedule updates for Google Sheets and Airtable
await triggerIntegrationUpdates(plainRun.runId, plainRun.robotMetaId);
const completionData = {
runId: data.runId,
robotMetaId: plainRun.robotMetaId,
robotName: recording.recording_meta.name,
status: 'success',
finishedAt: new Date().toLocaleString()
});
// Check for and process queued runs before destroying the browser
const queuedRunProcessed = await checkAndProcessQueuedRun(data.userId, plainRun.browserId);
// Only destroy the browser if no queued run was found
if (!queuedRunProcessed) {
await destroyRemoteBrowser(plainRun.browserId, data.userId);
logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`);
}
};
serverIo.of(browserId).emit('run-completed', completionData);
serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-completed', completionData);
await destroyRemoteBrowser(browserId, data.userId);
logger.log('info', `Browser ${browserId} destroyed after successful run ${data.runId}`);
return { success: true };
} catch (executionError: any) {
logger.log('error', `Run execution failed for run ${data.runId}: ${executionError.message}`);
const currentRun = await Run.findOne({ where: { runId: data.runId } });
if (currentRun && (currentRun.status !== 'aborted' && currentRun.status !== 'aborting')) {
await run.update({
let partialDataExtracted = false;
let partialData: any = null;
let partialUpdateData: any = {
status: 'failed',
finishedAt: new Date().toLocaleString(),
log: `Failed: ${executionError.message}`,
};
try {
if (browser && browser.interpreter) {
const hasSchemaData = (browser.interpreter.serializableDataByType?.scrapeSchema ?? []).length > 0;
const hasListData = (browser.interpreter.serializableDataByType?.scrapeList ?? []).length > 0;
const hasBinaryData = (browser.interpreter.binaryData ?? []).length > 0;
if (hasSchemaData || hasListData || hasBinaryData) {
logger.log('info', `Extracting partial data from failed run ${data.runId}`);
partialData = await extractAndProcessScrapedData(browser, run);
partialUpdateData.serializableOutput = {
scrapeSchema: Object.values(partialData.categorizedOutput.scrapeSchema),
scrapeList: Object.values(partialData.categorizedOutput.scrapeList),
};
partialUpdateData.binaryOutput = partialData.uploadedBinaryOutput;
partialDataExtracted = true;
logger.log('info', `Partial data extracted for failed run ${data.runId}: ${partialData.totalDataPointsExtracted} data points`);
await triggerIntegrationUpdates(plainRun.runId, plainRun.robotMetaId);
}
}
} catch (partialDataError: any) {
logger.log('warn', `Failed to extract partial data for run ${data.runId}: ${partialDataError.message}`);
}
await run.update(partialUpdateData);
try {
const recording = await Robot.findOne({ where: { 'recording_meta.id': run.robotMetaId }, raw: true });
const failureData = {
runId: data.runId,
robotMetaId: plainRun.robotMetaId,
robotName: recording ? recording.recording_meta.name : 'Unknown Robot',
status: 'failed',
finishedAt: new Date().toLocaleString(),
log: `Failed: ${executionError.message}`,
});
// Capture failure metrics
capture(
'maxun-oss-run-created-manual',
{
runId: data.runId,
user_id: data.userId,
created_at: new Date().toISOString(),
status: 'failed',
error_message: executionError.message,
}
);
} else {
logger.log('info', `Run ${data.runId} was aborted, not updating status to failed`);
hasPartialData: partialDataExtracted
};
serverIo.of(browserId).emit('run-completed', failureData);
serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-completed', failureData);
} catch (emitError: any) {
logger.log('warn', `Failed to emit failure event: ${emitError.message}`);
}
// Check for queued runs before destroying the browser
const queuedRunProcessed = await checkAndProcessQueuedRun(data.userId, plainRun.browserId);
// Only destroy the browser if no queued run was found
if (!queuedRunProcessed) {
try {
await destroyRemoteBrowser(plainRun.browserId, data.userId);
logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`);
} catch (cleanupError: any) {
logger.log('warn', `Failed to clean up browser for failed run ${data.runId}: ${cleanupError.message}`);
const recording = await Robot.findOne({ where: { 'recording_meta.id': run.robotMetaId }, raw: true });
const failedWebhookPayload = {
robot_id: plainRun.robotMetaId,
run_id: data.runId,
robot_name: recording ? recording.recording_meta.name : 'Unknown Robot',
status: 'failed',
started_at: plainRun.startedAt,
finished_at: new Date().toLocaleString(),
error: {
message: executionError.message,
stack: executionError.stack,
type: 'ExecutionError',
},
partial_data_extracted: partialDataExtracted,
extracted_data: partialDataExtracted ? {
captured_texts: Object.values(partialUpdateData.serializableOutput?.scrapeSchema || []).flat() || [],
captured_lists: partialUpdateData.serializableOutput?.scrapeList || {},
total_data_points_extracted: partialData?.totalDataPointsExtracted || 0,
captured_texts_count: partialData?.totalSchemaItemsExtracted || 0,
captured_lists_count: partialData?.totalListItemsExtracted || 0,
screenshots_count: partialData?.extractedScreenshotsCount || 0
} : null,
metadata: {
browser_id: plainRun.browserId,
user_id: data.userId,
}
};
try {
await sendWebhook(plainRun.robotMetaId, 'run_failed', failedWebhookPayload);
logger.log('info', `Failure webhooks sent successfully for run ${data.runId}`);
} catch (webhookError: any) {
logger.log('error', `Failed to send failure webhooks for run ${data.runId}: ${webhookError.message}`);
}
return { success: false };
try {
const failureSocketData = {
runId: data.runId,
robotMetaId: run.robotMetaId,
robotName: recording ? recording.recording_meta.name : 'Unknown Robot',
status: 'failed',
finishedAt: new Date().toLocaleString()
};
serverIo.of(run.browserId).emit('run-completed', failureSocketData);
serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-completed', failureSocketData);
} catch (socketError: any) {
logger.log('warn', `Failed to emit failure event in main catch: ${socketError.message}`);
}
capture('maxun-oss-run-created-manual', {
runId: data.runId,
user_id: data.userId,
created_at: new Date().toISOString(),
status: 'failed',
error_message: executionError.message,
partial_data_extracted: partialDataExtracted,
totalRowsExtracted: partialData?.totalSchemaItemsExtracted + partialData?.totalListItemsExtracted + partialData?.extractedScreenshotsCount || 0,
});
await destroyRemoteBrowser(browserId, data.userId);
logger.log('info', `Browser ${browserId} destroyed after failed run`);
return { success: false, partialDataExtracted };
}
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.log('error', `Failed to process run execution job: ${errorMessage}`);
try {
const run = await Run.findOne({ where: { runId: data.runId }});
if (run) {
await run.update({
status: 'failed',
finishedAt: new Date().toLocaleString(),
log: `Failed: ${errorMessage}`,
});
const recording = await Robot.findOne({ where: { 'recording_meta.id': run.robotMetaId }, raw: true });
const failedWebhookPayload = {
robot_id: run.robotMetaId,
run_id: data.runId,
robot_name: recording ? recording.recording_meta.name : 'Unknown Robot',
status: 'failed',
started_at: run.startedAt,
finished_at: new Date().toLocaleString(),
error: {
message: errorMessage,
},
metadata: {
browser_id: run.browserId,
user_id: data.userId,
}
};
try {
await sendWebhook(run.robotMetaId, 'run_failed', failedWebhookPayload);
logger.log('info', `Failure webhooks sent successfully for run ${data.runId}`);
} catch (webhookError: any) {
logger.log('error', `Failed to send failure webhooks for run ${data.runId}: ${webhookError.message}`);
}
try {
const failureSocketData = {
runId: data.runId,
robotMetaId: run.robotMetaId,
robotName: recording ? recording.recording_meta.name : 'Unknown Robot',
status: 'failed',
finishedAt: new Date().toLocaleString()
};
serverIo.of(run.browserId).emit('run-completed', failureSocketData);
serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-completed', failureSocketData);
} catch (socketError: any) {
logger.log('warn', `Failed to emit failure event in main catch: ${socketError.message}`);
}
}
} catch (updateError: any) {
logger.log('error', `Failed to update run status: ${updateError.message}`);
}
return { success: false };
}
}
async function abortRun(runId: string, userId: string): Promise<boolean> {
try {
const run = await Run.findOne({
where: {
runId: runId,
runByUserId: userId
}
});
const run = await Run.findOne({ where: { runId: runId } });
if (!run) {
logger.log('warn', `Run ${runId} not found or does not belong to user ${userId}`);
@@ -477,32 +681,9 @@ async function abortRun(runId: string, userId: string): Promise<boolean> {
}
let currentLog = 'Run aborted by user';
let categorizedOutput = {
scrapeSchema: {},
scrapeList: {},
};
let binaryOutput: Record<string, any> = {};
try {
if (browser.interpreter) {
if (browser.interpreter.debugMessages) {
currentLog = browser.interpreter.debugMessages.join('\n') || currentLog;
}
if (browser.interpreter.serializableDataByType) {
categorizedOutput = {
scrapeSchema: collectDataByType(browser.interpreter.serializableDataByType.scrapeSchema || []),
scrapeList: collectDataByType(browser.interpreter.serializableDataByType.scrapeList || []),
};
}
if (browser.interpreter.binaryData) {
binaryOutput = collectBinaryData(browser.interpreter.binaryData);
}
}
} catch (interpreterError) {
logger.log('warn', `Error collecting data from interpreter: ${interpreterError}`);
}
const extractedData = await extractAndProcessScrapedData(browser, run);
console.log(`Total Data Points Extracted in aborted run: ${extractedData.totalDataPointsExtracted}`);
await run.update({
status: 'aborted',
@@ -510,12 +691,16 @@ async function abortRun(runId: string, userId: string): Promise<boolean> {
browserId: plainRun.browserId,
log: currentLog,
serializableOutput: {
scrapeSchema: Object.values(categorizedOutput.scrapeSchema),
scrapeList: Object.values(categorizedOutput.scrapeList),
scrapeSchema: Object.values(extractedData.categorizedOutput.scrapeSchema),
scrapeList: Object.values(extractedData.categorizedOutput.scrapeList),
},
binaryOutput,
binaryOutput: extractedData.uploadedBinaryOutput,
});
if (extractedData.totalDataPointsExtracted > 0) {
await triggerIntegrationUpdates(runId, plainRun.robotMetaId);
}
try {
serverIo.of(plainRun.browserId).emit('run-aborted', {
runId,
@@ -527,22 +712,13 @@ async function abortRun(runId: string, userId: string): Promise<boolean> {
logger.log('warn', `Failed to emit run-aborted event: ${socketError}`);
}
let queuedRunProcessed = false;
try {
queuedRunProcessed = await checkAndProcessQueuedRun(userId, plainRun.browserId);
} catch (queueError) {
logger.log('warn', `Error checking queued runs: ${queueError}`);
}
if (!queuedRunProcessed) {
try {
await new Promise(resolve => setTimeout(resolve, 500));
await destroyRemoteBrowser(plainRun.browserId, userId);
logger.log('info', `Browser ${plainRun.browserId} destroyed successfully after abort`);
} catch (cleanupError) {
logger.log('warn', `Failed to clean up browser for aborted run ${runId}: ${cleanupError}`);
}
await new Promise(resolve => setTimeout(resolve, 500));
await destroyRemoteBrowser(plainRun.browserId, userId);
logger.log('info', `Browser ${plainRun.browserId} destroyed successfully after abort`);
} catch (cleanupError) {
logger.log('warn', `Failed to clean up browser for aborted run ${runId}: ${cleanupError}`);
}
return true;
@@ -553,30 +729,6 @@ async function abortRun(runId: string, userId: string): Promise<boolean> {
}
}
/**
* Helper function to collect data from arrays into indexed objects
* @param dataArray Array of data to be transformed into an object with indexed keys
* @returns Object with indexed keys
*/
function collectDataByType(dataArray: any[]): Record<string, any> {
return dataArray.reduce((result: Record<string, any>, item, index) => {
result[`item-${index}`] = item;
return result;
}, {});
}
/**
* Helper function to collect binary data (like screenshots)
* @param binaryDataArray Array of binary data objects to be transformed
* @returns Object with indexed keys
*/
function collectBinaryData(binaryDataArray: { mimetype: string, data: string, type?: string }[]): Record<string, any> {
return binaryDataArray.reduce((result: Record<string, any>, item, index) => {
result[`item-${index}`] = item;
return result;
}, {});
}
async function registerRunExecutionWorker() {
try {
const registeredUserQueues = new Map();

View File

@@ -33,6 +33,14 @@ router.post("/register", async (req, res) => {
});
}
const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
if (!emailRegex.test(email)) {
return res.status(400).json({
error: "VALIDATION_ERROR",
code: "register.validation.invalid_email_format"
});
}
if (!password || password.length < 6) {
return res.status(400).json({
error: "VALIDATION_ERROR",
@@ -74,16 +82,16 @@ router.post("/register", async (req, res) => {
res.cookie("token", token, {
httpOnly: true,
});
capture("maxun-oss-user-registered", {
email: user.email,
userId: user.id,
registeredAt: new Date().toISOString(),
});
console.log(`User registered`);
res.json(user);
} catch (error: any) {
console.log(`Could not register user - ${error}`);
return res.status(500).json({
@@ -150,23 +158,23 @@ router.post("/login", async (req, res) => {
});
router.get("/logout", async (req, res) => {
try {
res.clearCookie("token");
return res.status(200).json({
ok: true,
message: "Logged out successfully",
code: "success"
});
} catch (error) {
console.error('Logout error:', error);
return res.status(500).json({
ok: false,
message: "Error during logout",
code: "server",
error: process.env.NODE_ENV === 'development' ? error : undefined
});
}
try {
res.clearCookie("token");
return res.status(200).json({
ok: true,
message: "Logged out successfully",
code: "success"
});
} catch (error) {
console.error('Logout error:', error);
return res.status(500).json({
ok: false,
message: "Error during logout",
code: "server",
error: process.env.NODE_ENV === 'development' ? error : undefined
});
}
}
);
router.get(
@@ -678,7 +686,7 @@ router.get("/airtable", requireSignIn, (req: Request, res) => {
router.get("/airtable/callback", requireSignIn, async (req: Request, res) => {
const authenticatedReq = req as AuthenticatedRequest;
const baseUrl = process.env.PUBLIC_URL || "http://localhost:5173";
try {
const { code, state, error } = authenticatedReq.query;
@@ -694,7 +702,7 @@ router.get("/airtable/callback", requireSignIn, async (req: Request, res) => {
// Verify session data
if (!authenticatedReq.session?.code_verifier || authenticatedReq.session.robotId !== state.toString()) {
return res.status(400).json({
return res.status(400).json({
message: "Session expired - please restart the OAuth flow"
});
}
@@ -708,7 +716,7 @@ router.get("/airtable/callback", requireSignIn, async (req: Request, res) => {
body: new URLSearchParams({
grant_type: "authorization_code",
code: code.toString(),
client_id: process.env.AIRTABLE_CLIENT_ID!,
client_id: process.env.AIRTABLE_CLIENT_ID!,
redirect_uri: process.env.AIRTABLE_REDIRECT_URI!,
code_verifier: authenticatedReq.session.code_verifier
}),
@@ -811,7 +819,7 @@ router.get("/airtable/bases", requireSignIn, async (req: Request, res) => {
// Update robot with selected base
router.post("/airtable/update", requireSignIn, async (req: Request, res) => {
const authenticatedReq = req as AuthenticatedRequest;
const { baseId, robotId , baseName, tableName, tableId} = req.body;
const { baseId, robotId, baseName, tableName, tableId } = req.body;
if (!baseId || !robotId) {
return res.status(400).json({ message: "Base ID and Robot ID are required" });

View File

@@ -4,6 +4,7 @@ import { router as storage } from './storage';
import { router as auth } from './auth';
import { router as integration } from './integration';
import { router as proxy } from './proxy';
import { router as webhook } from './webhook';
export {
record,
@@ -11,5 +12,6 @@ export {
storage,
auth,
integration,
proxy
proxy,
webhook
};

View File

@@ -11,6 +11,7 @@ import {
getRemoteBrowserCurrentTabs,
getActiveBrowserIdByState,
destroyRemoteBrowser,
canCreateBrowserInState,
} from '../browser-management/controller';
import { chromium } from 'playwright-extra';
import stealthPlugin from 'puppeteer-extra-plugin-stealth';
@@ -181,6 +182,18 @@ router.get('/active', requireSignIn, (req: AuthenticatedRequest, res) => {
return res.send(id);
});
/**
* GET endpoint for checking if the user can create a new remote browser.
*/
router.get('/can-create/:state', requireSignIn, (req: AuthenticatedRequest, res) => {
if (!req.user) {
return res.status(401).send('User not authenticated');
}
const state = req.params.state as "recording" | "run";
const canCreate = canCreateBrowserInState(req.user.id, state);
return res.json({ canCreate });
});
/**
* GET endpoint for getting the current url of the active remote browser.
*/

View File

@@ -1,10 +1,10 @@
import { Router } from 'express';
import logger from "../logger";
import { createRemoteBrowserForRun, getActiveBrowserIdByState } from "../browser-management/controller";
import { createRemoteBrowserForRun, destroyRemoteBrowser, getActiveBrowserIdByState } from "../browser-management/controller";
import { chromium } from 'playwright-extra';
import stealthPlugin from 'puppeteer-extra-plugin-stealth';
import { browserPool } from "../server";
import { uuid } from "uuidv4";
import { v4 as uuid } from "uuid";
import moment from 'moment-timezone';
import cron from 'node-cron';
import { getDecryptedProxyConfig } from './proxy';
@@ -23,7 +23,7 @@ chromium.use(stealthPlugin());
export const router = Router();
export const processWorkflowActions = async (workflow: any[], checkLimit: boolean = false): Promise<any[]> => {
const processedWorkflow = JSON.parse(JSON.stringify(workflow));
const processedWorkflow = JSON.parse(JSON.stringify(workflow));
processedWorkflow.forEach((pair: any) => {
pair.what.forEach((action: any) => {
@@ -108,52 +108,52 @@ router.get('/recordings/:id', requireSignIn, async (req, res) => {
router.get(('/recordings/:id/runs'), requireSignIn, async (req, res) => {
try {
const runs = await Run.findAll({
where: {
robotMetaId: req.params.id
},
raw: true
where: {
robotMetaId: req.params.id
},
raw: true
});
const formattedRuns = runs.map(formatRunResponse);
const response = {
statusCode: 200,
messageCode: "success",
runs: {
statusCode: 200,
messageCode: "success",
runs: {
totalCount: formattedRuns.length,
items: formattedRuns,
},
},
};
res.status(200).json(response);
} catch (error) {
} catch (error) {
console.error("Error fetching runs:", error);
res.status(500).json({
statusCode: 500,
messageCode: "error",
message: "Failed to retrieve runs",
statusCode: 500,
messageCode: "error",
message: "Failed to retrieve runs",
});
}
}
})
function formatRunResponse(run: any) {
const formattedRun = {
id: run.id,
status: run.status,
name: run.name,
robotId: run.robotMetaId, // Renaming robotMetaId to robotId
startedAt: run.startedAt,
finishedAt: run.finishedAt,
runId: run.runId,
runByUserId: run.runByUserId,
runByScheduleId: run.runByScheduleId,
runByAPI: run.runByAPI,
data: {},
screenshot: null,
id: run.id,
status: run.status,
name: run.name,
robotId: run.robotMetaId, // Renaming robotMetaId to robotId
startedAt: run.startedAt,
finishedAt: run.finishedAt,
runId: run.runId,
runByUserId: run.runByUserId,
runByScheduleId: run.runByScheduleId,
runByAPI: run.runByAPI,
data: {},
screenshot: null,
};
if (run.serializableOutput && run.serializableOutput['item-0']) {
formattedRun.data = run.serializableOutput['item-0'];
formattedRun.data = run.serializableOutput['item-0'];
} else if (run.binaryOutput && run.binaryOutput['item-0']) {
formattedRun.screenshot = run.binaryOutput['item-0'];
formattedRun.screenshot = run.binaryOutput['item-0'];
}
return formattedRun;
@@ -170,81 +170,81 @@ interface Credentials {
function handleWorkflowActions(workflow: any[], credentials: Credentials) {
return workflow.map(step => {
if (!step.what) return step;
if (!step.what) return step;
const newWhat: any[] = [];
const processedSelectors = new Set<string>();
for (let i = 0; i < step.what.length; i++) {
const action = step.what[i];
if (!action?.action || !action?.args?.[0]) {
newWhat.push(action);
continue;
}
const newWhat: any[] = [];
const processedSelectors = new Set<string>();
const selector = action.args[0];
const credential = credentials[selector];
for (let i = 0; i < step.what.length; i++) {
const action = step.what[i];
if (!credential) {
newWhat.push(action);
continue;
}
if (action.action === 'click') {
newWhat.push(action);
if (!processedSelectors.has(selector) &&
i + 1 < step.what.length &&
(step.what[i + 1].action === 'type' || step.what[i + 1].action === 'press')) {
newWhat.push({
action: 'type',
args: [selector, encrypt(credential.value), credential.type]
});
newWhat.push({
action: 'waitForLoadState',
args: ['networkidle']
});
processedSelectors.add(selector);
while (i + 1 < step.what.length &&
(step.what[i + 1].action === 'type' ||
step.what[i + 1].action === 'press' ||
step.what[i + 1].action === 'waitForLoadState')) {
i++;
}
}
} else if ((action.action === 'type' || action.action === 'press') &&
!processedSelectors.has(selector)) {
newWhat.push({
action: 'type',
args: [selector, encrypt(credential.value), credential.type]
});
newWhat.push({
action: 'waitForLoadState',
args: ['networkidle']
});
processedSelectors.add(selector);
// Skip subsequent type/press/waitForLoadState actions for this selector
while (i + 1 < step.what.length &&
(step.what[i + 1].action === 'type' ||
step.what[i + 1].action === 'press' ||
step.what[i + 1].action === 'waitForLoadState')) {
i++;
}
}
if (!action?.action || !action?.args?.[0]) {
newWhat.push(action);
continue;
}
return {
...step,
what: newWhat
};
const selector = action.args[0];
const credential = credentials[selector];
if (!credential) {
newWhat.push(action);
continue;
}
if (action.action === 'click') {
newWhat.push(action);
if (!processedSelectors.has(selector) &&
i + 1 < step.what.length &&
(step.what[i + 1].action === 'type' || step.what[i + 1].action === 'press')) {
newWhat.push({
action: 'type',
args: [selector, encrypt(credential.value), credential.type]
});
newWhat.push({
action: 'waitForLoadState',
args: ['networkidle']
});
processedSelectors.add(selector);
while (i + 1 < step.what.length &&
(step.what[i + 1].action === 'type' ||
step.what[i + 1].action === 'press' ||
step.what[i + 1].action === 'waitForLoadState')) {
i++;
}
}
} else if ((action.action === 'type' || action.action === 'press') &&
!processedSelectors.has(selector)) {
newWhat.push({
action: 'type',
args: [selector, encrypt(credential.value), credential.type]
});
newWhat.push({
action: 'waitForLoadState',
args: ['networkidle']
});
processedSelectors.add(selector);
// Skip subsequent type/press/waitForLoadState actions for this selector
while (i + 1 < step.what.length &&
(step.what[i + 1].action === 'type' ||
step.what[i + 1].action === 'press' ||
step.what[i + 1].action === 'waitForLoadState')) {
i++;
}
}
}
return {
...step,
what: newWhat
};
});
}
@@ -275,7 +275,7 @@ router.put('/recordings/:id', requireSignIn, async (req: AuthenticatedRequest, r
if (targetUrl) {
const updatedWorkflow = [...robot.recording.workflow];
for (let i = updatedWorkflow.length - 1; i >= 0; i--) {
const step = updatedWorkflow[i];
for (let j = 0; j < step.what.length; j++) {
@@ -286,7 +286,7 @@ router.put('/recordings/:id', requireSignIn, async (req: AuthenticatedRequest, r
if (step.where?.url && step.where.url !== "about:blank") {
step.where.url = targetUrl;
}
robot.set('recording', { ...robot.recording, workflow: updatedWorkflow });
robot.changed('recording', true);
i = -1;
@@ -307,16 +307,16 @@ router.put('/recordings/:id', requireSignIn, async (req: AuthenticatedRequest, r
if (limits && Array.isArray(limits) && limits.length > 0) {
for (const limitInfo of limits) {
const { pairIndex, actionIndex, argIndex, limit } = limitInfo;
const pair = workflow[pairIndex];
if (!pair || !pair.what) continue;
const action = pair.what[actionIndex];
if (!action || !action.args) continue;
const arg = action.args[argIndex];
if (!arg || typeof arg !== 'object') continue;
(arg as { limit: number }).limit = limit;
}
}
@@ -384,7 +384,7 @@ router.post('/recordings/:id/duplicate', requireSignIn, async (req: Authenticate
step.what.forEach((action) => {
if (action.action === "goto" && action.args?.length) {
action.args[0] = targetUrl;
action.args[0] = targetUrl;
}
});
@@ -394,22 +394,22 @@ router.post('/recordings/:id/duplicate', requireSignIn, async (req: Authenticate
const currentTimestamp = new Date().toLocaleString();
const newRobot = await Robot.create({
id: uuid(),
userId: originalRobot.userId,
id: uuid(),
userId: originalRobot.userId,
recording_meta: {
...originalRobot.recording_meta,
id: uuid(),
name: `${originalRobot.recording_meta.name} (${lastWord})`,
createdAt: currentTimestamp,
updatedAt: currentTimestamp,
},
recording: { ...originalRobot.recording, workflow },
google_sheet_email: null,
createdAt: currentTimestamp,
updatedAt: currentTimestamp,
},
recording: { ...originalRobot.recording, workflow },
google_sheet_email: null,
google_sheet_name: null,
google_sheet_id: null,
google_access_token: null,
google_refresh_token: null,
schedule: null,
schedule: null,
});
logger.log('info', `Robot with ID ${id} duplicated successfully as ${newRobot.id}.`);
@@ -517,98 +517,124 @@ router.put('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) =>
return res.status(401).send({ error: 'Unauthorized' });
}
const proxyConfig = await getDecryptedProxyConfig(req.user.id);
let proxyOptions: any = {};
if (proxyConfig.proxy_url) {
proxyOptions = {
server: proxyConfig.proxy_url,
...(proxyConfig.proxy_username && proxyConfig.proxy_password && {
username: proxyConfig.proxy_username,
password: proxyConfig.proxy_password,
}),
};
}
console.log(`Proxy config for run: ${JSON.stringify(proxyOptions)}`);
// Generate runId first
const runId = uuid();
// Check if user has reached browser limit
const userBrowserIds = browserPool.getAllBrowserIdsForUser(req.user.id);
const canCreateBrowser = userBrowserIds.length < 2;
if (canCreateBrowser) {
// User has available browser slots, create it directly
const id = createRemoteBrowserForRun(req.user.id);
const canCreateBrowser = await browserPool.hasAvailableBrowserSlots(req.user.id, "run");
const run = await Run.create({
status: 'running',
if (canCreateBrowser) {
let browserId: string;
try {
browserId = await createRemoteBrowserForRun(req.user.id);
if (!browserId || browserId.trim() === '') {
throw new Error('Failed to generate valid browser ID');
}
logger.log('info', `Created browser ${browserId} for run ${runId}`);
} catch (browserError: any) {
logger.log('error', `Failed to create browser: ${browserError.message}`);
return res.status(500).send({ error: 'Failed to create browser instance' });
}
try {
await Run.create({
status: 'running',
name: recording.recording_meta.name,
robotId: recording.id,
robotMetaId: recording.recording_meta.id,
startedAt: new Date().toLocaleString(),
finishedAt: '',
browserId: browserId,
interpreterSettings: req.body,
log: '',
runId,
runByUserId: req.user.id,
serializableOutput: {},
binaryOutput: {},
});
logger.log('info', `Created run ${runId} with browser ${browserId}`);
} catch (dbError: any) {
logger.log('error', `Database error creating run: ${dbError.message}`);
try {
await destroyRemoteBrowser(browserId, req.user.id);
} catch (cleanupError: any) {
logger.log('warn', `Failed to cleanup browser after run creation failure: ${cleanupError.message}`);
}
return res.status(500).send({ error: 'Failed to create run record' });
}
try {
const userQueueName = `execute-run-user-${req.user.id}`;
await pgBoss.createQueue(userQueueName);
const jobId = await pgBoss.send(userQueueName, {
userId: req.user.id,
runId: runId,
browserId: browserId,
});
logger.log('info', `Queued run execution job with ID: ${jobId} for run: ${runId}`);
} catch (queueError: any) {
logger.log('error', `Failed to queue run execution: ${queueError.message}`);
try {
await Run.update({
status: 'failed',
finishedAt: new Date().toLocaleString(),
log: 'Failed to queue execution job'
}, { where: { runId: runId } });
await destroyRemoteBrowser(browserId, req.user.id);
} catch (cleanupError: any) {
logger.log('warn', `Failed to cleanup after queue error: ${cleanupError.message}`);
}
return res.status(503).send({ error: 'Unable to queue run, please try again later' });
}
return res.send({
browserId: browserId,
runId: runId,
robotMetaId: recording.recording_meta.id,
queued: false
});
} else {
const browserId = uuid();
await Run.create({
status: 'queued',
name: recording.recording_meta.name,
robotId: recording.id,
robotMetaId: recording.recording_meta.id,
startedAt: new Date().toLocaleString(),
finishedAt: '',
browserId: id,
browserId,
interpreterSettings: req.body,
log: '',
log: 'Run queued - waiting for available browser slot',
runId,
runByUserId: req.user.id,
serializableOutput: {},
binaryOutput: {},
});
const plainRun = run.toJSON();
return res.send({
browserId: id,
runId: plainRun.runId,
browserId: browserId,
runId: runId,
robotMetaId: recording.recording_meta.id,
queued: false
queued: true
});
} else {
const browserId = getActiveBrowserIdByState(req.user.id, "run")
if (browserId) {
// User has reached the browser limit, queue the run
try {
// Create the run record with 'queued' status
await Run.create({
status: 'queued',
name: recording.recording_meta.name,
robotId: recording.id,
robotMetaId: recording.recording_meta.id,
startedAt: new Date().toLocaleString(),
finishedAt: '',
browserId: browserId, // Random will be updated later
interpreterSettings: req.body,
log: 'Run queued - waiting for available browser slot',
runId,
runByUserId: req.user.id,
serializableOutput: {},
binaryOutput: {},
});
return res.send({
browserId: browserId,
runId: runId,
robotMetaId: recording.recording_meta.id,
queued: true,
});
} catch (queueError: any) {
logger.log('error', `Failed to queue run job: ${queueError.message}`);
return res.status(503).send({ error: 'Unable to queue run, please try again later' });
}
} else {
logger.log('info', "Browser id does not exist");
return res.send('');
}
}
}
} catch (e) {
const { message } = e as Error;
logger.log('info', `Error while creating a run with robot id: ${req.params.id} - ${message}`);
return res.send('');
logger.log('error', `Error while creating a run with robot id: ${req.params.id} - ${message}`);
return res.status(500).send({ error: 'Internal server error' });
}
});
@@ -664,17 +690,17 @@ router.post('/runs/run/:id', requireSignIn, async (req: AuthenticatedRequest, re
// Queue the execution job
await pgBoss.createQueue(userQueueName);
const jobId = await pgBoss.send(userQueueName, {
userId: req.user.id,
runId: req.params.id,
browserId: plainRun.browserId
});
logger.log('info', `Queued run execution job with ID: ${jobId} for run: ${req.params.id}`);
} catch (queueError: any) {
logger.log('error', `Failed to queue run execution`);
}
} catch (e) {
const { message } = e as Error;
@@ -891,31 +917,128 @@ router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest,
try {
if (!req.user) { return res.status(401).send({ error: 'Unauthorized' }); }
const run = await Run.findOne({ where: {
runId: req.params.id,
runByUserId: req.user.id,
} });
const run = await Run.findOne({ where: { runId: req.params.id } });
if (!run) {
return res.status(404).send(false);
return res.status(404).send({ error: 'Run not found' });
}
const userQueueName = `abort-run-user-${req.user.id}`;
await pgBoss.createQueue(userQueueName);
await pgBoss.send(userQueueName, {
userId: req.user.id,
runId: req.params.id
});
if (!['running', 'queued'].includes(run.status)) {
return res.status(400).send({
error: `Cannot abort run with status: ${run.status}`
});
}
const isQueued = run.status === 'queued';
await run.update({
status: 'aborting'
});
if (isQueued) {
await run.update({
status: 'aborted',
finishedAt: new Date().toLocaleString(),
log: 'Run aborted while queued'
});
return res.send({
success: true,
message: 'Queued run aborted',
isQueued: true
});
}
const userQueueName = `abort-run-user-${req.user.id}`;
await pgBoss.createQueue(userQueueName);
const jobId = await pgBoss.send(userQueueName, {
userId: req.user.id,
runId: req.params.id
});
logger.log('info', `Abort signal sent for run ${req.params.id}, job ID: ${jobId}`);
return res.send({
success: true,
message: 'Abort signal sent',
jobId,
isQueued: false
});
return res.send(true);
} catch (e) {
const { message } = e as Error;
logger.log('info', `Error while aborting run with id: ${req.params.id} - ${message}`);
return res.send(false);
logger.log('error', `Error aborting run ${req.params.id}: ${message}`);
return res.status(500).send({ error: 'Failed to abort run' });
}
});
/**
 * Poll-based dispatcher for runs waiting in the 'queued' state.
 *
 * Picks the oldest queued run and, if the owning user has a free "run"
 * browser slot, provisions a remote browser, flips the run to 'running',
 * and enqueues the actual execution on that user's pg-boss queue.
 * Invoked periodically (setInterval in the server entry point), so it
 * processes at most one queued run per tick.
 *
 * NOTE(review): runs are ordered by startedAt ASC, but elsewhere in this
 * file timestamps are written with toLocaleString() — if startedAt is a
 * locale-formatted string column, lexicographic order may not match
 * chronological order; confirm the column type.
 */
async function processQueuedRuns() {
  try {
    // Oldest queued run first (FIFO).
    const queuedRun = await Run.findOne({
      where: { status: 'queued' },
      order: [['startedAt', 'ASC']]
    });

    if (!queuedRun) return;

    const userId = queuedRun.runByUserId;
    // Respect the per-user browser pool limit for "run" browsers.
    const canCreateBrowser = await browserPool.hasAvailableBrowserSlots(userId, "run");

    if (canCreateBrowser) {
      logger.log('info', `Processing queued run ${queuedRun.runId} for user ${userId}`);

      const recording = await Robot.findOne({
        where: {
          'recording_meta.id': queuedRun.robotMetaId
        },
        raw: true
      });

      if (!recording) {
        // The robot backing this run no longer exists — fail the run permanently.
        await queuedRun.update({
          status: 'failed',
          finishedAt: new Date().toLocaleString(),
          log: 'Recording not found'
        });
        return;
      }

      try {
        const newBrowserId = await createRemoteBrowserForRun(userId);
        logger.log('info', `Created and initialized browser ${newBrowserId} for queued run ${queuedRun.runId}`);

        // Mark the run 'running' before enqueueing so the next poll tick
        // cannot pick it up a second time.
        await queuedRun.update({
          status: 'running',
          browserId: newBrowserId,
          log: 'Browser created and ready for execution'
        });

        const userQueueName = `execute-run-user-${userId}`;
        await pgBoss.createQueue(userQueueName);
        const jobId = await pgBoss.send(userQueueName, {
          userId: userId,
          runId: queuedRun.runId,
          browserId: newBrowserId,
        });

        logger.log('info', `Queued execution for run ${queuedRun.runId} with ready browser ${newBrowserId}, job ID: ${jobId}`);
      } catch (browserError: any) {
        // Browser provisioning failed — surface the reason on the run record.
        logger.log('error', `Failed to create browser for queued run: ${browserError.message}`);
        await queuedRun.update({
          status: 'failed',
          finishedAt: new Date().toLocaleString(),
          log: `Failed to create browser: ${browserError.message}`
        });
      }
    }
  } catch (error: any) {
    logger.log('error', `Error processing queued runs: ${error.message}`);
  }
}

export { processQueuedRuns };

View File

@@ -0,0 +1,493 @@
import { Router, Request, Response } from 'express';
import Robot from '../models/Robot';
import { requireSignIn } from '../middlewares/auth';
import axios from 'axios';
import { v4 as uuid } from "uuid";
export const router = Router();
// Shape of the request after requireSignIn has attached the signed-in user.
interface AuthenticatedRequest extends Request {
  user?: { id: string };
}

// One webhook subscription as stored on the Robot model's `webhooks` column.
interface WebhookConfig {
  id: string;                    // unique id for this entry (uuid v4 when server-generated)
  url: string;                   // destination URL the payload is POSTed to
  events: string[];              // event types this webhook subscribes to
  active: boolean;               // inactive webhooks are skipped by sendWebhook
  createdAt: string;             // ISO-8601 timestamp
  updatedAt: string;             // ISO-8601 timestamp
  lastCalledAt?: string | null;  // ISO-8601 timestamp of the most recent delivery attempt
  retryAttempts?: number;        // max delivery attempts (defaults to 3 at call sites)
  retryDelay?: number;           // base retry delay in seconds (defaults to 5; backoff is exponential)
  timeout?: number;              // per-request timeout in seconds (defaults to 30)
}
/**
 * Stamp the given webhook's lastCalledAt with the current time.
 * Silently no-ops when the robot or its webhook list cannot be found;
 * failures are logged but never propagated to the caller.
 */
const updateWebhookLastCalled = async (robotId: string, webhookId: string): Promise<void> => {
  try {
    const robot = await Robot.findOne({ where: { 'recording_meta.id': robotId } });
    if (!robot || !robot.webhooks) {
      return;
    }

    const stampedAt = new Date().toISOString();
    const nextWebhooks = robot.webhooks.map((entry: WebhookConfig) =>
      entry.id === webhookId ? { ...entry, lastCalledAt: stampedAt } : entry
    );

    await robot.update({ webhooks: nextWebhooks });
  } catch (error) {
    console.error('Error updating webhook lastCalledAt:', error);
  }
};
// Add new webhook
// POST /webhook/add — attach a new webhook configuration to a robot.
// Body: { webhook: WebhookConfig-like object, robotId: string }.
router.post('/add', requireSignIn, async (req: Request, res: Response) => {
  const { webhook, robotId } = req.body;
  const authenticatedReq = req as AuthenticatedRequest;
  try {
    if (!authenticatedReq.user) {
      return res.status(401).json({ ok: false, error: 'Unauthorized' });
    }

    if (!webhook || !robotId) {
      return res.status(400).json({ ok: false, error: 'Webhook configuration and robot ID are required' });
    }

    if (!webhook.url) {
      return res.status(400).json({ ok: false, error: 'Webhook URL is required' });
    }

    // Validate URL format
    try {
      new URL(webhook.url);
    } catch (error) {
      return res.status(400).json({ ok: false, error: 'Invalid webhook URL format' });
    }

    const robot = await Robot.findOne({ where: { 'recording_meta.id': robotId } });

    if (!robot) {
      return res.status(404).json({ ok: false, error: 'Robot not found' });
    }

    const currentWebhooks = robot.webhooks || [];

    // Each robot may have at most one webhook per URL.
    const existingWebhook = currentWebhooks.find((w: WebhookConfig) => w.url === webhook.url);
    if (existingWebhook) {
      return res.status(400).json({ ok: false, error: 'Webhook with this url already exists' });
    }

    // Fill server-side defaults; the client may pre-supply its own id.
    const newWebhook: WebhookConfig = {
      ...webhook,
      id: webhook.id || uuid(),
      createdAt: new Date().toISOString(),
      updatedAt: new Date().toISOString(),
      lastCalledAt: null,
      retryAttempts: webhook.retryAttempts || 3,
      retryDelay: webhook.retryDelay || 5,
      timeout: webhook.timeout || 30,
    };

    const updatedWebhooks = [...currentWebhooks, newWebhook];
    await robot.update({ webhooks: updatedWebhooks });

    res.status(200).json({
      ok: true,
      message: 'Webhook added successfully',
      webhook: newWebhook
    });
  } catch (error: any) {
    console.log(`Could not add webhook - ${error}`);
    res.status(500).json({ ok: false, error: 'Could not add webhook configuration' });
  }
});
// Update existing webhook
// POST /webhook/update — modify an existing webhook in place.
// Body: { webhook: partial WebhookConfig with required `id`, robotId: string }.
router.post('/update', requireSignIn, async (req: Request, res: Response) => {
  const { webhook, robotId } = req.body;
  const authenticatedReq = req as AuthenticatedRequest;
  try {
    if (!authenticatedReq.user) {
      return res.status(401).json({ ok: false, error: 'Unauthorized' });
    }

    if (!webhook || !robotId || !webhook.id) {
      return res.status(400).json({ ok: false, error: 'Webhook configuration, webhook ID, and robot ID are required' });
    }

    // Validate URL format if provided
    if (webhook.url) {
      try {
        new URL(webhook.url);
      } catch (error) {
        return res.status(400).json({ ok: false, error: 'Invalid webhook URL format' });
      }
    }

    const robot = await Robot.findOne({ where: { 'recording_meta.id': robotId } });

    if (!robot) {
      return res.status(404).json({ ok: false, error: 'Robot not found' });
    }

    const currentWebhooks = robot.webhooks || [];
    const webhookIndex = currentWebhooks.findIndex((w: WebhookConfig) => w.id === webhook.id);

    if (webhookIndex === -1) {
      return res.status(404).json({ ok: false, error: 'Webhook not found' });
    }

    // Check for duplicate URLs (excluding current webhook)
    const duplicateUrl = currentWebhooks.find((w: WebhookConfig, index: number) =>
      w.url === webhook.url && index !== webhookIndex
    );
    if (duplicateUrl) {
      return res.status(400).json({ ok: false, error: 'Webhook with this URL already exists' });
    }

    // Merge incoming fields over the stored entry; updatedAt is refreshed,
    // while lastCalledAt is always preserved from the stored record.
    const updatedWebhook: WebhookConfig = {
      ...currentWebhooks[webhookIndex],
      ...webhook,
      updatedAt: new Date().toISOString(),
      lastCalledAt: currentWebhooks[webhookIndex].lastCalledAt
    };

    const updatedWebhooks = [...currentWebhooks];
    updatedWebhooks[webhookIndex] = updatedWebhook;

    await robot.update({ webhooks: updatedWebhooks });

    res.status(200).json({
      ok: true,
      message: 'Webhook updated successfully',
      webhook: updatedWebhook
    });
  } catch (error: any) {
    console.log(`Could not update webhook - ${error}`);
    res.status(500).json({ ok: false, error: 'Could not update webhook configuration' });
  }
});
// Remove webhook
// POST /webhook/remove — detach a single webhook (by id) from a robot.
router.post('/remove', requireSignIn, async (req: Request, res: Response) => {
  const { webhookId, robotId } = req.body;
  const authenticatedReq = req as AuthenticatedRequest;
  try {
    if (!authenticatedReq.user) {
      return res.status(401).json({ ok: false, error: 'Unauthorized' });
    }
    if (!webhookId || !robotId) {
      return res.status(400).json({ ok: false, error: 'Webhook ID and robot ID are required' });
    }

    const robot = await Robot.findOne({ where: { 'recording_meta.id': robotId } });
    if (!robot) {
      return res.status(404).json({ ok: false, error: 'Robot not found' });
    }

    const storedWebhooks: WebhookConfig[] = robot.webhooks || [];
    const targetExists = storedWebhooks.some((entry: WebhookConfig) => entry.id === webhookId);
    if (!targetExists) {
      return res.status(404).json({ ok: false, error: 'Webhook not found' });
    }

    // Persist the list minus the targeted entry.
    const remainingWebhooks = storedWebhooks.filter((entry: WebhookConfig) => entry.id !== webhookId);
    await robot.update({ webhooks: remainingWebhooks });

    res.status(200).json({
      ok: true,
      message: 'Webhook removed successfully'
    });
  } catch (error: any) {
    console.log(`Could not remove webhook - ${error}`);
    res.status(500).json({ ok: false, error: 'Could not remove webhook configuration' });
  }
});
// Get all webhooks for a robot
// GET /webhook/list/:robotId — return every webhook configured on the robot.
router.get('/list/:robotId', requireSignIn, async (req: Request, res: Response) => {
  const { robotId } = req.params;
  const authenticatedReq = req as AuthenticatedRequest;
  try {
    if (!authenticatedReq.user) {
      return res.status(401).json({ ok: false, error: 'Unauthorized' });
    }

    // Fetch only the webhooks column; the full recording is not needed here.
    const robot = await Robot.findOne({
      where: { 'recording_meta.id': robotId },
      attributes: ['webhooks']
    });
    if (!robot) {
      return res.status(404).json({ ok: false, error: 'Robot not found' });
    }

    res.status(200).json({
      ok: true,
      webhooks: robot.webhooks || []
    });
  } catch (error: any) {
    console.log(`Could not retrieve webhooks - ${error}`);
    res.status(500).json({ ok: false, error: 'Could not retrieve webhook configurations' });
  }
});
// Test webhook endpoint
// POST /webhook/test — fire a one-off payload of realistic sample data at the
// webhook URL so the user can verify their endpoint wiring before going live.
// Note: validateStatus accepts any status < 500, so 4xx responses are reported
// as non-success in the 200 response body; only transport errors / 5xx reach
// the catch block.
router.post('/test', requireSignIn, async (req: Request, res: Response) => {
  const { webhook, robotId } = req.body;
  const authenticatedReq = req as AuthenticatedRequest;
  try {
    if (!authenticatedReq.user) {
      return res.status(401).json({ ok: false, error: 'Unauthorized' });
    }

    if (!webhook || !robotId) {
      return res.status(400).json({ ok: false, error: 'Webhook configuration and robot ID are required' });
    }

    const robot = await Robot.findOne({ where: { 'recording_meta.id': robotId } });

    if (!robot) {
      return res.status(404).json({ ok: false, error: 'Robot not found' });
    }

    // Create test payload
    // Static sample data shaped like a real run-finished payload; ids and
    // metrics below are fixed fixtures, not live data.
    const testPayload = {
      event_type: "webhook_test",
      timestamp: new Date().toISOString(),
      webhook_id: webhook.id,
      data: {
        robot_id: robotId,
        run_id: "110c4dae-c39b-4b30-a932-eff1022e4bb0",
        robot_name: robot.recording_meta?.name || "E-commerce Product Scraper",
        status: "test",
        started_at: new Date(Date.now() - 45000).toISOString(),
        finished_at: new Date().toISOString(),
        extracted_data: {
          captured_texts: [
            {
              "Product Name": "MacBook Pro 16-inch M3 Max",
              "Price": "$3,999.00",
              "Rating": "4.8/5 stars",
              "Availability": "In Stock - Ships within 2-3 business days",
              "SKU": "MBPM3-16-1TB-SLV",
              "Description": "The most powerful MacBook Pro ever is here. With the blazing-fast M3 Max chip, pro-level performance has never been more portable."
            }
          ],
          captured_lists: {
            "list_1": [
              {
                "Rank": "1",
                "Product": "MacBook Air M2",
                "Category": "Laptops",
                "Units Sold": "2,847",
                "Revenue": "$2,847,000"
              },
              {
                "Rank": "2",
                "Product": "iPhone 15",
                "Category": "Smartphones",
                "Units Sold": "1,923",
                "Revenue": "$1,923,000"
              },
              {
                "Rank": "3",
                "Product": "iPad Pro 12.9",
                "Category": "Tablets",
                "Units Sold": "1,456",
                "Revenue": "$1,456,000"
              }
            ],
            "list_0": [
              {
                "Customer": "Sarah M.",
                "Rating": "5 stars",
                "Review": "Absolutely love my new MacBook! The battery life is incredible and the performance is outstanding.",
                "Date": "2024-12-15",
                "Verified Purchase": "Yes"
              },
              {
                "Customer": "John D.",
                "Rating": "4 stars",
                "Review": "Great phone overall, but wish the battery lasted a bit longer with heavy usage.",
                "Date": "2024-12-14",
                "Verified Purchase": "Yes"
              },
              {
                "Customer": "Emily R.",
                "Rating": "5 stars",
                "Review": "The camera quality is phenomenal! Perfect for my photography business.",
                "Date": "2024-12-13",
                "Verified Purchase": "Yes"
              }
            ],
          },
          total_rows: 11,
          captured_texts_count: 5,
          captured_lists_count: 6,
          screenshots_count: 5
        },
        metadata: {
          test_mode: true,
          browser_id: "d27ace57-75cb-441c-8589-8ba34e52f7d1",
          user_id: 108,
        }
      }
    };

    // Record the attempt time before sending, mirroring real deliveries.
    await updateWebhookLastCalled(robotId, webhook.id);

    const response = await axios.post(webhook.url, testPayload, {
      timeout: (webhook.timeout || 30) * 1000,
      validateStatus: (status) => status < 500
    });

    const success = response.status >= 200 && response.status < 300;

    res.status(200).json({
      ok: true,
      message: success ? 'Test webhook sent successfully' : 'Webhook endpoint responded with non-success status',
      details: {
        status: response.status,
        statusText: response.statusText,
        success: success
      }
    });
  } catch (error: any) {
    console.log(`Could not test webhook - ${error}`);

    // Still record the attempt even though delivery failed.
    try {
      await updateWebhookLastCalled(robotId, webhook.id);
    } catch (updateError) {
      console.error('Failed to update lastCalledAt after webhook error:', updateError);
    }

    // Translate common axios failure modes into friendlier messages.
    let errorMessage = 'Could not send test webhook';
    if (error.code === 'ECONNREFUSED') {
      errorMessage = 'Connection refused - webhook URL is not accessible';
    } else if (error.code === 'ETIMEDOUT') {
      errorMessage = 'Request timeout - webhook endpoint did not respond in time';
    } else if (error.response) {
      errorMessage = `Webhook endpoint responded with error: ${error.response.status} ${error.response.statusText}`;
    }

    res.status(500).json({
      ok: false,
      error: errorMessage,
      details: {
        code: error.code,
        message: error.message
      }
    });
  }
});
// Send webhook
/**
 * Fan an event payload out to every active webhook on the robot that
 * subscribes to `eventType`. Deliveries run concurrently; per-webhook
 * failures are retried inside sendWebhookWithRetry and never throw
 * back to the caller.
 */
export const sendWebhook = async (robotId: string, eventType: string, data: any): Promise<void> => {
  try {
    const robot = await Robot.findOne({ where: { 'recording_meta.id': robotId } });
    if (!robot || !robot.webhooks) {
      return;
    }

    const subscribers = robot.webhooks.filter(
      (entry: WebhookConfig) => entry.active && entry.events.includes(eventType)
    );
    if (subscribers.length === 0) {
      return;
    }

    const deliveries: Promise<void>[] = [];
    for (const webhook of subscribers) {
      const payload = {
        event_type: eventType,
        timestamp: new Date().toISOString(),
        webhook_id: webhook.id,
        data: data
      };
      deliveries.push(sendWebhookWithRetry(robotId, webhook, payload));
    }

    await Promise.allSettled(deliveries);
  } catch (error) {
    console.error('Error sending webhooks:', error);
  }
};
// Helper function to send webhook with retry logic
/**
 * POST `payload` to `webhook.url`, retrying with exponential backoff
 * (retryDelay * 2^(attempt-1) seconds) up to `retryAttempts` total attempts.
 *
 * The returned promise resolves only after every attempt has completed, so
 * callers awaiting Promise.allSettled observe the true delivery outcome.
 * (Previously retries were scheduled with a bare setTimeout(async …), which
 * resolved this promise after the first failure and left rejections inside
 * the timer callback unhandled.)
 */
const sendWebhookWithRetry = async (robotId: string, webhook: WebhookConfig, payload: any, attempt: number = 1): Promise<void> => {
  const maxRetries = webhook.retryAttempts || 3;
  const retryDelay = webhook.retryDelay || 5;
  const timeout = webhook.timeout || 30;

  try {
    // Record the attempt time regardless of delivery outcome.
    await updateWebhookLastCalled(robotId, webhook.id);

    const response = await axios.post(webhook.url, payload, {
      timeout: timeout * 1000,
      // Anything outside 2xx is treated as a failed delivery and retried.
      validateStatus: (status) => status >= 200 && status < 300
    });

    console.log(`Webhook sent successfully to ${webhook.url}: ${response.status}`);
  } catch (error: any) {
    console.error(`Webhook failed for ${webhook.url} (attempt ${attempt}):`, error.message);

    if (attempt < maxRetries) {
      const delay = retryDelay * Math.pow(2, attempt - 1);
      console.log(`Retrying webhook ${webhook.url} in ${delay} seconds...`);
      // Await the backoff and the retry so the promise chain tracks the
      // entire delivery lifecycle and no rejection escapes unhandled.
      await new Promise<void>((resolve) => setTimeout(resolve, delay * 1000));
      await sendWebhookWithRetry(robotId, webhook, payload, attempt + 1);
    } else {
      console.error(`Webhook ${webhook.url} failed after ${maxRetries} attempts`);
    }
  }
};
// Clear all webhooks for a robot
// DELETE /webhook/clear/:robotId — wipe the robot's entire webhook list.
router.delete('/clear/:robotId', requireSignIn, async (req: Request, res: Response) => {
  const { robotId } = req.params;
  const authenticatedReq = req as AuthenticatedRequest;
  try {
    if (!authenticatedReq.user) {
      return res.status(401).json({ ok: false, error: 'Unauthorized' });
    }

    const robot = await Robot.findOne({ where: { 'recording_meta.id': robotId } });
    if (!robot) {
      return res.status(404).json({ ok: false, error: 'Robot not found' });
    }

    // Replace the stored list with an empty array.
    await robot.update({ webhooks: [] });

    res.status(200).json({
      ok: true,
      message: 'All webhooks cleared successfully'
    });
  } catch (error: any) {
    console.log(`Could not clear webhooks - ${error}`);
    res.status(500).json({ ok: false, error: 'Could not clear webhook configurations' });
  }
});

View File

@@ -6,6 +6,7 @@ import logger from './logger';
import Robot from './models/Robot';
import { handleRunRecording } from './workflow-management/scheduler';
import { computeNextRun } from './utils/schedule';
import { v4 as uuid } from "uuid";
if (!process.env.DB_USER || !process.env.DB_PASSWORD || !process.env.DB_HOST || !process.env.DB_PORT || !process.env.DB_NAME) {
throw new Error('One or more required environment variables are missing.');
@@ -32,7 +33,7 @@ interface ScheduledWorkflowData {
*/
export async function scheduleWorkflow(id: string, userId: string, cronExpression: string, timezone: string): Promise<void> {
try {
const runId = require('uuidv4').uuid();
const runId = uuid();
const queueName = `scheduled-workflow-${id}`;

View File

@@ -4,7 +4,7 @@ import http from 'http';
import cors from 'cors';
import dotenv from 'dotenv';
dotenv.config();
import { record, workflow, storage, auth, integration, proxy } from './routes';
import { record, workflow, storage, auth, integration, proxy, webhook } from './routes';
import { BrowserPool } from "./browser-management/classes/BrowserPool";
import logger from './logger';
import { connectDB, syncDB } from './storage/db'
@@ -20,6 +20,7 @@ import connectPgSimple from 'connect-pg-simple';
import pg from 'pg';
import session from 'express-session';
import Run from './models/Run';
import { processQueuedRuns } from './routes/storage';
const app = express();
app.use(cors({
@@ -83,11 +84,9 @@ export const io = new Server(server);
*/
export const browserPool = new BrowserPool();
// app.use(bodyParser.json({ limit: '10mb' }))
// app.use(bodyParser.urlencoded({ extended: true, limit: '10mb', parameterLimit: 9000 }));
// parse cookies - "cookie" is true in csrfProtection
app.use(cookieParser())
app.use('/webhook', webhook);
app.use('/record', record);
app.use('/workflow', workflow);
app.use('/storage', storage);
@@ -98,9 +97,9 @@ app.use('/api-docs', swaggerUi.serve, swaggerUi.setup(swaggerSpec));
readdirSync(path.join(__dirname, 'api')).forEach((r) => {
const route = require(path.join(__dirname, 'api', r));
const router = route.default || route; // Use .default if available, fallback to route
const router = route.default || route;
if (typeof router === 'function') {
app.use('/api', router); // Use the default export or named router
app.use('/api', router);
} else {
console.error(`Error: ${r} does not export a valid router`);
}
@@ -150,7 +149,6 @@ app.get('/', function (req, res) {
return res.send('Maxun server started 🚀');
});
// Add CORS headers
app.use((req, res, next) => {
res.header('Access-Control-Allow-Origin', process.env.PUBLIC_URL || 'http://localhost:5173');
res.header('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE,OPTIONS');
@@ -178,14 +176,19 @@ io.of('/queued-run').on('connection', (socket) => {
}
});
setInterval(() => {
processQueuedRuns();
}, 5000);
server.listen(SERVER_PORT, '0.0.0.0', async () => {
try {
await connectDB();
await syncDB();
logger.log('info', `Server listening on port ${SERVER_PORT}`);
logger.log('info', `Server listening on port ${SERVER_PORT}`);
} catch (error: any) {
logger.log('error', `Failed to connect to the database: ${error.message}`);
process.exit(1); // Exit the process if DB connection fails
process.exit(1);
}
});
@@ -219,4 +222,4 @@ process.on('SIGINT', async () => {
if (recordingWorkerProcess) recordingWorkerProcess.kill();
}
process.exit();
});
});

View File

@@ -1,78 +0,0 @@
import { Queue, Worker } from 'bullmq';
import IORedis from 'ioredis';
import logger from './logger';
import { handleRunRecording } from "./workflow-management/scheduler";
import Robot from './models/Robot';
import { computeNextRun } from './utils/schedule';
const connection = new IORedis({
host: process.env.REDIS_HOST,
port: process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT, 10) : 6379,
maxRetriesPerRequest: null,
password: process.env.REDIS_PASSWORD ? process.env.REDIS_PASSWORD : undefined,
});
connection.on('connect', () => {
console.log('Connected to Redis!');
});
connection.on('error', (err) => {
console.error('Redis connection error:', err);
});
const workflowQueue = new Queue('workflow', { connection });
const worker = new Worker('workflow', async job => {
const { runId, userId, id } = job.data;
try {
const result = await handleRunRecording(id, userId);
return result;
} catch (error) {
logger.error('Error running workflow:', error);
throw error;
}
}, { connection });
worker.on('completed', async (job: any) => {
logger.log(`info`, `Job ${job.id} completed for ${job.data.runId}`);
const robot = await Robot.findOne({ where: { 'recording_meta.id': job.data.id } });
if (robot) {
// Update `lastRunAt` to the current time
const lastRunAt = new Date();
// Compute the next run date
if (robot.schedule && robot.schedule.cronExpression && robot.schedule.timezone) {
const nextRunAt = computeNextRun(robot.schedule.cronExpression, robot.schedule.timezone) || undefined;
await robot.update({
schedule: {
...robot.schedule,
lastRunAt,
nextRunAt,
},
});
} else {
logger.error('Robot schedule, cronExpression, or timezone is missing.');
}
}
});
worker.on('failed', async (job: any, err) => {
logger.log(`error`, `Job ${job.id} failed for ${job.data.runId}:`, err);
});
console.log('Worker is running...');
async function jobCounts() {
const jobCounts = await workflowQueue.getJobCounts();
}
jobCounts();
// We dont need this right now
// process.on('SIGINT', () => {
// console.log('Worker shutting down...');
// process.exit();
// });
export { workflowQueue, worker };

View File

@@ -15,7 +15,7 @@ import {
import { CustomActions } from "../../../../src/shared/types";
import Robot from "../../models/Robot";
import { getBestSelectorForAction } from "../utils";
import { uuid } from "uuidv4";
import { v4 as uuid } from "uuid";
import { capture } from "../../utils/analytics"
import { decrypt, encrypt } from "../../utils/auth";
@@ -82,6 +82,7 @@ export class WorkflowGenerator {
this.poolId = poolId;
this.registerEventHandlers(socket);
this.initializeSocketListeners();
this.initializeDOMListeners();
}
/**
@@ -92,6 +93,8 @@ export class WorkflowGenerator {
workflow: [],
};
private isDOMMode: boolean = false;
/**
* Metadata of the currently recorded workflow.
* @private
@@ -134,6 +137,18 @@ export class WorkflowGenerator {
})
}
private initializeDOMListeners() {
this.socket.on('dom-mode-enabled', () => {
this.isDOMMode = true;
logger.log('debug', 'Generator: DOM mode enabled');
});
this.socket.on('screenshot-mode-enabled', () => {
this.isDOMMode = false;
logger.log('debug', 'Generator: Screenshot mode enabled');
});
}
/**
* Registers the event handlers for all generator-related events on the socket.
* @param socket The socket used to communicate with the client.
@@ -159,9 +174,11 @@ export class WorkflowGenerator {
switch (actionType) {
case 'customAction':
// pair.where.selectors = [this.generatedData.lastUsedSelector];
pair.where.selectors = pair.where.selectors.filter(
(selector: string) => selector !== this.generatedData.lastUsedSelector
);
if (pair.where.selectors) {
pair.where.selectors = pair.where.selectors.filter(
(selector: string) => selector !== this.generatedData.lastUsedSelector
);
}
break;
default: break;
}
@@ -348,6 +365,96 @@ export class WorkflowGenerator {
await this.addPairToWorkflowAndNotifyClient(pair, page);
};
  // Handles click events on the DOM, generating a pair for the click action
  /**
   * Builds a where/what pair for a click captured in DOM recording mode and
   * appends it to the workflow. For INPUT/TEXTAREA targets the click args
   * additionally carry the click position and a cursorIndex so the caret
   * can be positioned on replay.
   */
  public onDOMClickAction = async (page: Page, data: {
    selector: string,
    url: string,
    userId: string,
    elementInfo?: any,
    coordinates?: { x: number, y: number }
  }) => {
    const { selector, url, elementInfo, coordinates } = data;

    const pair: WhereWhatPair = {
      where: {
        url: this.getBestUrl(url),
        selectors: [selector]
      },
      what: [{
        action: 'click',
        args: [selector],
      }],
    };

    // Handle special input elements with cursor positioning
    if (elementInfo && coordinates &&
      (elementInfo.tagName === 'INPUT' || elementInfo.tagName === 'TEXTAREA')) {
      pair.what[0] = {
        action: 'click',
        args: [selector, { position: coordinates }, { cursorIndex: 0 }],
      };
    }

    // Remember the click target for later pair post-processing.
    this.generatedData.lastUsedSelector = selector;
    this.generatedData.lastAction = 'click';

    await this.addPairToWorkflowAndNotifyClient(pair, page);
  };
  // Handles keyboard actions on the DOM, generating a pair for the key press action
  /**
   * Builds a where/what pair for a key press captured in DOM recording mode.
   * The key is stored encrypted (via encrypt) so recorded keystrokes are not
   * persisted in plain text; inputType falls back to 'text' when omitted.
   */
  public onDOMKeyboardAction = async (page: Page, data: {
    selector: string,
    key: string,
    url: string,
    userId: string,
    inputType?: string
  }) => {
    const { selector, key, url, inputType } = data;

    const pair: WhereWhatPair = {
      where: {
        url: this.getBestUrl(url),
        selectors: [selector]
      },
      what: [{
        action: 'press',
        args: [selector, encrypt(key), inputType || 'text'],
      }],
    };

    this.generatedData.lastUsedSelector = selector;
    this.generatedData.lastAction = 'press';

    await this.addPairToWorkflowAndNotifyClient(pair, page);
  };
  // Handles navigation events on the DOM, generating a pair for the navigation action
  /**
   * Builds a goto pair when the DOM recording navigates: `where` matches the
   * page the user was on (currentUrl) and `what` navigates to the destination
   * url. Clears lastUsedSelector since no element was involved.
   */
  public onDOMNavigation = async (page: Page, data: {
    url: string,
    currentUrl: string,
    userId: string
  }) => {
    const { url, currentUrl } = data;

    const pair: WhereWhatPair = {
      where: { url: this.getBestUrl(currentUrl) },
      what: [{
        action: 'goto',
        args: [url],
      }],
    };

    this.generatedData.lastUsedSelector = '';

    await this.addPairToWorkflowAndNotifyClient(pair, page);
  };
  // Handles workflow pair events on the DOM
  // Accepts a fully-formed where/what pair from the client and appends it
  // to the workflow verbatim.
  public onDOMWorkflowPair = async (page: Page, data: { pair: WhereWhatPair, userId: string }) => {
    const { pair } = data;
    await this.addPairToWorkflowAndNotifyClient(pair, page);
  };
/**
* Generates a pair for the click event.
* @param coordinates The coordinates of the click event.
@@ -708,6 +815,7 @@ export class WorkflowGenerator {
this.socket = socket;
this.registerEventHandlers(socket);
this.initializeSocketListeners();
this.initializeDOMListeners();
};
/**
@@ -890,6 +998,7 @@ export class WorkflowGenerator {
rect,
selector: displaySelector,
elementInfo,
isDOMMode: this.isDOMMode,
// Include shadow DOM specific information
shadowInfo: elementInfo?.isShadowRoot ? {
mode: elementInfo.shadowRootMode,

View File

@@ -107,6 +107,11 @@ export class WorkflowInterpreter {
*/
public binaryData: { mimetype: string, data: string }[] = [];
/**
* Track current scrapeList index
*/
private currentScrapeListIndex: number = 0;
/**
* An array of id's of the pairs from the workflow that are about to be paused.
* As "breakpoints".
@@ -288,6 +293,7 @@ export class WorkflowInterpreter {
scrapeList: [],
};
this.binaryData = [];
this.currentScrapeListIndex = 0;
}
/**
@@ -322,6 +328,9 @@ export class WorkflowInterpreter {
},
setActionType: (type: string) => {
this.currentActionType = type;
},
incrementScrapeListIndex: () => {
this.currentScrapeListIndex++;
}
},
serializableCallback: (data: any) => {
@@ -334,7 +343,7 @@ export class WorkflowInterpreter {
this.serializableDataByType.scrapeSchema.push([data]);
}
} else if (this.currentActionType === 'scrapeList') {
this.serializableDataByType.scrapeList.push(data);
this.serializableDataByType.scrapeList[this.currentScrapeListIndex] = data;
}
this.socket.emit('serializableCallback', data);
@@ -372,25 +381,19 @@ export class WorkflowInterpreter {
log: this.debugMessages,
result: status,
scrapeSchemaOutput: Object.keys(mergedScrapeSchema).length > 0
? { "schema-merged": [mergedScrapeSchema] }
? { "schema_merged": [mergedScrapeSchema] }
: this.serializableDataByType.scrapeSchema.reduce((reducedObject, item, index) => {
return {
[`schema-${index}`]: item,
...reducedObject,
}
}, {}),
reducedObject[`schema_${index}`] = item;
return reducedObject;
}, {} as Record<string, any>),
scrapeListOutput: this.serializableDataByType.scrapeList.reduce((reducedObject, item, index) => {
return {
[`list-${index}`]: item,
...reducedObject,
}
}, {}),
reducedObject[`list_${index}`] = item;
return reducedObject;
}, {} as Record<string, any>),
binaryOutput: this.binaryData.reduce((reducedObject, item, index) => {
return {
[`item-${index}`]: item,
...reducedObject,
}
}, {})
reducedObject[`item_${index}`] = item;
return reducedObject;
}, {} as Record<string, any>)
}
logger.log('debug', `Interpretation finished`);

View File

@@ -44,65 +44,100 @@ async function refreshAirtableToken(refreshToken: string) {
}
}
function mergeRelatedData(serializableOutput: SerializableOutput, binaryOutput: Record<string, string>) {
const mergedRecords: Record<string, any>[] = [];
const allRecords: Record<string, any>[] = [];
const maxLength = Math.max(
...[
...(serializableOutput.scrapeSchema ?? []).map(arr => arr?.length ?? 0),
...(serializableOutput.scrapeList ?? []).map(arr => arr?.length ?? 0),
0
]
);
for (let i = 0; i < maxLength; i++) {
mergedRecords.push({});
}
const schemaData: Array<{key: string, value: any}> = [];
const listData: any[] = [];
const screenshotData: Array<{key: string, url: string}> = [];
// Collect schema data
if (serializableOutput.scrapeSchema) {
for (const schemaArray of serializableOutput.scrapeSchema) {
if (!Array.isArray(schemaArray)) continue;
for (let i = 0; i < schemaArray.length; i++) {
if (i >= mergedRecords.length) break;
mergedRecords[i] = { ...mergedRecords[i], ...schemaArray[i] };
for (const schemaItem of schemaArray) {
Object.entries(schemaItem).forEach(([key, value]) => {
if (key && key.trim() !== '' && value !== null && value !== undefined && value !== '') {
schemaData.push({key, value});
}
});
}
}
}
// Collect list data
if (serializableOutput.scrapeList) {
for (const listArray of serializableOutput.scrapeList) {
if (!Array.isArray(listArray)) continue;
for (let i = 0; i < listArray.length; i++) {
if (i >= mergedRecords.length) break;
mergedRecords[i] = { ...mergedRecords[i], ...listArray[i] };
}
}
}
if (binaryOutput && Object.keys(binaryOutput).length > 0) {
for (let i = 0; i < mergedRecords.length; i++) {
const screenshotKey = `item-${i}`;
if (binaryOutput[screenshotKey]) {
mergedRecords[i].Screenshot = binaryOutput[screenshotKey];
mergedRecords[i].Key = screenshotKey;
}
}
for (const [key, url] of Object.entries(binaryOutput)) {
if (mergedRecords.some(record => record.Key === key)) {
continue;
}
mergedRecords.push({
"Key": key,
"Screenshot": url
listArray.forEach(listItem => {
const hasContent = Object.values(listItem).some(value =>
value !== null && value !== undefined && value !== ''
);
if (hasContent) {
listData.push(listItem);
}
});
}
}
return mergedRecords;
// Collect screenshot data
if (binaryOutput && Object.keys(binaryOutput).length > 0) {
Object.entries(binaryOutput).forEach(([key, url]) => {
if (key && key.trim() !== '' && url && url.trim() !== '') {
screenshotData.push({key, url});
}
});
}
// Mix all data types together to create consecutive records
const maxLength = Math.max(schemaData.length, listData.length, screenshotData.length);
for (let i = 0; i < maxLength; i++) {
const record: Record<string, any> = {};
if (i < schemaData.length) {
record.Label = schemaData[i].key;
record.Value = schemaData[i].value;
}
if (i < listData.length) {
Object.entries(listData[i]).forEach(([key, value]) => {
if (value !== null && value !== undefined && value !== '') {
record[key] = value;
}
});
}
if (i < screenshotData.length) {
record.Key = screenshotData[i].key;
record.Screenshot = screenshotData[i].url;
}
if (Object.keys(record).length > 0) {
allRecords.push(record);
}
}
for (let i = maxLength; i < schemaData.length; i++) {
allRecords.push({
Label: schemaData[i].key,
Value: schemaData[i].value
});
}
for (let i = maxLength; i < listData.length; i++) {
allRecords.push(listData[i]);
}
for (let i = maxLength; i < screenshotData.length; i++) {
allRecords.push({
Key: screenshotData[i].key,
Screenshot: screenshotData[i].url
});
}
return allRecords;
}
export async function updateAirtable(robotId: string, runId: string) {
@@ -210,11 +245,13 @@ export async function writeDataToAirtable(
const airtable = new Airtable({ apiKey: accessToken });
const base = airtable.base(baseId);
await deleteEmptyRecords(base, tableName);
const processedData = data.map(item => {
const cleanedItem: Record<string, any> = {};
for (const [key, value] of Object.entries(item)) {
if (value === null || value === undefined) {
if (value === null || value === undefined || value === '') {
cleanedItem[key] = '';
} else if (typeof value === 'object' && !Array.isArray(value)) {
cleanedItem[key] = JSON.stringify(value);
@@ -224,113 +261,55 @@ export async function writeDataToAirtable(
}
return cleanedItem;
}).filter(record => {
return Object.values(record).some(value => value !== null && value !== undefined && value !== '');
});
const existingFields = await getExistingFields(base, tableName);
console.log(`Found ${existingFields.length} existing fields in Airtable: ${existingFields.join(', ')}`);
if (processedData.length === 0) {
console.log('No valid data to write after filtering. Skipping.');
return;
}
const dataFields = [...new Set(processedData.flatMap(row => Object.keys(row)))];
const dataFields = [...new Set(processedData.flatMap(row => Object.keys(row)))];
console.log(`Found ${dataFields.length} fields in data: ${dataFields.join(', ')}`);
const existingFields = await getExistingFields(base, tableName);
const missingFields = dataFields.filter(field => !existingFields.includes(field));
const hasNewColumns = missingFields.length > 0;
console.log(`Found ${missingFields.length} new fields: ${missingFields.join(', ')}`);
for (const field of missingFields) {
const sampleRow = processedData.find(row => field in row);
if (sampleRow) {
const sampleValue = sampleRow[field];
try {
await createAirtableField(baseId, tableName, field, sampleValue, accessToken, tableId);
console.log(`Successfully created field: ${field}`);
await new Promise(resolve => setTimeout(resolve, 200));
} catch (fieldError: any) {
console.warn(`Warning: Could not create field "${field}": ${fieldError.message}`);
}
}
}
let existingRecords: Array<{ id: string, fields: Record<string, any> }> = [];
if (hasNewColumns) {
existingRecords = await fetchAllRecords(base, tableName);
console.log(`Found ${existingRecords.length} existing records in Airtable`);
}
if (hasNewColumns && existingRecords.length > 0) {
const recordsToUpdate = [];
const recordsToCreate = [];
if (missingFields.length > 0) {
console.log(`Creating ${missingFields.length} new fields: ${missingFields.join(', ')}`);
const newColumnData = processedData.map(record => {
const newColumnsOnly: Record<string, any> = {};
missingFields.forEach(field => {
if (field in record) {
newColumnsOnly[field] = record[field];
}
});
return newColumnsOnly;
});
for (let i = 0; i < Math.min(existingRecords.length, newColumnData.length); i++) {
if (Object.keys(newColumnData[i]).length > 0) {
recordsToUpdate.push({
id: existingRecords[i].id,
fields: newColumnData[i]
});
}
}
const existingColumnsBeingUpdated = dataFields.filter(field =>
existingFields.includes(field) && !missingFields.includes(field)
);
if (existingColumnsBeingUpdated.length > 0) {
recordsToCreate.push(...processedData.map(record => ({ fields: record })));
console.log(`Will append ${recordsToCreate.length} new records with all data`);
} else {
if (processedData.length > existingRecords.length) {
const additionalRecords = processedData.slice(existingRecords.length);
recordsToCreate.push(...additionalRecords.map(record => ({ fields: record })));
console.log(`Will append ${recordsToCreate.length} additional records`);
}
}
if (recordsToUpdate.length > 0) {
console.log(`Updating ${recordsToUpdate.length} existing records with new columns`);
const BATCH_SIZE = 10;
for (let i = 0; i < recordsToUpdate.length; i += BATCH_SIZE) {
const batch = recordsToUpdate.slice(i, i + BATCH_SIZE);
console.log(`Updating batch ${Math.floor(i/BATCH_SIZE) + 1} of ${Math.ceil(recordsToUpdate.length/BATCH_SIZE)}`);
for (const field of missingFields) {
const sampleRow = processedData.find(row => field in row && row[field] !== '');
if (sampleRow) {
const sampleValue = sampleRow[field];
try {
await retryableAirtableUpdate(base, tableName, batch);
} catch (batchError: any) {
console.error(`Error updating batch: ${batchError.message}`);
throw batchError;
await createAirtableField(baseId, tableName, field, sampleValue, accessToken, tableId);
console.log(`Successfully created field: ${field}`);
await new Promise(resolve => setTimeout(resolve, 200));
} catch (fieldError: any) {
console.warn(`Warning: Could not create field "${field}": ${fieldError.message}`);
}
await new Promise(resolve => setTimeout(resolve, 500));
}
}
} else {
console.log(`Appending all ${processedData.length} records to Airtable`);
const recordsToCreate = processedData.map(record => ({ fields: record }));
}
console.log(`Appending all ${processedData.length} records to Airtable`);
const recordsToCreate = processedData.map(record => ({ fields: record }));
const BATCH_SIZE = 10;
for (let i = 0; i < recordsToCreate.length; i += BATCH_SIZE) {
const batch = recordsToCreate.slice(i, i + BATCH_SIZE);
console.log(`Creating batch ${Math.floor(i/BATCH_SIZE) + 1} of ${Math.ceil(recordsToCreate.length/BATCH_SIZE)}`);
const BATCH_SIZE = 10;
for (let i = 0; i < recordsToCreate.length; i += BATCH_SIZE) {
const batch = recordsToCreate.slice(i, i + BATCH_SIZE);
console.log(`Creating batch ${Math.floor(i/BATCH_SIZE) + 1} of ${Math.ceil(recordsToCreate.length/BATCH_SIZE)}`);
try {
await retryableAirtableCreate(base, tableName, batch);
} catch (batchError: any) {
console.error(`Error creating batch: ${batchError.message}`);
throw batchError;
}
await new Promise(resolve => setTimeout(resolve, 500));
try {
await retryableAirtableCreate(base, tableName, batch);
} catch (batchError: any) {
console.error(`Error creating batch: ${batchError.message}`);
throw batchError;
}
await new Promise(resolve => setTimeout(resolve, 500));
}
await deleteEmptyRecords(base, tableName);
@@ -343,20 +322,6 @@ export async function writeDataToAirtable(
}
}
/**
 * Retrieve every record in the given Airtable table, reduced to
 * `{ id, fields }` pairs. Failures are treated as "no records": the
 * error is logged as a warning and an empty array is returned, so
 * callers never have to handle a fetch exception themselves.
 */
async function fetchAllRecords(base: Airtable.Base, tableName: string): Promise<Array<{ id: string, fields: Record<string, any> }>> {
  try {
    console.log(`Fetching all records from ${tableName}...`);
    const fetched = await base(tableName).select().all();
    const simplified = fetched.map((rec) => {
      return { id: rec.id, fields: rec.fields };
    });
    return simplified;
  } catch (error: any) {
    // Best-effort: a fetch failure is downgraded to an empty result.
    console.warn(`Warning: Could not fetch all records: ${error.message}`);
    return [];
  }
}
async function deleteEmptyRecords(base: Airtable.Base, tableName: string): Promise<void> {
console.log('Checking for empty records to clear...');
@@ -407,23 +372,6 @@ async function retryableAirtableCreate(
}
}
/**
 * Update a batch of Airtable records, retrying on failure.
 *
 * Makes up to `retries + 1` attempts in total, sleeping BASE_API_DELAY
 * milliseconds between attempts. The last error is rethrown once the
 * retry budget is exhausted.
 */
async function retryableAirtableUpdate(
  base: Airtable.Base,
  tableName: string,
  batch: any[],
  retries = MAX_RETRIES
): Promise<void> {
  for (let attemptsLeft = retries; ; attemptsLeft--) {
    try {
      await base(tableName).update(batch);
      return;
    } catch (error) {
      if (attemptsLeft <= 0) {
        throw error;
      }
      // Back off before the next attempt to respect Airtable rate limits.
      await new Promise(resolve => setTimeout(resolve, BASE_API_DELAY));
    }
  }
}
// Helper functions
async function getExistingFields(base: Airtable.Base, tableName: string): Promise<string[]> {
try {

View File

@@ -114,7 +114,16 @@ async function processOutputType(
await ensureSheetExists(spreadsheetId, sheetName, robotConfig);
await writeDataToSheet(robotId, spreadsheetId, data, sheetName, robotConfig);
let formattedData = data;
if (outputType === 'Text' && data.length > 0) {
const schemaItem = data[0];
formattedData = Object.entries(schemaItem).map(([key, value]) => ({
Label: key,
Value: value
}));
}
await writeDataToSheet(robotId, spreadsheetId, formattedData, sheetName, robotConfig);
console.log(`Data written to ${sheetName} sheet for ${outputType} data`);
}
}

View File

@@ -1,4 +1,4 @@
import { uuid } from "uuidv4";
import { v4 as uuid } from "uuid";
import { chromium } from 'playwright-extra';
import stealthPlugin from 'puppeteer-extra-plugin-stealth';
import { io, Socket } from "socket.io-client";
@@ -13,6 +13,8 @@ import { BinaryOutputService } from "../../storage/mino";
import { capture } from "../../utils/analytics";
import { WorkflowFile } from "maxun-core";
import { Page } from "playwright";
import { sendWebhook } from "../../routes/webhook";
import { airtableUpdateTasks, processAirtableUpdates } from "../integrations/airtable";
chromium.use(stealthPlugin());
async function createWorkflowAndStoreMetadata(id: string, userId: string) {
@@ -152,26 +154,34 @@ async function executeRun(id: string, userId: string) {
binaryOutput: uploadedBinaryOutput,
});
let totalRowsExtracted = 0;
// Track extraction metrics
let totalSchemaItemsExtracted = 0;
let totalListItemsExtracted = 0;
let extractedScreenshotsCount = 0;
let extractedItemsCount = 0;
if (run.dataValues.binaryOutput && run.dataValues.binaryOutput["item-0"]) {
extractedScreenshotsCount = 1;
if (categorizedOutput.scrapeSchema) {
Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => {
if (Array.isArray(schemaResult)) {
totalSchemaItemsExtracted += schemaResult.length;
} else if (schemaResult && typeof schemaResult === 'object') {
totalSchemaItemsExtracted += 1;
}
});
}
if (run.dataValues.serializableOutput && run.dataValues.serializableOutput["item-0"]) {
const itemsArray = run.dataValues.serializableOutput["item-0"];
extractedItemsCount = itemsArray.length;
totalRowsExtracted = itemsArray.reduce((total, item) => {
return total + Object.keys(item).length;
}, 0);
if (categorizedOutput.scrapeList) {
Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => {
if (Array.isArray(listResult)) {
totalListItemsExtracted += listResult.length;
}
});
}
console.log(`Extracted Items Count: ${extractedItemsCount}`);
console.log(`Extracted Screenshots Count: ${extractedScreenshotsCount}`);
console.log(`Total Rows Extracted: ${totalRowsExtracted}`);
if (uploadedBinaryOutput) {
extractedScreenshotsCount = Object.keys(uploadedBinaryOutput).length;
}
const totalRowsExtracted = totalSchemaItemsExtracted + totalListItemsExtracted;
capture(
'maxun-oss-run-created-scheduled',
@@ -180,18 +190,60 @@ async function executeRun(id: string, userId: string) {
created_at: new Date().toISOString(),
status: 'success',
totalRowsExtracted,
extractedItemsCount,
schemaItemsExtracted: totalSchemaItemsExtracted,
listItemsExtracted: totalListItemsExtracted,
extractedScreenshotsCount,
}
);
googleSheetUpdateTasks[id] = {
robotId: plainRun.robotMetaId,
runId: id,
status: 'pending',
retries: 5,
const webhookPayload = {
robot_id: plainRun.robotMetaId,
run_id: plainRun.runId,
robot_name: recording.recording_meta.name,
status: 'success',
started_at: plainRun.startedAt,
finished_at: new Date().toLocaleString(),
extracted_data: {
captured_texts: Object.values(categorizedOutput.scrapeSchema).flat() || [],
captured_lists: categorizedOutput.scrapeList,
total_rows: totalRowsExtracted,
captured_texts_count: totalSchemaItemsExtracted,
captured_lists_count: totalListItemsExtracted,
screenshots_count: extractedScreenshotsCount
},
metadata: {
browser_id: plainRun.browserId,
user_id: userId
}
};
processGoogleSheetUpdates();
try {
await sendWebhook(plainRun.robotMetaId, 'run_completed', webhookPayload);
logger.log('info', `Webhooks sent successfully for completed run ${plainRun.runId}`);
} catch (webhookError: any) {
logger.log('error', `Failed to send webhooks for run ${plainRun.runId}: ${webhookError.message}`);
}
try {
googleSheetUpdateTasks[plainRun.runId] = {
robotId: plainRun.robotMetaId,
runId: plainRun.runId,
status: 'pending',
retries: 5,
};
airtableUpdateTasks[plainRun.runId] = {
robotId: plainRun.robotMetaId,
runId: plainRun.runId,
status: 'pending',
retries: 5,
};
processAirtableUpdates();
processGoogleSheetUpdates();
} catch (err: any) {
logger.log('error', `Failed to update Google Sheet for run: ${plainRun.runId}: ${err.message}`);
}
return true;
} catch (error: any) {
logger.log('info', `Error while running a robot with id: ${id} - ${error.message}`);
@@ -202,6 +254,34 @@ async function executeRun(id: string, userId: string) {
status: 'failed',
finishedAt: new Date().toLocaleString(),
});
const recording = await Robot.findOne({ where: { 'recording_meta.id': run.robotMetaId }, raw: true });
// Trigger webhooks for run failure
const failedWebhookPayload = {
robot_id: run.robotMetaId,
run_id: run.runId,
robot_name: recording ? recording.recording_meta.name : 'Unknown Robot',
status: 'failed',
started_at: run.startedAt,
finished_at: new Date().toLocaleString(),
error: {
message: error.message,
stack: error.stack,
type: error.name || 'ExecutionError'
},
metadata: {
browser_id: run.browserId,
user_id: userId,
}
};
try {
await sendWebhook(run.robotMetaId, 'run_failed', failedWebhookPayload);
logger.log('info', `Failure webhooks sent successfully for run ${run.runId}`);
} catch (webhookError: any) {
logger.log('error', `Failed to send failure webhooks for run ${run.runId}: ${webhookError.message}`);
}
}
capture(
'maxun-oss-run-created-scheduled',

23
server/tsconfig.mcp.json Normal file
View File

@@ -0,0 +1,23 @@
{
"compilerOptions": {
"target": "ES2022",
"module": "Node16",
"moduleResolution": "Node16",
"outDir": "../dist",
"rootDir": "./src",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"declaration": true,
"declarationMap": true,
"sourceMap": true
},
"include": [
"src/mcp-worker.ts"
],
"exclude": [
"node_modules",
"dist"
]
}