From 01f4ea3e21a5519e7fe3ec381f8bacf02d4bd55d Mon Sep 17 00:00:00 2001 From: Rohit Date: Thu, 12 Jun 2025 11:08:20 +0530 Subject: [PATCH] feat: revamp abort run execution --- server/src/pgboss-worker.ts | 79 ++++++++++--------------------------- 1 file changed, 20 insertions(+), 59 deletions(-) diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index 15907b19..7b9ae5f1 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -627,12 +627,7 @@ async function processRunExecution(job: Job) { async function abortRun(runId: string, userId: string): Promise { try { - const run = await Run.findOne({ - where: { - runId: runId, - runByUserId: userId - } - }); + const run = await Run.findOne({ where: { runId: runId } }); if (!run) { logger.log('warn', `Run ${runId} not found or does not belong to user ${userId}`); @@ -683,32 +678,9 @@ async function abortRun(runId: string, userId: string): Promise { } let currentLog = 'Run aborted by user'; - let categorizedOutput = { - scrapeSchema: {}, - scrapeList: {}, - }; - let binaryOutput: Record = {}; - - try { - if (browser.interpreter) { - if (browser.interpreter.debugMessages) { - currentLog = browser.interpreter.debugMessages.join('\n') || currentLog; - } - - if (browser.interpreter.serializableDataByType) { - categorizedOutput = { - scrapeSchema: collectDataByType(browser.interpreter.serializableDataByType.scrapeSchema || []), - scrapeList: collectDataByType(browser.interpreter.serializableDataByType.scrapeList || []), - }; - } - - if (browser.interpreter.binaryData) { - binaryOutput = collectBinaryData(browser.interpreter.binaryData); - } - } - } catch (interpreterError) { - logger.log('warn', `Error collecting data from interpreter: ${interpreterError}`); - } + const extractedData = await extractAndProcessScrapedData(browser, run); + + console.log(`Total Data Points Extracted in aborted run: ${extractedData.totalDataPointsExtracted}`); await run.update({ status: 'aborted', @@ -716,12 +688,16 @@ async function abortRun(runId: string, userId: string): Promise { browserId: plainRun.browserId, log: currentLog, serializableOutput: { - scrapeSchema: Object.values(categorizedOutput.scrapeSchema), - scrapeList: Object.values(categorizedOutput.scrapeList), + scrapeSchema: Object.values(extractedData.categorizedOutput.scrapeSchema), + scrapeList: Object.values(extractedData.categorizedOutput.scrapeList), }, - binaryOutput, + binaryOutput: extractedData.uploadedBinaryOutput, }); + if (extractedData.totalDataPointsExtracted > 0) { + await triggerIntegrationUpdates(runId, plainRun.robotMetaId); + } + try { serverIo.of(plainRun.browserId).emit('run-aborted', { runId, @@ -733,6 +709,15 @@ async function abortRun(runId: string, userId: string): Promise { logger.log('warn', `Failed to emit run-aborted event: ${socketError}`); } + try { + await new Promise(resolve => setTimeout(resolve, 500)); + + await destroyRemoteBrowser(plainRun.browserId, userId); + logger.log('info', `Browser ${plainRun.browserId} destroyed successfully after abort`); + } catch (cleanupError) { + logger.log('warn', `Failed to clean up browser for aborted run ${runId}: ${cleanupError}`); + } + return true; } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error); @@ -741,30 +726,6 @@ async function abortRun(runId: string, userId: string): Promise { } } -/** - * Helper function to collect data from arrays into indexed objects - * @param dataArray Array of data to be transformed into an object with indexed keys - * @returns Object with indexed keys - */ -function collectDataByType(dataArray: any[]): Record { - return dataArray.reduce((result: Record, item, index) => { - result[`item-${index}`] = item; - return result; - }, {}); -} - -/** - * Helper function to collect binary data (like screenshots) - * @param binaryDataArray Array of binary data objects to be transformed - * @returns Object with indexed keys - */ -function collectBinaryData(binaryDataArray: { mimetype: string, data: string, type?: string }[]): Record { - return binaryDataArray.reduce((result: Record, item, index) => { - result[`item-${index}`] = item; - return result; - }, {}); -} - async function registerRunExecutionWorker() { try { const registeredUserQueues = new Map();