From 67e6e0c3c18db071c25461f1c5a1bb6d7c0c9fc5 Mon Sep 17 00:00:00 2001 From: Rohit Date: Thu, 12 Jun 2025 11:03:39 +0530 Subject: [PATCH] feat: revamp process run execution --- server/src/pgboss-worker.ts | 383 ++++++++++++++++++++++++------------ 1 file changed, 254 insertions(+), 129 deletions(-) diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index aec74159..15907b19 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -212,10 +212,12 @@ async function triggerIntegrationUpdates(runId: string, robotMetaId: string): Pr * Modified processRunExecution function - only add browser reset */ async function processRunExecution(job: Job) { - try { - const data = job.data; - logger.log('info', `Processing run execution job for runId: ${data.runId}, browserId: ${data.browserId}`); - + const BROWSER_INIT_TIMEOUT = 30000; + + const data = job.data; + logger.log('info', `Processing run execution job for runId: ${data.runId}, browserId: ${data.browserId}`); + + try { // Find the run const run = await Run.findOne({ where: { runId: data.runId } }); if (!run) { @@ -229,64 +231,56 @@ async function processRunExecution(job: Job) { } const plainRun = run.toJSON(); - - // Find the recording - const recording = await Robot.findOne({ where: { 'recording_meta.id': plainRun.robotMetaId }, raw: true }); - if (!recording) { - logger.log('error', `Recording for run ${data.runId} not found`); - - const currentRun = await Run.findOne({ where: { runId: data.runId } }); - if (currentRun && (currentRun.status !== 'aborted' && currentRun.status !== 'aborting')) { - await run.update({ - status: 'failed', - finishedAt: new Date().toLocaleString(), - log: 'Failed: Recording not found', - }); - - // Trigger webhooks for run failure - const failedWebhookPayload = { - robot_id: plainRun.robotMetaId, - run_id: data.runId, - robot_name: 'Unknown Robot', - status: 'failed', - started_at: plainRun.startedAt, - finished_at: new Date().toLocaleString(), - error: { - message: "Failed: Recording not found", - type: 'RecordingNotFoundError' - }, - metadata: { - browser_id: plainRun.browserId, - user_id: data.userId, - } - }; - - try { - await sendWebhook(plainRun.robotMetaId, 'run_failed', failedWebhookPayload); - logger.log('info', `Failure webhooks sent successfully for run ${data.runId}`); - } catch (webhookError: any) { - logger.log('error', `Failed to send failure webhooks for run ${data.runId}: ${webhookError.message}`); - } - } - - return { success: false }; - } - - // Get the browser and execute the run - const browser = browserPool.getRemoteBrowser(plainRun.browserId); - let currentPage = browser?.getCurrentPage(); + const browserId = data.browserId || plainRun.browserId; - if (!browser || !currentPage) { - logger.log('error', `Browser or page not available for run ${data.runId}`); - - return { success: false }; + if (!browserId) { + throw new Error(`No browser ID available for run ${data.runId}`); } + logger.log('info', `Looking for browser ${browserId} for run ${data.runId}`); + + let browser = browserPool.getRemoteBrowser(browserId); + const browserWaitStart = Date.now(); + + while (!browser && (Date.now() - browserWaitStart) < BROWSER_INIT_TIMEOUT) { + logger.log('debug', `Browser ${browserId} not ready yet, waiting...`); + await new Promise(resolve => setTimeout(resolve, 1000)); + browser = browserPool.getRemoteBrowser(browserId); + } + + if (!browser) { + throw new Error(`Browser ${browserId} not found in pool after timeout`); + } + + logger.log('info', `Browser ${browserId} found and ready for execution`); + try { + // Find the recording + const recording = await Robot.findOne({ where: { 'recording_meta.id': plainRun.robotMetaId }, raw: true }); + + if (!recording) { + throw new Error(`Recording for run ${data.runId} not found`); + } + const isRunAborted = async (): Promise => { const currentRun = await Run.findOne({ where: { runId: data.runId } }); return currentRun ? (currentRun.status === 'aborted' || currentRun.status === 'aborting') : false; }; + + let currentPage = browser.getCurrentPage(); + + const pageWaitStart = Date.now(); + while (!currentPage && (Date.now() - pageWaitStart) < 30000) { + logger.log('debug', `Page not ready for browser ${browserId}, waiting...`); + await new Promise(resolve => setTimeout(resolve, 1000)); + currentPage = browser.getCurrentPage(); + } + + if (!currentPage) { + throw new Error(`No current page available for browser ${browserId} after timeout`); + } + + logger.log('info', `Starting workflow execution for run ${data.runId}`); // Execute the workflow const workflow = AddGeneratedFlags(recording.recording); @@ -299,23 +293,27 @@ async function processRunExecution(job: Job) { if (await isRunAborted()) { logger.log('info', `Run ${data.runId} was aborted during execution, not updating status`); + + await destroyRemoteBrowser(plainRun.browserId, data.userId); return { success: true }; } + + logger.log('info', `Workflow execution completed for run ${data.runId}`); const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput); - if (await isRunAborted()) { - logger.log('info', `Run ${data.runId} was aborted while processing results, not updating status`); - return { success: true }; - } - const categorizedOutput = { scrapeSchema: interpretationInfo.scrapeSchemaOutput || {}, scrapeList: interpretationInfo.scrapeListOutput || {} }; + if (await isRunAborted()) { + logger.log('info', `Run ${data.runId} was aborted while processing results, not updating status`); + return { success: true }; + } + await run.update({ ...run, status: 'success', @@ -330,6 +328,7 @@ async function processRunExecution(job: Job) { }); // Track extraction metrics + let totalDataPointsExtracted = 0; let totalSchemaItemsExtracted = 0; let totalListItemsExtracted = 0; let extractedScreenshotsCount = 0; @@ -337,23 +336,35 @@ async function processRunExecution(job: Job) { if (categorizedOutput.scrapeSchema) { Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => { if (Array.isArray(schemaResult)) { + schemaResult.forEach(obj => { + if (obj && typeof obj === 'object') { + totalDataPointsExtracted += Object.keys(obj).length; + } + }); totalSchemaItemsExtracted += schemaResult.length; } else if (schemaResult && typeof schemaResult === 'object') { + totalDataPointsExtracted += Object.keys(schemaResult).length; totalSchemaItemsExtracted += 1; } }); } - + if (categorizedOutput.scrapeList) { Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => { if (Array.isArray(listResult)) { + listResult.forEach(obj => { + if (obj && typeof obj === 'object') { + totalDataPointsExtracted += Object.keys(obj).length; + } + }); totalListItemsExtracted += listResult.length; } }); } - + if (uploadedBinaryOutput) { extractedScreenshotsCount = Object.keys(uploadedBinaryOutput).length; + totalDataPointsExtracted += extractedScreenshotsCount; } const totalRowsExtracted = totalSchemaItemsExtracted + totalListItemsExtracted; @@ -362,6 +373,7 @@ async function processRunExecution(job: Job) { console.log(`Extracted List Items Count: ${totalListItemsExtracted}`); console.log(`Extracted Screenshots Count: ${extractedScreenshotsCount}`); console.log(`Total Rows Extracted: ${totalRowsExtracted}`); + console.log(`Total Data Points Extracted: ${totalDataPointsExtracted}`); // Capture metrics capture( @@ -392,7 +404,8 @@ async function processRunExecution(job: Job) { total_rows: totalRowsExtracted, captured_texts_count: totalSchemaItemsExtracted, captured_lists_count: totalListItemsExtracted, - screenshots_count: extractedScreenshotsCount + screenshots_count: extractedScreenshotsCount, + total_data_points_extracted: totalDataPointsExtracted, }, metadata: { browser_id: plainRun.browserId, @@ -408,94 +421,206 @@ async function processRunExecution(job: Job) { } // Schedule updates for Google Sheets and Airtable - try { - googleSheetUpdateTasks[plainRun.runId] = { - robotId: plainRun.robotMetaId, - runId: plainRun.runId, - status: 'pending', - retries: 5, - }; + await triggerIntegrationUpdates(plainRun.runId, plainRun.robotMetaId); - airtableUpdateTasks[plainRun.runId] = { - robotId: plainRun.robotMetaId, - runId: plainRun.runId, - status: 'pending', - retries: 5, - }; - - processAirtableUpdates(); - processGoogleSheetUpdates(); - } catch (err: any) { - logger.log('error', `Failed to update Google Sheet for run: ${plainRun.runId}: ${err.message}`); - } - - serverIo.of(plainRun.browserId).emit('run-completed', { + const completionData = { runId: data.runId, robotMetaId: plainRun.robotMetaId, robotName: recording.recording_meta.name, status: 'success', finishedAt: new Date().toLocaleString() - }); + }; + + serverIo.of(browserId).emit('run-completed', completionData); + serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-completed', completionData); + + await destroyRemoteBrowser(browserId, data.userId); + logger.log('info', `Browser ${browserId} destroyed after successful run ${data.runId}`); return { success: true }; } catch (executionError: any) { logger.log('error', `Run execution failed for run ${data.runId}: ${executionError.message}`); - const currentRun = await Run.findOne({ where: { runId: data.runId } }); - if (currentRun && (currentRun.status !== 'aborted' && currentRun.status !== 'aborting')) { - await run.update({ + let partialDataExtracted = false; + let partialData: any = null; + let partialUpdateData: any = { + status: 'failed', + finishedAt: new Date().toLocaleString(), + log: `Failed: ${executionError.message}`, + }; + + try { + if (browser && browser.interpreter) { + const hasSchemaData = (browser.interpreter.serializableDataByType?.scrapeSchema ?? []).length > 0; + const hasListData = (browser.interpreter.serializableDataByType?.scrapeList ?? []).length > 0; + const hasBinaryData = (browser.interpreter.binaryData ?? []).length > 0; + + if (hasSchemaData || hasListData || hasBinaryData) { + logger.log('info', `Extracting partial data from failed run ${data.runId}`); + + partialData = await extractAndProcessScrapedData(browser, run); + + partialUpdateData.serializableOutput = { + scrapeSchema: Object.values(partialData.categorizedOutput.scrapeSchema), + scrapeList: Object.values(partialData.categorizedOutput.scrapeList), + }; + partialUpdateData.binaryOutput = partialData.uploadedBinaryOutput; + + partialDataExtracted = true; + logger.log('info', `Partial data extracted for failed run ${data.runId}: ${partialData.totalDataPointsExtracted} data points`); + + await triggerIntegrationUpdates(plainRun.runId, plainRun.robotMetaId); + } + } + } catch (partialDataError: any) { + logger.log('warn', `Failed to extract partial data for run ${data.runId}: ${partialDataError.message}`); + } + + await run.update(partialUpdateData); + + try { + const recording = await Robot.findOne({ where: { 'recording_meta.id': run.robotMetaId }, raw: true }); + + const failureData = { + runId: data.runId, + robotMetaId: plainRun.robotMetaId, + robotName: recording ? recording.recording_meta.name : 'Unknown Robot', status: 'failed', finishedAt: new Date().toLocaleString(), - log: `Failed: ${executionError.message}`, - }); - - // Capture failure metrics - capture( - 'maxun-oss-run-created-manual', - { - runId: data.runId, - user_id: data.userId, - created_at: new Date().toISOString(), - status: 'failed', - error_message: executionError.message, - } - ); - - // Trigger webhooks for run failure - const failedWebhookPayload = { - robot_id: plainRun.robotMetaId, - run_id: data.runId, - robot_name: recording.recording_meta.name, - status: 'failed', - started_at: plainRun.startedAt, - finished_at: new Date().toLocaleString(), - error: { - message: executionError.message, - stack: executionError.stack, - type: executionError.name || 'ExecutionError' - }, - metadata: { - browser_id: plainRun.browserId, - user_id: data.userId, - } + hasPartialData: partialDataExtracted }; - try { - await sendWebhook(plainRun.robotMetaId, 'run_failed', failedWebhookPayload); - logger.log('info', `Failure webhooks sent successfully for run ${data.runId}`); - } catch (webhookError: any) { - logger.log('error', `Failed to send failure webhooks for run ${data.runId}: ${webhookError.message}`); - } - } else { - logger.log('info', `Run ${data.runId} was aborted, not updating status to failed`); + serverIo.of(browserId).emit('run-completed', failureData); + serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-completed', failureData); + } catch (emitError: any) { + logger.log('warn', `Failed to emit failure event: ${emitError.message}`); } - - return { success: false }; + + const recording = await Robot.findOne({ where: { 'recording_meta.id': run.robotMetaId }, raw: true }); + + const failedWebhookPayload = { + robot_id: plainRun.robotMetaId, + run_id: data.runId, + robot_name: recording ? recording.recording_meta.name : 'Unknown Robot', + status: 'failed', + started_at: plainRun.startedAt, + finished_at: new Date().toLocaleString(), + error: { + message: executionError.message, + stack: executionError.stack, + type: 'ExecutionError', + }, + partial_data_extracted: partialDataExtracted, + extracted_data: partialDataExtracted ? { + captured_texts: Object.values(partialUpdateData.serializableOutput?.scrapeSchema || []).flat() || [], + captured_lists: partialUpdateData.serializableOutput?.scrapeList || {}, + total_data_points_extracted: partialData?.totalDataPointsExtracted || 0, + captured_texts_count: partialData?.totalSchemaItemsExtracted || 0, + captured_lists_count: partialData?.totalListItemsExtracted || 0, + screenshots_count: partialData?.extractedScreenshotsCount || 0 + } : null, + metadata: { + browser_id: plainRun.browserId, + user_id: data.userId, + } + }; + + try { + await sendWebhook(plainRun.robotMetaId, 'run_failed', failedWebhookPayload); + logger.log('info', `Failure webhooks sent successfully for run ${data.runId}`); + } catch (webhookError: any) { + logger.log('error', `Failed to send failure webhooks for run ${data.runId}: ${webhookError.message}`); + } + + try { + const failureSocketData = { + runId: data.runId, + robotMetaId: run.robotMetaId, + robotName: recording ? recording.recording_meta.name : 'Unknown Robot', + status: 'failed', + finishedAt: new Date().toLocaleString() + }; + + serverIo.of(run.browserId).emit('run-completed', failureSocketData); + serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-completed', failureSocketData); + } catch (socketError: any) { + logger.log('warn', `Failed to emit failure event in main catch: ${socketError.message}`); + } + + capture('maxun-oss-run-created-manual', { + runId: data.runId, + user_id: data.userId, + created_at: new Date().toISOString(), + status: 'failed', + error_message: executionError.message, + partial_data_extracted: partialDataExtracted, + totalRowsExtracted: partialData?.totalSchemaItemsExtracted + partialData?.totalListItemsExtracted + partialData?.extractedScreenshotsCount || 0, + }); + + await destroyRemoteBrowser(browserId, data.userId); + logger.log('info', `Browser ${browserId} destroyed after failed run`); + + return { success: false, partialDataExtracted }; } } catch (error: unknown) { const errorMessage = error instanceof Error ? error.message : String(error); logger.log('error', `Failed to process run execution job: ${errorMessage}`); + + try { + const run = await Run.findOne({ where: { runId: data.runId }}); + + if (run) { + await run.update({ + status: 'failed', + finishedAt: new Date().toLocaleString(), + log: `Failed: ${errorMessage}`, + }); + + const recording = await Robot.findOne({ where: { 'recording_meta.id': run.robotMetaId }, raw: true }); + + const failedWebhookPayload = { + robot_id: run.robotMetaId, + run_id: data.runId, + robot_name: recording ? recording.recording_meta.name : 'Unknown Robot', + status: 'failed', + started_at: run.startedAt, + finished_at: new Date().toLocaleString(), + error: { + message: errorMessage, + }, + metadata: { + browser_id: run.browserId, + user_id: data.userId, + } + }; + + try { + await sendWebhook(run.robotMetaId, 'run_failed', failedWebhookPayload); + logger.log('info', `Failure webhooks sent successfully for run ${data.runId}`); + } catch (webhookError: any) { + logger.log('error', `Failed to send failure webhooks for run ${data.runId}: ${webhookError.message}`); + } + + try { + const failureSocketData = { + runId: data.runId, + robotMetaId: run.robotMetaId, + robotName: recording ? recording.recording_meta.name : 'Unknown Robot', + status: 'failed', + finishedAt: new Date().toLocaleString() + }; + + serverIo.of(run.browserId).emit('run-completed', failureSocketData); + serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-completed', failureSocketData); + } catch (socketError: any) { + logger.log('warn', `Failed to emit failure event in main catch: ${socketError.message}`); + } + } + } catch (updateError: any) { + logger.log('error', `Failed to update run status: ${updateError.message}`); + } + return { success: false }; } }