From 9878db060adcd3f57ccc26b56ffb8a21a2b3bd13 Mon Sep 17 00:00:00 2001 From: Rohit Date: Thu, 24 Apr 2025 18:46:42 +0530 Subject: [PATCH] feat: add abort checks and update status --- server/src/pgboss-worker.ts | 170 ++++++++++++++++++++++++++++-------- 1 file changed, 132 insertions(+), 38 deletions(-) diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index bd5ae801..b09ca253 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -180,6 +180,11 @@ async function processRunExecution(job: Job) { return { success: false }; } + if (run.status === 'aborted' || run.status === 'aborting') { + logger.log('info', `Run ${data.runId} has status ${run.status}, skipping execution`); + return { success: true }; + } + const plainRun = run.toJSON(); // Find the recording @@ -187,12 +192,14 @@ async function processRunExecution(job: Job) { if (!recording) { logger.log('error', `Recording for run ${data.runId} not found`); - // Update run status to failed - await run.update({ - status: 'failed', - finishedAt: new Date().toLocaleString(), - log: 'Failed: Recording not found', - }); + const currentRun = await Run.findOne({ where: { runId: data.runId } }); + if (currentRun && (currentRun.status !== 'aborted' && currentRun.status !== 'aborting')) { + await run.update({ + status: 'failed', + finishedAt: new Date().toLocaleString(), + log: 'Failed: Recording not found', + }); + } // Check for queued runs even if this one failed await checkAndProcessQueuedRun(data.userId, data.browserId); @@ -207,8 +214,6 @@ async function processRunExecution(job: Job) { if (!browser || !currentPage) { logger.log('error', `Browser or page not available for run ${data.runId}`); - await pgBoss.fail(job.id, "Failed to get browser or page for run"); - // Even if this run failed, check for queued runs await checkAndProcessQueuedRun(data.userId, data.browserId); @@ -219,6 +224,11 @@ async function processRunExecution(job: Job) { // Reset the browser state before executing this run await resetBrowserState(browser); + const isRunAborted = async (): Promise => { + const currentRun = await Run.findOne({ where: { runId: data.runId } }); + return currentRun ? (currentRun.status === 'aborted' || currentRun.status === 'aborting') : false; + }; + // Execute the workflow const workflow = AddGeneratedFlags(recording.recording); const interpretationInfo = await browser.interpreter.InterpretRecording( @@ -228,10 +238,28 @@ async function processRunExecution(job: Job) { plainRun.interpreterSettings ); + if (await isRunAborted()) { + logger.log('info', `Run ${data.runId} was aborted during execution, not updating status`); + + const queuedRunProcessed = await checkAndProcessQueuedRun(data.userId, plainRun.browserId); + + if (!queuedRunProcessed) { + await destroyRemoteBrowser(plainRun.browserId, data.userId); + logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`); + } + + return { success: true }; + } + // Process the results const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput); + if (await isRunAborted()) { + logger.log('info', `Run ${data.runId} was aborted while processing results, not updating status`); + return { success: true }; + } + // Update the run record with results await run.update({ ...run, @@ -322,11 +350,28 @@ async function processRunExecution(job: Job) { } catch (executionError: any) { logger.log('error', `Run execution failed for run ${data.runId}: ${executionError.message}`); - await run.update({ - status: 'failed', - finishedAt: new Date().toLocaleString(), - log: `Failed: ${executionError.message}`, - }); + const currentRun = await Run.findOne({ where: { runId: data.runId } }); + if (currentRun && (currentRun.status !== 'aborted' && currentRun.status !== 'aborting')) { + await run.update({ + status: 'failed', + finishedAt: new Date().toLocaleString(), + log: `Failed: ${executionError.message}`, + }); + + // Capture failure metrics + capture( + 'maxun-oss-run-created-manual', + { + runId: data.runId, + user_id: data.userId, + created_at: new Date().toISOString(), + status: 'failed', + error_message: executionError.message, + } + ); + } else { + logger.log('info', `Run ${data.runId} was aborted, not updating status to failed`); + } // Check for queued runs before destroying the browser const queuedRunProcessed = await checkAndProcessQueuedRun(data.userId, plainRun.browserId); @@ -340,18 +385,6 @@ async function processRunExecution(job: Job) { logger.log('warn', `Failed to clean up browser for failed run ${data.runId}: ${cleanupError.message}`); } } - - // Capture failure metrics - capture( - 'maxun-oss-run-created-manual', - { - runId: data.runId, - user_id: data.userId, - created_at: new Date().toISOString(), - status: 'failed', - error_message: executionError.message, - } - ); return { success: false }; } @@ -377,9 +410,26 @@ async function abortRun(runId: string, userId: string): Promise { return false; } + await run.update({ + status: 'aborting' + }); + const plainRun = run.toJSON(); - const browser = browserPool.getRemoteBrowser(plainRun.browserId); + const recording = await Robot.findOne({ + where: { 'recording_meta.id': plainRun.robotMetaId }, + raw: true + }); + + const robotName = recording?.recording_meta?.name || 'Unknown Robot'; + + let browser; + try { + browser = browserPool.getRemoteBrowser(plainRun.browserId); + } catch (browserError) { + logger.log('warn', `Could not get browser for run ${runId}: ${browserError}`); + browser = null; + } if (!browser) { await run.update({ @@ -388,36 +438,80 @@ async function abortRun(runId: string, userId: string): Promise { log: 'Aborted: Browser not found or already closed' }); + try { + serverIo.of(plainRun.browserId).emit('run-aborted', { + runId, + robotName: robotName, + status: 'aborted', + finishedAt: new Date().toLocaleString() + }); + } catch (socketError) { + logger.log('warn', `Failed to emit run-aborted event: ${socketError}`); + } + logger.log('warn', `Browser not found for run ${runId}`); return true; } - const currentLog = browser.interpreter.debugMessages.join('\n'); - const serializableOutput: Record = {}; - browser.interpreter.serializableData.forEach((item, index) => { - serializableOutput[`item-${index}`] = item; - }); + let currentLog = 'Run aborted by user'; + let serializableOutput: Record = {}; + let binaryOutput: Record = {}; - const binaryOutput: Record = {}; - browser.interpreter.binaryData.forEach((item, index) => { - binaryOutput[`item-${index}`] = item; - }); + try { + if (browser.interpreter) { + if (browser.interpreter.debugMessages) { + currentLog = browser.interpreter.debugMessages.join('\n') || currentLog; + } + + if (browser.interpreter.serializableData) { + browser.interpreter.serializableData.forEach((item, index) => { + serializableOutput[`item-${index}`] = item; + }); + } + + if (browser.interpreter.binaryData) { + browser.interpreter.binaryData.forEach((item, index) => { + binaryOutput[`item-${index}`] = item; + }); + } + } + } catch (interpreterError) { + logger.log('warn', `Error collecting data from interpreter: ${interpreterError}`); + } await run.update({ status: 'aborted', finishedAt: new Date().toLocaleString(), browserId: plainRun.browserId, - log: currentLog || 'Run aborted by user', + log: currentLog, serializableOutput, binaryOutput, }); - const queuedRunProcessed = await checkAndProcessQueuedRun(userId, plainRun.browserId); + try { + serverIo.of(plainRun.browserId).emit('run-aborted', { + runId, + robotName: robotName, + status: 'aborted', + finishedAt: new Date().toLocaleString() + }); + } catch (socketError) { + logger.log('warn', `Failed to emit run-aborted event: ${socketError}`); + } + + let queuedRunProcessed = false; + try { + queuedRunProcessed = await checkAndProcessQueuedRun(userId, plainRun.browserId); + } catch (queueError) { + logger.log('warn', `Error checking queued runs: ${queueError}`); + } if (!queuedRunProcessed) { try { + await new Promise(resolve => setTimeout(resolve, 500)); + await destroyRemoteBrowser(plainRun.browserId, userId); - logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`); + logger.log('info', `Browser ${plainRun.browserId} destroyed successfully after abort`); } catch (cleanupError) { logger.log('warn', `Failed to clean up browser for aborted run ${runId}: ${cleanupError}`); }