From 5be2b3175baf13bfa569ba27869f12a0d38a4cb3 Mon Sep 17 00:00:00 2001 From: Rohit Rajan Date: Tue, 21 Oct 2025 00:43:08 +0530 Subject: [PATCH] feat: recorder revamp server changes --- server/src/api/record.ts | 199 ++++++++++----- .../src/browser-management/inputHandlers.ts | 72 +++++- server/src/pgboss-worker.ts | 167 ++++++++----- .../workflow-management/classes/Generator.ts | 158 ++++++++++-- .../classes/Interpreter.ts | 231 ++++++++++++------ .../integrations/airtable.ts | 117 ++++++--- .../integrations/gsheet.ts | 86 ++++--- .../workflow-management/scheduler/index.ts | 206 +++++++++++----- src/context/browserSteps.tsx | 12 +- 9 files changed, 879 insertions(+), 369 deletions(-) diff --git a/server/src/api/record.ts b/server/src/api/record.ts index 5d9a68cd..29d1f261 100644 --- a/server/src/api/record.ts +++ b/server/src/api/record.ts @@ -1,16 +1,14 @@ -import { readFile, readFiles } from "../workflow-management/storage"; import { Router, Request, Response } from 'express'; import { chromium } from "playwright-extra"; import stealthPlugin from 'puppeteer-extra-plugin-stealth'; import { requireAPIKey } from "../middlewares/api"; import Robot from "../models/Robot"; import Run from "../models/Run"; -const router = Router(); import { getDecryptedProxyConfig } from "../routes/proxy"; import { v4 as uuid } from "uuid"; import { createRemoteBrowserForRun, destroyRemoteBrowser } from "../browser-management/controller"; import logger from "../logger"; -import { browserPool } from "../server"; +import { browserPool, io as serverIo } from "../server"; import { io, Socket } from "socket.io-client"; import { BinaryOutputService } from "../storage/mino"; import { AuthenticatedRequest } from "../routes/record" @@ -20,8 +18,11 @@ import { WorkflowFile } from "maxun-core"; import { googleSheetUpdateTasks, processGoogleSheetUpdates } from "../workflow-management/integrations/gsheet"; import { airtableUpdateTasks, processAirtableUpdates } from "../workflow-management/integrations/airtable"; import { sendWebhook } from "../routes/webhook"; + chromium.use(stealthPlugin()); +const router = Router(); + const formatRecording = (recordingData: any) => { const recordingMeta = recordingData.recording_meta; const workflow = recordingData.recording.workflow || []; @@ -334,7 +335,7 @@ function formatRunResponse(run: any) { id: run.id, status: run.status, name: run.name, - robotId: run.robotMetaId, // Renaming robotMetaId to robotId + robotId: run.robotMetaId, startedAt: run.startedAt, finishedAt: run.finishedAt, runId: run.runId, @@ -342,20 +343,20 @@ function formatRunResponse(run: any) { runByScheduleId: run.runByScheduleId, runByAPI: run.runByAPI, data: { - textData: [], - listData: [] + textData: {}, + listData: {} }, screenshots: [] as any[], }; - if (run.serializableOutput) { - if (run.serializableOutput.scrapeSchema && run.serializableOutput.scrapeSchema.length > 0) { - formattedRun.data.textData = run.serializableOutput.scrapeSchema; - } + const output = run.serializableOutput || {}; - if (run.serializableOutput.scrapeList && run.serializableOutput.scrapeList.length > 0) { - formattedRun.data.listData = run.serializableOutput.scrapeList; - } + if (output.scrapeSchema && typeof output.scrapeSchema === 'object') { + formattedRun.data.textData = output.scrapeSchema; + } + + if (output.scrapeList && typeof output.scrapeList === 'object') { + formattedRun.data.listData = output.scrapeList; } if (run.binaryOutput) { @@ -505,10 +506,30 @@ async function createWorkflowAndStoreMetadata(id: string, userId: string) { runByAPI: true, serializableOutput: {}, binaryOutput: {}, + retryCount: 0 }); const plainRun = run.toJSON(); + try { + const runStartedData = { + runId: plainRun.runId, + robotMetaId: plainRun.robotMetaId, + robotName: plainRun.name, + status: 'running', + startedAt: plainRun.startedAt, + runByUserId: plainRun.runByUserId, + runByScheduleId: plainRun.runByScheduleId, + runByAPI: plainRun.runByAPI || false, + browserId: plainRun.browserId + }; + + serverIo.of('/queued-run').to(`user-${userId}`).emit('run-started', runStartedData); + logger.log('info', `API run started notification sent for run: ${plainRun.runId} to user-${userId}`); + } catch (socketError: any) { + logger.log('warn', `Failed to send run-started notification for API run ${plainRun.runId}: ${socketError.message}`); + } + return { browserId, runId: plainRun.runId, @@ -525,6 +546,29 @@ async function createWorkflowAndStoreMetadata(id: string, userId: string) { } } +async function triggerIntegrationUpdates(runId: string, robotMetaId: string): Promise { + try { + googleSheetUpdateTasks[runId] = { + robotId: robotMetaId, + runId: runId, + status: 'pending', + retries: 5, + }; + + airtableUpdateTasks[runId] = { + robotId: robotMetaId, + runId: runId, + status: 'pending', + retries: 5, + }; + + processAirtableUpdates().catch(err => logger.log('error', `Airtable update error: ${err.message}`)); + processGoogleSheetUpdates().catch(err => logger.log('error', `Google Sheets update error: ${err.message}`)); + } catch (err: any) { + logger.log('error', `Failed to update integrations for run: ${runId}: ${err.message}`); + } +} + async function readyForRunHandler(browserId: string, id: string, userId: string){ try { const result = await executeRun(id, userId); @@ -565,6 +609,8 @@ function AddGeneratedFlags(workflow: WorkflowFile) { }; async function executeRun(id: string, userId: string) { + let browser: any = null; + try { const run = await Run.findOne({ where: { runId: id } }); if (!run) { @@ -576,6 +622,27 @@ async function executeRun(id: string, userId: string) { const plainRun = run.toJSON(); + if (run.status === 'aborted' || run.status === 'aborting') { + logger.log('info', `API Run ${id} has status ${run.status}, skipping execution`); + return { success: true }; + } + + if (run.status === 'queued') { + logger.log('info', `API Run ${id} has status 'queued', skipping stale execution - will be handled by recovery`); + return { success: true }; + } + + const retryCount = plainRun.retryCount || 0; + if (retryCount >= 3) { + logger.log('warn', `API Run ${id} has exceeded max retries (${retryCount}/3), marking as failed`); + await run.update({ + status: 'failed', + finishedAt: new Date().toLocaleString(), + log: `Max retries exceeded (${retryCount}/3) - Run permanently failed` + }); + return { success: false, error: 'Max retries exceeded' }; + } + const recording = await Robot.findOne({ where: { 'recording_meta.id': plainRun.robotMetaId }, raw: true }); if (!recording) { return { @@ -586,7 +653,7 @@ async function executeRun(id: string, userId: string) { plainRun.status = 'running'; - const browser = browserPool.getRemoteBrowser(plainRun.browserId); + browser = browserPool.getRemoteBrowser(plainRun.browserId); if (!browser) { throw new Error('Could not access browser'); } @@ -597,41 +664,33 @@ async function executeRun(id: string, userId: string) { } const workflow = AddGeneratedFlags(recording.recording); - - browser.interpreter.setRunId(id); + + browser.interpreter.setRunId(plainRun.runId); const interpretationInfo = await browser.interpreter.InterpretRecording( workflow, currentPage, (newPage: Page) => currentPage = newPage, plainRun.interpreterSettings ); + const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); + const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput); + await destroyRemoteBrowser(plainRun.browserId, userId); const updatedRun = await run.update({ status: 'success', finishedAt: new Date().toLocaleString(), log: interpretationInfo.log.join('\n'), + binaryOutput: uploadedBinaryOutput, }); - // Upload binary output to MinIO and update run with MinIO URLs - const finalRun = await Run.findOne({ where: { runId: id } }); - if (finalRun && finalRun.binaryOutput && Object.keys(finalRun.binaryOutput).length > 0) { - try { - const binaryService = new BinaryOutputService('maxun-run-screenshots'); - await binaryService.uploadAndStoreBinaryOutput(finalRun, finalRun.binaryOutput); - logger.log('info', `Uploaded binary output to MinIO for API run ${id}`); - } catch (minioError: any) { - logger.log('error', `Failed to upload binary output to MinIO for API run ${id}: ${minioError.message}`); - } - } - let totalSchemaItemsExtracted = 0; let totalListItemsExtracted = 0; let extractedScreenshotsCount = 0; - if (finalRun) { - if (finalRun.serializableOutput) { - if (finalRun.serializableOutput.scrapeSchema) { - Object.values(finalRun.serializableOutput.scrapeSchema).forEach((schemaResult: any) => { + if (updatedRun) { + if (updatedRun.dataValues.serializableOutput) { + if (updatedRun.dataValues.serializableOutput.scrapeSchema) { + Object.values(updatedRun.dataValues.serializableOutput.scrapeSchema).forEach((schemaResult: any) => { if (Array.isArray(schemaResult)) { totalSchemaItemsExtracted += schemaResult.length; } else if (schemaResult && typeof schemaResult === 'object') { @@ -640,8 +699,8 @@ async function executeRun(id: string, userId: string) { }); } - if (finalRun.serializableOutput.scrapeList) { - Object.values(finalRun.serializableOutput.scrapeList).forEach((listResult: any) => { + if (updatedRun.dataValues.serializableOutput.scrapeList) { + Object.values(updatedRun.dataValues.serializableOutput.scrapeList).forEach((listResult: any) => { if (Array.isArray(listResult)) { totalListItemsExtracted += listResult.length; } @@ -649,8 +708,8 @@ async function executeRun(id: string, userId: string) { } } - if (finalRun.binaryOutput) { - extractedScreenshotsCount = Object.keys(finalRun.binaryOutput).length; + if (updatedRun.dataValues.binaryOutput) { + extractedScreenshotsCount = Object.keys(updatedRun.dataValues.binaryOutput).length; } } @@ -667,17 +726,31 @@ async function executeRun(id: string, userId: string) { } ) + const parsedOutput = + typeof updatedRun.dataValues.serializableOutput === "string" + ? JSON.parse(updatedRun.dataValues.serializableOutput) + : updatedRun.dataValues.serializableOutput || {}; + + const parsedList = + typeof parsedOutput.scrapeList === "string" + ? JSON.parse(parsedOutput.scrapeList) + : parsedOutput.scrapeList || {}; + + const parsedSchema = + typeof parsedOutput.scrapeSchema === "string" + ? JSON.parse(parsedOutput.scrapeSchema) + : parsedOutput.scrapeSchema || {}; + const webhookPayload = { robot_id: plainRun.robotMetaId, run_id: plainRun.runId, robot_name: recording.recording_meta.name, - status: 'success', + status: "success", started_at: plainRun.startedAt, finished_at: new Date().toLocaleString(), extracted_data: { - captured_texts: finalRun?.serializableOutput?.scrapeSchema ? Object.values(finalRun.serializableOutput.scrapeSchema).flat() : [], - captured_lists: finalRun?.serializableOutput?.scrapeList || {}, - total_rows: totalRowsExtracted, + captured_texts: parsedSchema || {}, + captured_lists: parsedList || {}, captured_texts_count: totalSchemaItemsExtracted, captured_lists_count: totalListItemsExtracted, screenshots_count: extractedScreenshotsCount @@ -685,7 +758,7 @@ async function executeRun(id: string, userId: string) { metadata: { browser_id: plainRun.browserId, user_id: userId, - } + }, }; try { @@ -695,26 +768,7 @@ async function executeRun(id: string, userId: string) { logger.log('error', `Failed to send webhooks for run ${plainRun.runId}: ${webhookError.message}`); } - try { - googleSheetUpdateTasks[id] = { - robotId: plainRun.robotMetaId, - runId: id, - status: 'pending', - retries: 5, - }; - - airtableUpdateTasks[id] = { - robotId: plainRun.robotMetaId, - runId: id, - status: 'pending', - retries: 5, - }; - - processAirtableUpdates().catch(err => logger.log('error', `Airtable update error: ${err.message}`)); - processGoogleSheetUpdates().catch(err => logger.log('error', `Google Sheets update error: ${err.message}`)); - } catch (err: any) { - logger.log('error', `Failed to update Google Sheet for run: ${plainRun.runId}: ${err.message}`); - } + await triggerIntegrationUpdates(plainRun.runId, plainRun.robotMetaId); return { success: true, @@ -728,8 +782,29 @@ async function executeRun(id: string, userId: string) { await run.update({ status: 'failed', finishedAt: new Date().toLocaleString(), + log: (run.log ? run.log + '\n' : '') + `Error: ${error.message}\n` + (error.stack ? error.stack : ''), }); + try { + const recording = await Robot.findOne({ where: { 'recording_meta.id': run.robotMetaId }, raw: true }); + const failureData = { + runId: run.runId, + robotMetaId: run.robotMetaId, + robotName: recording ? recording.recording_meta.name : 'Unknown Robot', + status: 'failed', + finishedAt: new Date().toLocaleString(), + runByUserId: run.runByUserId, + runByScheduleId: run.runByScheduleId, + runByAPI: run.runByAPI || false, + browserId: run.browserId + }; + + serverIo.of('/queued-run').to(`user-${userId}`).emit('run-completed', failureData); + logger.log('info', `API run permanently failed notification sent for run: ${run.runId} to user-${userId}`); + } catch (socketError: any) { + logger.log('warn', `Failed to send run-completed notification for permanently failed API run ${run.runId}: ${socketError.message}`); + } + const recording = await Robot.findOne({ where: { 'recording_meta.id': run.robotMetaId }, raw: true }); // Trigger webhooks for run failure @@ -814,7 +889,7 @@ async function waitForRunCompletion(runId: string, interval: number = 2000) { if (!run) throw new Error('Run not found'); if (run.status === 'success') { - return run; + return run.toJSON(); } else if (run.status === 'failed') { throw new Error('Run failed'); } diff --git a/server/src/browser-management/inputHandlers.ts b/server/src/browser-management/inputHandlers.ts index c014af3d..3e58664e 100644 --- a/server/src/browser-management/inputHandlers.ts +++ b/server/src/browser-management/inputHandlers.ts @@ -65,6 +65,7 @@ const handleWrapper = async ( interface CustomActionEventData { action: CustomActions; settings: any; + actionId?: string; } /** @@ -84,23 +85,24 @@ const onGenerateAction = async (customActionEventData: CustomActionEventData, us * @param page The active page * @param action The custom action * @param settings The custom action settings + * @param actionId Optional action ID for tracking and updating specific actions * @category BrowserManagement */ const handleGenerateAction = - async (activeBrowser: RemoteBrowser, page: Page, { action, settings }: CustomActionEventData) => { - try { - if (page.isClosed()) { - logger.log("debug", `Ignoring generate action event: page is closed`); - return; - } + async (activeBrowser: RemoteBrowser, page: Page, { action, settings, actionId }: CustomActionEventData) => { + try { + if (page.isClosed()) { + logger.log("debug", `Ignoring generate action event: page is closed`); + return; + } - const generator = activeBrowser.generator; - await generator.customAction(action, settings, page); - } catch (e) { - const { message } = e as Error; - logger.log("warn", `Error handling generate action event: ${message}`); - } + const generator = activeBrowser.generator; + await generator.customAction(action, actionId || '', settings, page); + } catch (e) { + const { message } = e as Error; + logger.log("warn", `Error handling generate action event: ${message}`); } + } /** * A wrapper function for handling mousedown event. @@ -819,6 +821,49 @@ const onDOMWorkflowPair = async ( await handleWrapper(handleWorkflowPair, userId, data); }; +/** + * Handles the remove action event. + * This is called when a user discards a capture action (list or text) that was already emitted to the backend. + * @param activeBrowser - the active remote browser instance + * @param page - the active page of the remote browser + * @param data - the data containing the actionId to remove + * @category BrowserManagement + */ +const handleRemoveAction = async ( + activeBrowser: RemoteBrowser, + page: Page, + data: { actionId: string } +) => { + try { + const { actionId } = data; + const generator = activeBrowser.generator; + const removed = generator.removeAction(actionId); + + if (removed) { + logger.log("info", `Action ${actionId} successfully removed from workflow`); + } else { + logger.log("debug", `Action ${actionId} not found in workflow`); + } + } catch (e) { + const { message } = e as Error; + logger.log("warn", `Error handling remove action event: ${message}`); + } +}; + +/** + * A wrapper function for handling the remove action event. + * @param data - the data containing the actionId to remove + * @param userId - the user ID + * @category HelperFunctions + */ +const onRemoveAction = async ( + data: { actionId: string }, + userId: string +) => { + logger.log("debug", "Handling remove action event emitted from client"); + await handleWrapper(handleRemoveAction, userId, data); +}; + /** * Helper function for registering the handlers onto established websocket connection. * Registers various input handlers. @@ -831,7 +876,7 @@ const onDOMWorkflowPair = async ( * @returns void * @category BrowserManagement */ -const registerInputHandlers = (socket: Socket, userId: string) => { +const registerInputHandlers = (socket: Socket, userId: string) => { // Register handlers with the socket socket.on("input:mousedown", (data) => onMousedown(data, userId)); socket.on("input:wheel", (data) => onWheel(data, userId)); @@ -847,6 +892,7 @@ const registerInputHandlers = (socket: Socket, userId: string) => { socket.on("input:time", (data) => onTimeSelection(data, userId)); socket.on("input:datetime-local", (data) => onDateTimeLocalSelection(data, userId)); socket.on("action", (data) => onGenerateAction(data, userId)); + socket.on("removeAction", (data) => onRemoveAction(data, userId)); socket.on("dom:click", (data) => onDOMClickAction(data, userId)); socket.on("dom:keypress", (data) => onDOMKeyboardAction(data, userId)); diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index c1511b2f..b9f41100 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -149,14 +149,20 @@ async function processRunExecution(job: Job) { let browser = browserPool.getRemoteBrowser(browserId); const browserWaitStart = Date.now(); let lastLogTime = 0; + let pollAttempts = 0; + const MAX_POLL_ATTEMPTS = 15; - while (!browser && (Date.now() - browserWaitStart) < BROWSER_INIT_TIMEOUT) { + while (!browser && (Date.now() - browserWaitStart) < BROWSER_INIT_TIMEOUT && pollAttempts < MAX_POLL_ATTEMPTS) { const currentTime = Date.now(); + pollAttempts++; const browserStatus = browserPool.getBrowserStatus(browserId); if (browserStatus === null) { throw new Error(`Browser slot ${browserId} does not exist in pool`); } + if (browserStatus === "failed") { + throw new Error(`Browser ${browserId} initialization failed`); + } if (currentTime - lastLogTime > 10000) { logger.log('info', `Browser ${browserId} not ready yet (status: ${browserStatus}), waiting... (${Math.round((currentTime - browserWaitStart) / 1000)}s elapsed)`); @@ -183,17 +189,25 @@ async function processRunExecution(job: Job) { } const isRunAborted = async (): Promise => { - const currentRun = await Run.findOne({ where: { runId: data.runId } }); - return currentRun ? (currentRun.status === 'aborted' || currentRun.status === 'aborting') : false; + try { + const currentRun = await Run.findOne({ where: { runId: data.runId } }); + return currentRun ? (currentRun.status === 'aborted' || currentRun.status === 'aborting') : false; + } catch (error: any) { + logger.log('error', `Error checking if run ${data.runId} is aborted: ${error.message}`); + return false; + } }; let currentPage = browser.getCurrentPage(); const pageWaitStart = Date.now(); let lastPageLogTime = 0; + let pageAttempts = 0; + const MAX_PAGE_ATTEMPTS = 15; - while (!currentPage && (Date.now() - pageWaitStart) < BROWSER_PAGE_TIMEOUT) { + while (!currentPage && (Date.now() - pageWaitStart) < BROWSER_PAGE_TIMEOUT && pageAttempts < MAX_PAGE_ATTEMPTS) { const currentTime = Date.now(); + pageAttempts++; if (currentTime - lastPageLogTime > 5000) { logger.log('info', `Page not ready for browser ${browserId}, waiting... (${Math.round((currentTime - pageWaitStart) / 1000)}s elapsed)`); @@ -209,6 +223,26 @@ async function processRunExecution(job: Job) { } logger.log('info', `Starting workflow execution for run ${data.runId}`); + + await run.update({ + status: 'running', + log: 'Workflow execution started' + }); + + try { + const startedData = { + runId: data.runId, + robotMetaId: plainRun.robotMetaId, + robotName: recording.recording_meta.name, + status: 'running', + startedAt: new Date().toLocaleString() + }; + + serverIo.of(browserId).emit('run-started', startedData); + serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-started', startedData); + } catch (socketError: any) { + logger.log('warn', `Failed to send run-started notification for API run ${plainRun.runId}: ${socketError.message}`); + } // Execute the workflow const workflow = AddGeneratedFlags(recording.recording); @@ -231,6 +265,19 @@ async function processRunExecution(job: Job) { } logger.log('info', `Workflow execution completed for run ${data.runId}`); + + const binaryOutputService = new BinaryOutputService('maxuncloud-run-screenshots'); + const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput( + run, + interpretationInfo.binaryOutput + ); + + // Get the already persisted and credit-validated data from the run record + const finalRun = await Run.findByPk(run.id); + const categorizedOutput = { + scrapeSchema: finalRun?.serializableOutput?.scrapeSchema || {}, + scrapeList: finalRun?.serializableOutput?.scrapeList || {} + }; if (await isRunAborted()) { logger.log('info', `Run ${data.runId} was aborted while processing results, not updating status`); @@ -240,48 +287,39 @@ async function processRunExecution(job: Job) { await run.update({ status: 'success', finishedAt: new Date().toLocaleString(), - log: interpretationInfo.log.join('\n') + log: interpretationInfo.log.join('\n'), + serializableOutput: JSON.parse(JSON.stringify({ + scrapeSchema: categorizedOutput.scrapeSchema || {}, + scrapeList: categorizedOutput.scrapeList || {}, + })), + binaryOutput: uploadedBinaryOutput, }); - // Upload binary output to MinIO and update run with MinIO URLs - const updatedRun = await Run.findOne({ where: { runId: data.runId } }); - if (updatedRun && updatedRun.binaryOutput && Object.keys(updatedRun.binaryOutput).length > 0) { - try { - const binaryService = new BinaryOutputService('maxun-run-screenshots'); - await binaryService.uploadAndStoreBinaryOutput(updatedRun, updatedRun.binaryOutput); - logger.log('info', `Uploaded binary output to MinIO for run ${data.runId}`); - } catch (minioError: any) { - logger.log('error', `Failed to upload binary output to MinIO for run ${data.runId}: ${minioError.message}`); - } - } - let totalSchemaItemsExtracted = 0; let totalListItemsExtracted = 0; let extractedScreenshotsCount = 0; - if (updatedRun) { - if (updatedRun.serializableOutput) { - if (updatedRun.serializableOutput.scrapeSchema) { - Object.values(updatedRun.serializableOutput.scrapeSchema).forEach((schemaResult: any) => { - if (Array.isArray(schemaResult)) { - totalSchemaItemsExtracted += schemaResult.length; - } else if (schemaResult && typeof schemaResult === 'object') { - totalSchemaItemsExtracted += 1; - } - }); - } - - if (updatedRun.serializableOutput.scrapeList) { - Object.values(updatedRun.serializableOutput.scrapeList).forEach((listResult: any) => { - if (Array.isArray(listResult)) { - totalListItemsExtracted += listResult.length; - } - }); - } + if (categorizedOutput) { + if (categorizedOutput.scrapeSchema) { + Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => { + if (Array.isArray(schemaResult)) { + totalSchemaItemsExtracted += schemaResult.length; + } else if (schemaResult && typeof schemaResult === 'object') { + totalSchemaItemsExtracted += 1; + } + }); } - if (updatedRun.binaryOutput) { - extractedScreenshotsCount = Object.keys(updatedRun.binaryOutput).length; + if (categorizedOutput.scrapeList) { + Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => { + if (Array.isArray(listResult)) { + totalListItemsExtracted += listResult.length; + } + }); + } + + if (run.binaryOutput) { + extractedScreenshotsCount = Object.keys(run.binaryOutput).length; } } @@ -302,6 +340,21 @@ async function processRunExecution(job: Job) { } ); + try { + const completionData = { + runId: data.runId, + robotMetaId: plainRun.robotMetaId, + robotName: recording.recording_meta.name, + status: 'success', + finishedAt: new Date().toLocaleString() + }; + + serverIo.of(browserId).emit('run-completed', completionData); + serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-completed', completionData); + } catch (socketError: any) { + logger.log('warn', `Failed to send run-completed notification for API run ${plainRun.runId}: ${socketError.message}`); + } + const webhookPayload = { robot_id: plainRun.robotMetaId, run_id: data.runId, @@ -310,12 +363,16 @@ async function processRunExecution(job: Job) { started_at: plainRun.startedAt, finished_at: new Date().toLocaleString(), extracted_data: { - captured_texts: updatedRun?.serializableOutput?.scrapeSchema ? Object.values(updatedRun.serializableOutput.scrapeSchema).flat() : [], - captured_lists: updatedRun?.serializableOutput?.scrapeList || {}, - total_rows: totalRowsExtracted, + captured_texts: Object.keys(categorizedOutput.scrapeSchema || {}).length > 0 + ? Object.entries(categorizedOutput.scrapeSchema).reduce((acc, [name, value]) => { + acc[name] = Array.isArray(value) ? value : [value]; + return acc; + }, {} as Record) + : {}, + captured_lists: categorizedOutput.scrapeList, captured_texts_count: totalSchemaItemsExtracted, captured_lists_count: totalListItemsExtracted, - screenshots_count: extractedScreenshotsCount, + screenshots_count: extractedScreenshotsCount }, metadata: { browser_id: plainRun.browserId, @@ -330,26 +387,8 @@ async function processRunExecution(job: Job) { logger.log('error', `Failed to send webhooks for run ${data.runId}: ${webhookError.message}`); } - // Schedule updates for Google Sheets and Airtable await triggerIntegrationUpdates(plainRun.runId, plainRun.robotMetaId); - // Flush any remaining persistence buffer before emitting socket event - if (browser && browser.interpreter) { - await browser.interpreter.flushPersistenceBuffer(); - logger.log('debug', `Flushed persistence buffer before emitting run-completed for run ${data.runId}`); - } - - const completionData = { - runId: data.runId, - robotMetaId: plainRun.robotMetaId, - robotName: recording.recording_meta.name, - status: 'success', - finishedAt: new Date().toLocaleString() - }; - - serverIo.of(browserId).emit('run-completed', completionData); - serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-completed', completionData); - await destroyRemoteBrowser(browserId, data.userId); logger.log('info', `Browser ${browserId} destroyed after successful run ${data.runId}`); @@ -416,9 +455,13 @@ async function processRunExecution(job: Job) { }, partial_data_extracted: partialDataExtracted, extracted_data: partialDataExtracted ? { - captured_texts: Object.values(partialUpdateData.serializableOutput?.scrapeSchema || []).flat() || [], + captured_texts: Object.keys(partialUpdateData.serializableOutput?.scrapeSchema || {}).length > 0 + ? Object.entries(partialUpdateData.serializableOutput.scrapeSchema).reduce((acc, [name, value]) => { + acc[name] = Array.isArray(value) ? value : [value]; + return acc; + }, {} as Record) + : {}, captured_lists: partialUpdateData.serializableOutput?.scrapeList || {}, - total_data_points_extracted: partialData?.totalDataPointsExtracted || 0, captured_texts_count: partialData?.totalSchemaItemsExtracted || 0, captured_lists_count: partialData?.totalListItemsExtracted || 0, screenshots_count: partialData?.extractedScreenshotsCount || 0 diff --git a/server/src/workflow-management/classes/Generator.ts b/server/src/workflow-management/classes/Generator.ts index a5bc2edc..57a30863 100644 --- a/server/src/workflow-management/classes/Generator.ts +++ b/server/src/workflow-management/classes/Generator.ts @@ -726,34 +726,108 @@ export class WorkflowGenerator { /** * Generates a pair for the custom action event. + * * @param action The type of the custom action. - * @param settings The settings of the custom action. + * @param actionId The unique identifier for this action (for updates) + * @param settings The settings of the custom action (may include name and actionId). * @param page The page to use for obtaining the needed data. */ - public customAction = async (action: CustomActions, settings: any, page: Page) => { - const pair: WhereWhatPair = { - where: { url: this.getBestUrl(page.url()) }, - what: [{ - action, - args: settings ? Array.isArray(settings) ? settings : [settings] : [], - }], - } + public customAction = async (action: CustomActions, actionId: string, settings: any, page: Page) => { + try { + let actionSettings = settings; + let actionName: string | undefined; - await this.addPairToWorkflowAndNotifyClient(pair, page); + if (settings && !Array.isArray(settings)) { + actionName = settings.name; + actionSettings = JSON.parse(JSON.stringify(settings)); + delete actionSettings.name; + } - if (this.generatedData.lastUsedSelector) { - const elementInfo = await this.getLastUsedSelectorInfo(page, this.generatedData.lastUsedSelector); + const pair: WhereWhatPair = { + where: { url: this.getBestUrl(page.url()) }, + what: [{ + action, + args: actionSettings + ? Array.isArray(actionSettings) + ? actionSettings + : [actionSettings] + : [], + ...(actionName ? { name: actionName } : {}), + ...(actionId ? { actionId } : {}), + }], + }; - this.socket.emit('decision', { - pair, actionType: 'customAction', - lastData: { - selector: this.generatedData.lastUsedSelector, - action: this.generatedData.lastAction, - tagName: elementInfo.tagName, - innerText: elementInfo.innerText, + if (actionId) { + const existingIndex = this.workflowRecord.workflow.findIndex( + (workflowPair) => + Array.isArray(workflowPair.what) && + workflowPair.what.some((whatItem: any) => whatItem.actionId === actionId) + ); + + if (existingIndex !== -1) { + const existingPair = this.workflowRecord.workflow[existingIndex]; + const existingAction = existingPair.what.find((whatItem: any) => whatItem.actionId === actionId); + + const updatedAction = { + ...existingAction, + action, + args: Array.isArray(actionSettings) + ? actionSettings + : [actionSettings], + name: actionName || existingAction?.name || '', + actionId, + }; + + this.workflowRecord.workflow[existingIndex] = { + where: JSON.parse(JSON.stringify(existingPair.where)), + what: existingPair.what.map((whatItem: any) => + whatItem.actionId === actionId ? updatedAction : whatItem + ), + }; + + if (action === 'scrapeSchema' && actionName) { + this.workflowRecord.workflow.forEach((pair, index) => { + pair.what.forEach((whatItem: any, whatIndex: number) => { + if (whatItem.action === 'scrapeSchema' && whatItem.actionId !== actionId) { + this.workflowRecord.workflow[index].what[whatIndex] = { + ...whatItem, + name: actionName + }; + } + }); + }); + } + + } else { + await this.addPairToWorkflowAndNotifyClient(pair, page); + logger.log("debug", `Added new workflow action: ${action} with actionId: ${actionId}`); } - }); - } + } else { + await this.addPairToWorkflowAndNotifyClient(pair, page); + logger.log("debug", `Added new workflow action: ${action} without actionId`); + } + + if (this.generatedData.lastUsedSelector) { + const elementInfo = await this.getLastUsedSelectorInfo( + page, + this.generatedData.lastUsedSelector + ); + + this.socket.emit('decision', { + pair, + actionType: 'customAction', + lastData: { + selector: this.generatedData.lastUsedSelector, + action: this.generatedData.lastAction, + tagName: elementInfo.tagName, + innerText: elementInfo.innerText, + }, + }); + } + } catch (e) { + const { message } = e as Error; + logger.log("warn", `Error handling customAction: ${message}`); + } }; /** @@ -810,6 +884,48 @@ export class WorkflowGenerator { } }; + /** + * Removes an action with the given actionId from the workflow. + * Only removes the specific action from the what array, not the entire pair. + * If the what array becomes empty after removal, then the entire pair is removed. + * @param actionId The actionId of the action to remove + * @returns boolean indicating whether an action was removed + */ + public removeAction = (actionId: string): boolean => { + let actionWasRemoved = false; + + this.workflowRecord.workflow = this.workflowRecord.workflow + .map((pair) => { + const filteredWhat = pair.what.filter( + (whatItem: any) => whatItem.actionId !== actionId + ); + + if (filteredWhat.length < pair.what.length) { + actionWasRemoved = true; + + if (filteredWhat.length > 0) { + return { + ...pair, + what: filteredWhat + }; + } + + return null; + } + + return pair; + }) + .filter((pair) => pair !== null) as WhereWhatPair[]; // Remove null entries + + if (actionWasRemoved) { + logger.log("info", `Action with actionId ${actionId} removed from workflow`); + } else { + logger.log("debug", `No action found with actionId ${actionId}`); + } + + return actionWasRemoved; + }; + /** * Updates the socket used for communication with the client. * @param socket The socket to be used for communication. diff --git a/server/src/workflow-management/classes/Interpreter.ts b/server/src/workflow-management/classes/Interpreter.ts index 5e843a80..6142ed24 100644 --- a/server/src/workflow-management/classes/Interpreter.ts +++ b/server/src/workflow-management/classes/Interpreter.ts @@ -91,13 +91,16 @@ export class WorkflowInterpreter { * Storage for different types of serializable data */ public serializableDataByType: { - scrapeSchema: any[], - scrapeList: any[], + scrapeSchema: Record; + scrapeList: Record; + [key: string]: any; } = { - scrapeSchema: [], - scrapeList: [], + scrapeSchema: {}, + scrapeList: {}, }; + private currentActionName: string | null = null; + /** * Track the current action type being processed */ @@ -106,7 +109,7 @@ export class WorkflowInterpreter { /** * An array of all the binary data extracted from the run. */ - public binaryData: { mimetype: string, data: string }[] = []; + public binaryData: { name: string; mimeType: string; data: string }[] = []; /** * Track current scrapeList index @@ -259,14 +262,19 @@ export class WorkflowInterpreter { } }, binaryCallback: async (data: string, mimetype: string) => { - const binaryItem = { mimetype, data: JSON.stringify(data) }; + // For editor mode, we don't have the name yet, so use a timestamp-based name + const binaryItem = { + name: `Screenshot ${Date.now()}`, + mimeType: mimetype, + data: JSON.stringify(data) + }; this.binaryData.push(binaryItem); - + // Persist binary data to database await this.persistBinaryDataToDatabase(binaryItem); - - this.socket.emit('binaryCallback', { - data, + + this.socket.emit('binaryCallback', { + data, mimetype, type: 'captureScreenshot' }); @@ -364,9 +372,10 @@ export class WorkflowInterpreter { this.breakpoints = []; this.interpretationResume = null; this.currentActionType = null; + this.currentActionName = null; this.serializableDataByType = { - scrapeSchema: [], - scrapeList: [], + scrapeSchema: {}, + scrapeList: {}, }; this.binaryData = []; this.currentScrapeListIndex = 0; @@ -409,7 +418,7 @@ export class WorkflowInterpreter { * Persists binary data to database in real-time * @private */ - private persistBinaryDataToDatabase = async (binaryItem: { mimetype: string, data: string }): Promise => { + private persistBinaryDataToDatabase = async (binaryItem: { name: string; mimeType: string; data: string }): Promise => { if (!this.currentRunId) { logger.log('debug', 'No run ID available for binary data persistence'); return; @@ -422,22 +431,29 @@ export class WorkflowInterpreter { return; } - const currentBinaryOutput = run.binaryOutput ? - JSON.parse(JSON.stringify(run.binaryOutput)) : - {}; - - const uniqueKey = `item-${Date.now()}-${Object.keys(currentBinaryOutput).length}`; - + const currentBinaryOutput = + run.binaryOutput && typeof run.binaryOutput === 'object' + ? JSON.parse(JSON.stringify(run.binaryOutput)) + : {}; + + const baseName = binaryItem.name?.trim() || `Screenshot ${Object.keys(currentBinaryOutput).length + 1}`; + + let uniqueName = baseName; + let counter = 1; + while (currentBinaryOutput[uniqueName]) { + uniqueName = `${baseName} (${counter++})`; + } + const updatedBinaryOutput = { ...currentBinaryOutput, - [uniqueKey]: binaryItem + [uniqueName]: binaryItem, }; await run.update({ binaryOutput: updatedBinaryOutput }); - - logger.log('debug', `Persisted binary data for run ${this.currentRunId}: ${binaryItem.mimetype}`); + + logger.log('debug', `Persisted binary data for run ${this.currentRunId}: ${binaryItem.name} (${binaryItem.mimeType})`); } catch (error: any) { logger.log('error', `Failed to persist binary data in real-time for run ${this.currentRunId}: ${error.message}`); } @@ -478,41 +494,101 @@ export class WorkflowInterpreter { }, incrementScrapeListIndex: () => { this.currentScrapeListIndex++; - } + }, + setActionName: (name: string) => { + this.currentActionName = name; + }, }, serializableCallback: async (data: any) => { - if (this.currentActionType === 'scrapeSchema') { - if (Array.isArray(data) && data.length > 0) { - mergedScrapeSchema = { ...mergedScrapeSchema, ...data[0] }; - this.serializableDataByType.scrapeSchema.push(data); - } else { - mergedScrapeSchema = { ...mergedScrapeSchema, ...data }; - this.serializableDataByType.scrapeSchema.push([data]); + try { + if (!data || typeof data !== "object") return; + + if (!this.currentActionType && Array.isArray(data) && data.length > 0) { + const first = data[0]; + if (first && Object.keys(first).some(k => k.toLowerCase().includes("label") || k.toLowerCase().includes("text"))) { + this.currentActionType = "scrapeSchema"; + } } - - // Persist the cumulative scrapeSchema data - const cumulativeScrapeSchemaData = Object.keys(mergedScrapeSchema).length > 0 ? [mergedScrapeSchema] : []; - if (cumulativeScrapeSchemaData.length > 0) { - await this.persistDataToDatabase('scrapeSchema', cumulativeScrapeSchemaData); + + let typeKey = this.currentActionType || "unknown"; + + if (this.currentActionType === "scrapeList") { + typeKey = "scrapeList"; + } else if (this.currentActionType === "scrapeSchema") { + typeKey = "scrapeSchema"; } - } else if (this.currentActionType === 'scrapeList') { - if (data && Array.isArray(data) && data.length > 0) { - // Use the current index for persistence - await this.persistDataToDatabase('scrapeList', data, this.currentScrapeListIndex); + + if (this.currentActionType === "scrapeList" && data.scrapeList) { + data = data.scrapeList; + } else if (this.currentActionType === "scrapeSchema" && data.scrapeSchema) { + data = data.scrapeSchema; } - this.serializableDataByType.scrapeList[this.currentScrapeListIndex] = data; - } - - this.socket.emit('serializableCallback', data); + + let actionName = this.currentActionName || ""; + + if (!actionName) { + if (!Array.isArray(data) && Object.keys(data).length === 1) { + const soleKey = Object.keys(data)[0]; + const soleValue = data[soleKey]; + if (Array.isArray(soleValue) || typeof soleValue === "object") { + actionName = soleKey; + data = soleValue; + } + } + } + + if (!actionName) { + actionName = "Unnamed Action"; + } + + const flattened = Array.isArray(data) + ? data + : (data?.List ?? (data && typeof data === 'object' ? Object.values(data).flat?.() ?? data : [])); + + if (!this.serializableDataByType[typeKey]) { + this.serializableDataByType[typeKey] = {}; + } + + this.serializableDataByType[typeKey][actionName] = flattened; + + await this.persistDataToDatabase(typeKey, { [actionName]: flattened }); + + this.socket.emit("serializableCallback", { + type: typeKey, + name: actionName, + data: flattened, + }); + + this.currentActionType = null; + this.currentActionName = null; + } catch (err: any) { + logger.log('error', `serializableCallback handler failed: ${err.message}`); + } }, - binaryCallback: async (data: string, mimetype: string) => { - const binaryItem = { mimetype, data: JSON.stringify(data) }; - this.binaryData.push(binaryItem); - - // Persist binary data to database - await this.persistBinaryDataToDatabase(binaryItem); - - this.socket.emit('binaryCallback', { data, mimetype }); + binaryCallback: async (payload: { name: string; data: Buffer; mimeType: string }) => { + try { + const { name, data, mimeType } = payload; + + const base64Data = data.toString("base64"); + + const binaryItem = { + name, + mimeType, + data: base64Data + }; + + this.binaryData.push(binaryItem); + + await this.persistBinaryDataToDatabase(binaryItem); + + this.socket.emit("binaryCallback", { + name, + data: base64Data, + mimeType + }); + } catch (err: any) { + logger.log("error", `binaryCallback handler failed: ${err.message}`); + } } } @@ -542,20 +618,13 @@ export class WorkflowInterpreter { const result = { log: this.debugMessages, result: status, - scrapeSchemaOutput: Object.keys(mergedScrapeSchema).length > 0 - ? { "schema_merged": [mergedScrapeSchema] } - : this.serializableDataByType.scrapeSchema.reduce((reducedObject, item, index) => { - reducedObject[`schema_${index}`] = item; - return reducedObject; - }, {} as Record), - scrapeListOutput: this.serializableDataByType.scrapeList.reduce((reducedObject, item, index) => { - reducedObject[`list_${index}`] = item; - return reducedObject; - }, {} as Record), - binaryOutput: this.binaryData.reduce((reducedObject, item, index) => { - reducedObject[`item_${index}`] = item; - return reducedObject; - }, {} as Record) + scrapeSchemaOutput: this.serializableDataByType.scrapeSchema, + scrapeListOutput: this.serializableDataByType.scrapeList, + binaryOutput: this.binaryData.reduce>((acc, item) => { + const key = item.name || `Screenshot ${Object.keys(acc).length + 1}`; + acc[key] = { data: item.data, mimeType: item.mimeType }; + return acc; + }, {}) } logger.log('debug', `Interpretation finished`); @@ -642,19 +711,37 @@ export class WorkflowInterpreter { const currentSerializableOutput = run.serializableOutput ? JSON.parse(JSON.stringify(run.serializableOutput)) : { scrapeSchema: [], scrapeList: [] }; + + if (Array.isArray(currentSerializableOutput.scrapeList)) { + currentSerializableOutput.scrapeList = {}; + } + if (Array.isArray(currentSerializableOutput.scrapeSchema)) { + currentSerializableOutput.scrapeSchema = {}; + } let hasUpdates = false; + const mergeLists = (target: Record, updates: Record) => { + for (const [key, val] of Object.entries(updates)) { + const flattened = Array.isArray(val) + ? val + : (val?.List ?? (val && typeof val === 'object' ? Object.values(val).flat?.() ?? val : [])); + target[key] = flattened; + } + }; + for (const item of batchToProcess) { if (item.actionType === 'scrapeSchema') { - const newSchemaData = Array.isArray(item.data) ? item.data : [item.data]; - currentSerializableOutput.scrapeSchema = newSchemaData; - hasUpdates = true; - } else if (item.actionType === 'scrapeList' && typeof item.listIndex === 'number') { - if (!Array.isArray(currentSerializableOutput.scrapeList)) { - currentSerializableOutput.scrapeList = []; + if (!currentSerializableOutput.scrapeSchema || typeof currentSerializableOutput.scrapeSchema !== 'object') { + currentSerializableOutput.scrapeSchema = {}; } - currentSerializableOutput.scrapeList[item.listIndex] = item.data; + mergeLists(currentSerializableOutput.scrapeSchema, item.data); + hasUpdates = true; + } else if (item.actionType === 'scrapeList') { + if (!currentSerializableOutput.scrapeList || typeof currentSerializableOutput.scrapeList !== 'object') { + currentSerializableOutput.scrapeList = {}; + } + mergeLists(currentSerializableOutput.scrapeList, item.data); hasUpdates = true; } } diff --git a/server/src/workflow-management/integrations/airtable.ts b/server/src/workflow-management/integrations/airtable.ts index e1f27264..0437afc3 100644 --- a/server/src/workflow-management/integrations/airtable.ts +++ b/server/src/workflow-management/integrations/airtable.ts @@ -12,8 +12,8 @@ interface AirtableUpdateTask { } interface SerializableOutput { - scrapeSchema?: any[]; - scrapeList?: any[]; + scrapeSchema?: Record; + scrapeList?: Record; } const MAX_RETRIES = 3; @@ -48,47 +48,94 @@ async function refreshAirtableToken(refreshToken: string) { function mergeRelatedData(serializableOutput: SerializableOutput, binaryOutput: Record) { const allRecords: Record[] = []; - const schemaData: Array<{key: string, value: any}> = []; + const schemaData: Array<{ Group: string; Field: string; Value: any }> = []; const listData: any[] = []; const screenshotData: Array<{key: string, url: string}> = []; // Collect schema data if (serializableOutput.scrapeSchema) { - for (const schemaArray of serializableOutput.scrapeSchema) { - if (!Array.isArray(schemaArray)) continue; - for (const schemaItem of schemaArray) { - Object.entries(schemaItem).forEach(([key, value]) => { - if (key && key.trim() !== '' && value !== null && value !== undefined && value !== '') { - schemaData.push({key, value}); - } - }); + if (Array.isArray(serializableOutput.scrapeSchema)) { + for (const schemaArray of serializableOutput.scrapeSchema) { + if (!Array.isArray(schemaArray)) continue; + for (const schemaItem of schemaArray) { + Object.entries(schemaItem || {}).forEach(([key, value]) => { + if (key && key.trim() !== "" && value !== null && value !== undefined && value !== "") { + schemaData.push({ Group: "Default", Field: key, Value: value }); + } + }); + } + } + } else if (typeof serializableOutput.scrapeSchema === "object") { + for (const [groupName, schemaArray] of Object.entries(serializableOutput.scrapeSchema)) { + if (!Array.isArray(schemaArray)) continue; + for (const schemaItem of schemaArray) { + Object.entries(schemaItem || {}).forEach(([fieldName, value]) => { + if (fieldName && fieldName.trim() !== "" && value !== null && value !== undefined && value !== "") { + schemaData.push({ + Group: groupName, + Field: fieldName, + Value: value, + }); + } + }); + } } } } // Collect list data if (serializableOutput.scrapeList) { - for (const listArray of serializableOutput.scrapeList) { - if (!Array.isArray(listArray)) continue; - listArray.forEach(listItem => { - const hasContent = Object.values(listItem).some(value => - value !== null && value !== undefined && value !== '' - ); - if (hasContent) { - listData.push(listItem); - } - }); + if (Array.isArray(serializableOutput.scrapeList)) { + for (const listArray of serializableOutput.scrapeList) { + if (!Array.isArray(listArray)) continue; + listArray.forEach((listItem) => { + const hasContent = Object.values(listItem || {}).some( + (value) => value !== null && value !== undefined && value !== "" + ); + if (hasContent) listData.push(listItem); + }); + } + } else if (typeof serializableOutput.scrapeList === "object") { + for (const [listName, listArray] of Object.entries(serializableOutput.scrapeList)) { + if (!Array.isArray(listArray)) continue; + listArray.forEach((listItem) => { + const hasContent = Object.values(listItem || {}).some( + (value) => value !== null && value !== undefined && value !== "" + ); + if (hasContent) listData.push({ List: listName, ...listItem }); + }); + } } } // Collect screenshot data - if (binaryOutput && Object.keys(binaryOutput).length > 0) { - Object.entries(binaryOutput).forEach(([key, url]) => { - if (key && key.trim() !== '' && url && url.trim() !== '') { - screenshotData.push({key, url}); - } - }); - } + // if (binaryOutput && Object.keys(binaryOutput).length > 0) { + // Object.entries(binaryOutput).forEach(([key, rawValue]: [string, any]) => { + // if (!key || key.trim() === "") return; + + // let urlString = ""; + + // // Case 1: old format (string URL) + // if (typeof rawValue === "string") { + // urlString = rawValue; + // } + // // Case 2: new format (object with { url?, data?, mimeType? }) + // else if (rawValue && typeof rawValue === "object") { + // const valueObj = rawValue as { url?: string; data?: string; mimeType?: string }; + + // if (typeof valueObj.url === "string") { + // urlString = valueObj.url; + // } else if (typeof valueObj.data === "string") { + // const mime = valueObj.mimeType || "image/png"; + // urlString = `data:${mime};base64,${valueObj.data}`; + // } + // } + + // if (typeof urlString === "string" && urlString.trim() !== "") { + // screenshotData.push({ key, url: urlString }); + // } + // }); + // } // Mix all data types together to create consecutive records const maxLength = Math.max(schemaData.length, listData.length, screenshotData.length); @@ -97,8 +144,9 @@ function mergeRelatedData(serializableOutput: SerializableOutput, binaryOutput: const record: Record = {}; if (i < schemaData.length) { - record.Label = schemaData[i].key; - record.Value = schemaData[i].value; + record.Group = schemaData[i].Group; + record.Label = schemaData[i].Field; + record.Value = schemaData[i].Value; } if (i < listData.length) { @@ -120,20 +168,15 @@ function mergeRelatedData(serializableOutput: SerializableOutput, binaryOutput: } for (let i = maxLength; i < schemaData.length; i++) { - allRecords.push({ - Label: schemaData[i].key, - Value: schemaData[i].value - }); + allRecords.push({ Label: schemaData[i].Field, Value: schemaData[i].Value }); } - for (let i = maxLength; i < listData.length; i++) { allRecords.push(listData[i]); } - for (let i = maxLength; i < screenshotData.length; i++) { allRecords.push({ Key: screenshotData[i].key, - Screenshot: screenshotData[i].url + Screenshot: screenshotData[i].url, }); } diff --git a/server/src/workflow-management/integrations/gsheet.ts b/server/src/workflow-management/integrations/gsheet.ts index fcf9b95c..c32e4fe0 100644 --- a/server/src/workflow-management/integrations/gsheet.ts +++ b/server/src/workflow-management/integrations/gsheet.ts @@ -49,25 +49,34 @@ export async function updateGoogleSheet(robotId: string, runId: string) { const serializableOutput = plainRun.serializableOutput as SerializableOutput; if (serializableOutput) { - if (serializableOutput.scrapeSchema && serializableOutput.scrapeSchema.length > 0) { - await processOutputType( - robotId, - spreadsheetId, - 'Text', - serializableOutput.scrapeSchema, - plainRobot - ); + if (serializableOutput.scrapeSchema && typeof serializableOutput.scrapeSchema === "object") { + for (const [groupName, schemaArray] of Object.entries(serializableOutput.scrapeSchema)) { + if (!Array.isArray(schemaArray) || schemaArray.length === 0) continue; + + await processOutputType( + robotId, + spreadsheetId, + `Schema - ${groupName}`, + schemaArray, + plainRobot + ); + } } - - if (serializableOutput.scrapeList && serializableOutput.scrapeList.length > 0) { - await processOutputType( - robotId, - spreadsheetId, - 'List', - serializableOutput.scrapeList, - plainRobot - ); + + if (serializableOutput.scrapeList && typeof serializableOutput.scrapeList === "object") { + for (const [listName, listArray] of Object.entries(serializableOutput.scrapeList)) { + if (!Array.isArray(listArray) || listArray.length === 0) continue; + + await processOutputType( + robotId, + spreadsheetId, + `List - ${listName}`, + listArray, + plainRobot + ); + } } + } if (plainRun.binaryOutput && Object.keys(plainRun.binaryOutput).length > 0) { @@ -102,30 +111,27 @@ async function processOutputType( outputData: any[], robotConfig: any ) { - for (let i = 0; i < outputData.length; i++) { - const data = outputData[i]; - - if (!data || data.length === 0) { - console.log(`No data to write for ${outputType}-${i}. Skipping.`); - continue; - } - - const sheetName = `${outputType}-${i}`; - - await ensureSheetExists(spreadsheetId, sheetName, robotConfig); - - let formattedData = data; - if (outputType === 'Text' && data.length > 0) { - const schemaItem = data[0]; - formattedData = Object.entries(schemaItem).map(([key, value]) => ({ - Label: key, - Value: value - })); - } - - await writeDataToSheet(robotId, spreadsheetId, formattedData, sheetName, robotConfig); - console.log(`Data written to ${sheetName} sheet for ${outputType} data`); + const data = outputData; + const sheetName = outputType; + + if (!Array.isArray(data) || data.length === 0) { + console.log(`No data to write for ${sheetName}. Skipping.`); + return; } + + await ensureSheetExists(spreadsheetId, sheetName, robotConfig); + + const formattedData = data.map(item => { + const flatRow: Record = {}; + for (const [key, value] of Object.entries(item || {})) { + flatRow[key] = + typeof value === "object" && value !== null ? JSON.stringify(value) : value; + } + return flatRow; + }); + + await writeDataToSheet(robotId, spreadsheetId, formattedData, sheetName, robotConfig); + console.log(`Data written to ${sheetName} sheet for ${outputType} data`); } async function ensureSheetExists(spreadsheetId: string, sheetName: string, robotConfig: any) { diff --git a/server/src/workflow-management/scheduler/index.ts b/server/src/workflow-management/scheduler/index.ts index ce272689..899cb7f6 100644 --- a/server/src/workflow-management/scheduler/index.ts +++ b/server/src/workflow-management/scheduler/index.ts @@ -4,7 +4,7 @@ import stealthPlugin from 'puppeteer-extra-plugin-stealth'; import { io, Socket } from "socket.io-client"; import { createRemoteBrowserForRun, destroyRemoteBrowser } from '../../browser-management/controller'; import logger from '../../logger'; -import { browserPool } from "../../server"; +import { browserPool, io as serverIo } from "../../server"; import { googleSheetUpdateTasks, processGoogleSheetUpdates } from "../integrations/gsheet"; import Robot from "../../models/Robot"; import Run from "../../models/Run"; @@ -46,7 +46,7 @@ async function createWorkflowAndStoreMetadata(id: string, userId: string) { }; } - const browserId = createRemoteBrowserForRun( userId); + const browserId = createRemoteBrowserForRun(userId); const runId = uuid(); const run = await Run.create({ @@ -63,10 +63,30 @@ async function createWorkflowAndStoreMetadata(id: string, userId: string) { runByScheduleId: uuid(), serializableOutput: {}, binaryOutput: {}, + retryCount: 0 }); const plainRun = run.toJSON(); + try { + const runScheduledData = { + runId: plainRun.runId, + robotMetaId: plainRun.robotMetaId, + robotName: plainRun.name, + status: 'scheduled', + startedAt: plainRun.startedAt, + runByUserId: plainRun.runByUserId, + runByScheduleId: plainRun.runByScheduleId, + runByAPI: plainRun.runByAPI || false, + browserId: plainRun.browserId + }; + + serverIo.of('/queued-run').to(`user-${userId}`).emit('run-scheduled', runScheduledData); + logger.log('info', `Scheduled run notification sent for run: ${plainRun.runId} to user-${userId}`); + } catch (socketError: any) { + logger.log('warn', `Failed to send run-scheduled notification for run ${plainRun.runId}: ${socketError.message}`); + } + return { browserId, runId: plainRun.runId, @@ -83,6 +103,29 @@ async function createWorkflowAndStoreMetadata(id: string, userId: string) { } } +async function triggerIntegrationUpdates(runId: string, robotMetaId: string): Promise { + try { + googleSheetUpdateTasks[runId] = { + robotId: robotMetaId, + runId: runId, + status: 'pending', + retries: 5, + }; + + airtableUpdateTasks[runId] = { + robotId: robotMetaId, + runId: runId, + status: 'pending', + retries: 5, + }; + + processAirtableUpdates().catch(err => logger.log('error', `Airtable update error: ${err.message}`)); + processGoogleSheetUpdates().catch(err => logger.log('error', `Google Sheets update error: ${err.message}`)); + } catch (err: any) { + logger.log('error', `Failed to update integrations for run: ${runId}: ${err.message}`); + } +} + function AddGeneratedFlags(workflow: WorkflowFile) { const copy = JSON.parse(JSON.stringify(workflow)); for (let i = 0; i < workflow.workflow.length; i++) { @@ -95,6 +138,8 @@ function AddGeneratedFlags(workflow: WorkflowFile) { }; async function executeRun(id: string, userId: string) { + let browser: any = null; + try { const run = await Run.findOne({ where: { runId: id } }); if (!run) { @@ -133,6 +178,21 @@ async function executeRun(id: string, userId: string) { log: plainRun.log ? `${plainRun.log}\nMax retries exceeded (3/3) - Run failed after multiple attempts.` : `Max retries exceeded (3/3) - Run failed after multiple attempts.` }); + try { + const failureSocketData = { + runId: plainRun.runId, + robotMetaId: plainRun.robotMetaId, + robotName: recording ? recording.recording_meta.name : 'Unknown Robot', + status: 'failed', + finishedAt: new Date().toLocaleString() + }; + + serverIo.of(run.browserId).emit('run-completed', failureSocketData); + serverIo.of('/queued-run').to(`user-${userId}`).emit('run-completed', failureSocketData); + } catch (socketError: any) { + logger.log('warn', `Failed to emit failure event in main catch: ${socketError.message}`); + } + return { success: false, error: 'Max retries exceeded' @@ -149,7 +209,22 @@ async function executeRun(id: string, userId: string) { plainRun.status = 'running'; - const browser = browserPool.getRemoteBrowser(plainRun.browserId); + try { + const runStartedData = { + runId: plainRun.runId, + robotMetaId: plainRun.robotMetaId, + robotName: recording ? recording.recording_meta.name : 'Unknown Robot', + status: 'running', + startedAt: plainRun.startedAt + }; + + serverIo.of('/queued-run').to(`user-${userId}`).emit('run-started', runStartedData); + logger.log('info', `Run started notification sent for run: ${plainRun.runId} to user-${userId}`); + } catch (socketError: any) { + logger.log('warn', `Failed to send run-started notification for run ${plainRun.runId}: ${socketError.message}`); + } + + browser = browserPool.getRemoteBrowser(plainRun.browserId); if (!browser) { throw new Error('Could not access browser'); } @@ -168,56 +243,52 @@ async function executeRun(id: string, userId: string) { workflow, currentPage, (newPage: Page) => currentPage = newPage, plainRun.interpreterSettings ); + const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); + const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput); + + const finalRun = await Run.findByPk(run.id); + const categorizedOutput = { + scrapeSchema: finalRun?.serializableOutput?.scrapeSchema || {}, + scrapeList: finalRun?.serializableOutput?.scrapeList || {}, + }; + await destroyRemoteBrowser(plainRun.browserId, userId); await run.update({ status: 'success', finishedAt: new Date().toLocaleString(), log: interpretationInfo.log.join('\n'), + binaryOutput: uploadedBinaryOutput }); - // Upload binary output to MinIO and update run with MinIO URLs - const updatedRun = await Run.findOne({ where: { runId: id } }); - if (updatedRun && updatedRun.binaryOutput && Object.keys(updatedRun.binaryOutput).length > 0) { - try { - const binaryService = new BinaryOutputService('maxun-run-screenshots'); - await binaryService.uploadAndStoreBinaryOutput(updatedRun, updatedRun.binaryOutput); - logger.log('info', `Uploaded binary output to MinIO for scheduled run ${id}`); - } catch (minioError: any) { - logger.log('error', `Failed to upload binary output to MinIO for scheduled run ${id}: ${minioError.message}`); - } - } - // Get metrics from persisted data for analytics and webhooks let totalSchemaItemsExtracted = 0; let totalListItemsExtracted = 0; let extractedScreenshotsCount = 0; - if (updatedRun) { - if (updatedRun.serializableOutput) { - if (updatedRun.serializableOutput.scrapeSchema) { - Object.values(updatedRun.serializableOutput.scrapeSchema).forEach((schemaResult: any) => { - if (Array.isArray(schemaResult)) { - totalSchemaItemsExtracted += schemaResult.length; - } else if (schemaResult && typeof schemaResult === 'object') { - totalSchemaItemsExtracted += 1; - } - }); - } - - if (updatedRun.serializableOutput.scrapeList) { - Object.values(updatedRun.serializableOutput.scrapeList).forEach((listResult: any) => { - if (Array.isArray(listResult)) { - totalListItemsExtracted += listResult.length; - } - }); - } + if (categorizedOutput) { + if (categorizedOutput.scrapeSchema) { + Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => { + if (Array.isArray(schemaResult)) { + totalSchemaItemsExtracted += schemaResult.length; + } else if (schemaResult && typeof schemaResult === 'object') { + totalSchemaItemsExtracted += 1; + } + }); } - if (updatedRun.binaryOutput) { - extractedScreenshotsCount = Object.keys(updatedRun.binaryOutput).length; + if (categorizedOutput.scrapeList) { + Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => { + if (Array.isArray(listResult)) { + totalListItemsExtracted += listResult.length; + } + }); } } + + if (run.binaryOutput) { + extractedScreenshotsCount = Object.keys(run.binaryOutput).length; + } const totalRowsExtracted = totalSchemaItemsExtracted + totalListItemsExtracted; @@ -234,6 +305,21 @@ async function executeRun(id: string, userId: string) { } ); + try { + const completionData = { + runId: plainRun.runId, + robotMetaId: plainRun.robotMetaId, + robotName: recording.recording_meta.name, + status: 'success', + finishedAt: new Date().toLocaleString() + }; + + serverIo.of(plainRun.browserId).emit('run-completed', completionData); + serverIo.of('/queued-run').to(`user-${userId}`).emit('run-completed', completionData); + } catch (emitError: any) { + logger.log('warn', `Failed to emit success event: ${emitError.message}`); + } + const webhookPayload = { robot_id: plainRun.robotMetaId, run_id: plainRun.runId, @@ -242,16 +328,20 @@ async function executeRun(id: string, userId: string) { started_at: plainRun.startedAt, finished_at: new Date().toLocaleString(), extracted_data: { - captured_texts: updatedRun?.serializableOutput?.scrapeSchema ? Object.values(updatedRun.serializableOutput.scrapeSchema).flat() : [], - captured_lists: updatedRun?.serializableOutput?.scrapeList || {}, - total_rows: totalRowsExtracted, + captured_texts: Object.keys(categorizedOutput.scrapeSchema || {}).length > 0 + ? Object.entries(categorizedOutput.scrapeSchema).reduce((acc, [name, value]) => { + acc[name] = Array.isArray(value) ? value : [value]; + return acc; + }, {} as Record) + : {}, + captured_lists: categorizedOutput.scrapeList, captured_texts_count: totalSchemaItemsExtracted, captured_lists_count: totalListItemsExtracted, screenshots_count: extractedScreenshotsCount }, metadata: { browser_id: plainRun.browserId, - user_id: userId + user_id: userId, } }; @@ -262,26 +352,7 @@ async function executeRun(id: string, userId: string) { logger.log('error', `Failed to send webhooks for run ${plainRun.runId}: ${webhookError.message}`); } - try { - googleSheetUpdateTasks[plainRun.runId] = { - robotId: plainRun.robotMetaId, - runId: plainRun.runId, - status: 'pending', - retries: 5, - }; - - airtableUpdateTasks[plainRun.runId] = { - robotId: plainRun.robotMetaId, - runId: plainRun.runId, - status: 'pending', - retries: 5, - }; - - processAirtableUpdates().catch(err => logger.log('error', `Airtable update error: ${err.message}`)); - processGoogleSheetUpdates().catch(err => logger.log('error', `Google Sheets update error: ${err.message}`)); - } catch (err: any) { - logger.log('error', `Failed to update Google Sheet for run: ${plainRun.runId}: ${err.message}`); - } + await triggerIntegrationUpdates(plainRun.runId, plainRun.robotMetaId); return true; } catch (error: any) { logger.log('info', `Error while running a robot with id: ${id} - ${error.message}`); @@ -320,6 +391,21 @@ async function executeRun(id: string, userId: string) { } catch (webhookError: any) { logger.log('error', `Failed to send failure webhooks for run ${run.runId}: ${webhookError.message}`); } + + try { + const failureSocketData = { + runId: run.runId, + robotMetaId: run.robotMetaId, + robotName: recording ? recording.recording_meta.name : 'Unknown Robot', + status: 'failed', + finishedAt: new Date().toLocaleString() + }; + + serverIo.of(run.browserId).emit('run-completed', failureSocketData); + serverIo.of('/queued-run').to(`user-${userId}`).emit('run-completed', failureSocketData); + } catch (socketError: any) { + logger.log('warn', `Failed to emit failure event in main catch: ${socketError.message}`); + } } capture( 'maxun-oss-run-created-scheduled', diff --git a/src/context/browserSteps.tsx b/src/context/browserSteps.tsx index de7a6460..d28aef46 100644 --- a/src/context/browserSteps.tsx +++ b/src/context/browserSteps.tsx @@ -1,6 +1,7 @@ import React, { createContext, useContext, useEffect, useRef, useState } from 'react'; import { useSocketStore } from "./socket"; import { useGlobalInfoStore } from "./globalInfo"; +import { useActionContext } from './browserActions'; export interface TextStep { id: number; @@ -96,6 +97,7 @@ export const BrowserStepsProvider: React.FC<{ children: React.ReactNode }> = ({ const { currentTextGroupName } = useGlobalInfoStore(); const [browserSteps, setBrowserSteps] = useState([]); const [discardedFields, setDiscardedFields] = useState>(new Set()); + const { paginationType, limitType, customLimit } = useActionContext(); const browserStepsRef = useRef(browserSteps); useEffect(() => { @@ -127,15 +129,21 @@ export const BrowserStepsProvider: React.FC<{ children: React.ReactNode }> = ({ } }); + const livePaginationType = paginationType || listStep.pagination?.type || ""; + const liveLimit = + limitType === "custom" + ? parseInt(customLimit || "0", 10) + : parseInt(limitType || "0", 10); + return { listSelector: listStep.listSelector, fields: fields, pagination: { - type: listStep.pagination?.type || "", + type: livePaginationType, selector: listStep.pagination?.selector, isShadow: listStep.isShadow }, - limit: listStep.limit, + limit: liveLimit > 0 ? liveLimit : listStep.limit, isShadow: listStep.isShadow }; };