From 3fd5526685aa2296d2bf96675c3b31233781661e Mon Sep 17 00:00:00 2001 From: Rohit Date: Tue, 8 Apr 2025 18:27:27 +0530 Subject: [PATCH 1/6] feat: add abort run job to pgboss queue --- server/src/routes/storage.ts | 42 ++++++++++++++---------------------- 1 file changed, 16 insertions(+), 26 deletions(-) diff --git a/server/src/routes/storage.ts b/server/src/routes/storage.ts index b8c9a1ab..e7e3939c 100644 --- a/server/src/routes/storage.ts +++ b/server/src/routes/storage.ts @@ -904,42 +904,32 @@ router.delete('/schedule/:id', requireSignIn, async (req: AuthenticatedRequest, router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest, res) => { try { if (!req.user) { return res.status(401).send({ error: 'Unauthorized' }); } - const run = await Run.findOne({ where: { + + const run = await Run.findOne({ where: { runId: req.params.id, runByUserId: req.user.id, } }); + if (!run) { return res.status(404).send(false); } - const plainRun = run.toJSON(); - - const browser = browserPool.getRemoteBrowser(plainRun.browserId); - const currentLog = browser?.interpreter.debugMessages.join('/n'); - const serializableOutput = browser?.interpreter.serializableData.reduce((reducedObject, item, index) => { - return { - [`item-${index}`]: item, - ...reducedObject, - } - }, {}); - const binaryOutput = browser?.interpreter.binaryData.reduce((reducedObject, item, index) => { - return { - [`item-${index}`]: item, - ...reducedObject, - } - }, {}); - await run.update({ - ...run, - status: 'aborted', - finishedAt: new Date().toLocaleString(), - browserId: plainRun.browserId, - log: currentLog, - serializableOutput, - binaryOutput, + + const userQueueName = `abort-run-user-${req.user.id}`; + await pgBoss.createQueue(userQueueName); + + await pgBoss.send(userQueueName, { + userId: req.user.id, + runId: req.params.id }); + + await run.update({ + status: 'aborting' + }); + return res.send(true); } catch (e) { const { message } = e as Error; - logger.log('info', `Error while running a robot with name: ${req.params.fileName}_${req.params.runId}.json`); + logger.log('info', `Error while aborting run with id: ${req.params.id} - ${message}`); return res.send(false); } }); From d1c7b5065e9afa5417a4b9cd1f0f125ff0e3228c Mon Sep 17 00:00:00 2001 From: Rohit Date: Tue, 8 Apr 2025 18:28:12 +0530 Subject: [PATCH 2/6] feat: register abort run queue worker --- server/src/pgboss-worker.ts | 126 ++++++++++++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index 0771cc27..f197489d 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -46,6 +46,11 @@ interface ExecuteRunData { browserId: string; } +interface AbortRunData { + userId: string; + runId: string; +} + const pgBoss = new PgBoss({connectionString: pgBossConnectionString }); /** @@ -358,6 +363,78 @@ async function processRunExecution(job: Job) { } } +async function abortRun(runId: string, userId: string): Promise { + try { + const run = await Run.findOne({ + where: { + runId: runId, + runByUserId: userId + } + }); + + if (!run) { + logger.log('warn', `Run ${runId} not found or does not belong to user ${userId}`); + return false; + } + + const plainRun = run.toJSON(); + + const browser = browserPool.getRemoteBrowser(plainRun.browserId); + + if (!browser) { + await run.update({ + status: 'aborted', + finishedAt: new Date().toLocaleString(), + log: 'Aborted: Browser not found or already closed' + }); + + logger.log('warn', `Browser not found for run ${runId}`); + return true; + } + + const currentLog = browser.interpreter.debugMessages.join('\n'); + const serializableOutput = browser.interpreter.serializableData.reduce((reducedObject, item, index) => { + return { + [`item-${index}`]: item, + ...reducedObject, + } + }, {}); + + const binaryOutput = browser.interpreter.binaryData.reduce((reducedObject, item, index) => { + return { + [`item-${index}`]: item, + ...reducedObject, + } + }, {}); + + await run.update({ + status: 'aborted', + finishedAt: new Date().toLocaleString(), + browserId: plainRun.browserId, + log: currentLog || 'Run aborted by user', + serializableOutput, + binaryOutput, + }); + + const queuedRunProcessed = await checkAndProcessQueuedRun(userId, plainRun.browserId); + + if (!queuedRunProcessed) { + try { + await destroyRemoteBrowser(plainRun.browserId, userId); + logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`); + } catch (cleanupError) { + logger.log('warn', `Failed to clean up browser for aborted run ${runId}: ${cleanupError}`); + } + } + + return true; + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to abort run ${runId}: ${errorMessage}`); + return false; + } +} + async function registerRunExecutionWorker() { try { const registeredUserQueues = new Map(); @@ -412,6 +489,52 @@ async function registerRunExecutionWorker() { } } +async function registerAbortRunWorker() { + try { + const registeredAbortQueues = new Map(); + + const checkForNewAbortQueues = async () => { + try { + const activeQueues = await pgBoss.getQueues(); + + const abortQueues = activeQueues.filter(q => q.name.startsWith('abort-run-user-')); + + for (const queue of abortQueues) { + if (!registeredAbortQueues.has(queue.name)) { + await pgBoss.work(queue.name, async (job: Job | Job[]) => { + try { + const data = extractJobData(job); + const { userId, runId } = data; + + logger.log('info', `Processing abort request for run ${runId} by user ${userId}`); + const success = await abortRun(runId, userId); + return { success }; + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Abort run job failed in ${queue.name}: ${errorMessage}`); + throw error; + } + }); + + registeredAbortQueues.set(queue.name, true); + logger.log('info', `Registered abort worker for queue: ${queue.name}`); + } + } + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to check for new abort queues: ${errorMessage}`); + } + }; + + await checkForNewAbortQueues(); + + logger.log('info', 'Abort run worker registration system initialized'); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to initialize abort run worker system: ${errorMessage}`); + } +} + /** * Initialize PgBoss and register all workers @@ -493,6 +616,9 @@ async function startWorkers() { // Register the run execution worker await registerRunExecutionWorker(); + // Register the abort run worker + await registerAbortRunWorker(); + logger.log('info', 'All recording workers registered successfully'); } catch (error: unknown) { const errorMessage = error instanceof Error ? error.message : String(error); From 395d08ba922c67c75a3d12b97816cd94f8d358b9 Mon Sep 17 00:00:00 2001 From: Rohit Date: Thu, 24 Apr 2025 13:47:26 +0530 Subject: [PATCH 3/6] feat: rm spread operation --- server/src/pgboss-worker.ts | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index f197489d..bd5ae801 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -393,19 +393,15 @@ async function abortRun(runId: string, userId: string): Promise { } const currentLog = browser.interpreter.debugMessages.join('\n'); - const serializableOutput = browser.interpreter.serializableData.reduce((reducedObject, item, index) => { - return { - [`item-${index}`]: item, - ...reducedObject, - } - }, {}); + const serializableOutput: Record = {}; + browser.interpreter.serializableData.forEach((item, index) => { + serializableOutput[`item-${index}`] = item; + }); - const binaryOutput = browser.interpreter.binaryData.reduce((reducedObject, item, index) => { - return { - [`item-${index}`]: item, - ...reducedObject, - } - }, {}); + const binaryOutput: Record = {}; + browser.interpreter.binaryData.forEach((item, index) => { + binaryOutput[`item-${index}`] = item; + }); await run.update({ status: 'aborted', From f6f23419d9ed5b50d95b6ccfb18e4832f94449ab Mon Sep 17 00:00:00 2001 From: Rohit Date: Thu, 24 Apr 2025 18:46:04 +0530 Subject: [PATCH 4/6] feat: add error handling destroy browser --- server/src/browser-management/controller.ts | 36 ++++++++++++++++++--- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/server/src/browser-management/controller.ts b/server/src/browser-management/controller.ts index ef1e0011..a02b3b7c 100644 --- a/server/src/browser-management/controller.ts +++ b/server/src/browser-management/controller.ts @@ -81,13 +81,39 @@ export const createRemoteBrowserForRun = (userId: string): string => { * @category BrowserManagement-Controller */ export const destroyRemoteBrowser = async (id: string, userId: string): Promise => { - const browserSession = browserPool.getRemoteBrowser(id); - if (browserSession) { + try { + const browserSession = browserPool.getRemoteBrowser(id); + if (!browserSession) { + logger.log('info', `Browser with id: ${id} not found, may have already been destroyed`); + return true; + } + logger.log('debug', `Switching off the browser with id: ${id}`); - await browserSession.stopCurrentInterpretation(); - await browserSession.switchOff(); + + try { + await browserSession.stopCurrentInterpretation(); + } catch (stopError) { + logger.log('warn', `Error stopping interpretation for browser ${id}: ${stopError}`); + } + + try { + await browserSession.switchOff(); + } catch (switchOffError) { + logger.log('warn', `Error switching off browser ${id}: ${switchOffError}`); + } + + return browserPool.deleteRemoteBrowser(id); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to destroy browser ${id}: ${errorMessage}`); + + try { + return browserPool.deleteRemoteBrowser(id); + } catch (deleteError) { + logger.log('error', `Failed to delete browser ${id} from pool: ${deleteError}`); + return false; + } } - return browserPool.deleteRemoteBrowser(id); }; /** From 9878db060adcd3f57ccc26b56ffb8a21a2b3bd13 Mon Sep 17 00:00:00 2001 From: Rohit Date: Thu, 24 Apr 2025 18:46:42 +0530 Subject: [PATCH 5/6] feat: add abort checks and update status --- server/src/pgboss-worker.ts | 170 ++++++++++++++++++++++++++++-------- 1 file changed, 132 insertions(+), 38 deletions(-) diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index bd5ae801..b09ca253 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -180,6 +180,11 @@ async function processRunExecution(job: Job) { return { success: false }; } + if (run.status === 'aborted' || run.status === 'aborting') { + logger.log('info', `Run ${data.runId} has status ${run.status}, skipping execution`); + return { success: true }; + } + const plainRun = run.toJSON(); // Find the recording @@ -187,12 +192,14 @@ async function processRunExecution(job: Job) { if (!recording) { logger.log('error', `Recording for run ${data.runId} not found`); - // Update run status to failed - await run.update({ - status: 'failed', - finishedAt: new Date().toLocaleString(), - log: 'Failed: Recording not found', - }); + const currentRun = await Run.findOne({ where: { runId: data.runId } }); + if (currentRun && (currentRun.status !== 'aborted' && currentRun.status !== 'aborting')) { + await run.update({ + status: 'failed', + finishedAt: new Date().toLocaleString(), + log: 'Failed: Recording not found', + }); + } // Check for queued runs even if this one failed await checkAndProcessQueuedRun(data.userId, data.browserId); @@ -207,8 +214,6 @@ async function processRunExecution(job: Job) { if (!browser || !currentPage) { logger.log('error', `Browser or page not available for run ${data.runId}`); - await pgBoss.fail(job.id, "Failed to get browser or page for run"); - // Even if this run failed, check for queued runs await checkAndProcessQueuedRun(data.userId, data.browserId); @@ -219,6 +224,11 @@ async function processRunExecution(job: Job) { // Reset the browser state before executing this run await resetBrowserState(browser); + const isRunAborted = async (): Promise => { + const currentRun = await Run.findOne({ where: { runId: data.runId } }); + return currentRun ? (currentRun.status === 'aborted' || currentRun.status === 'aborting') : false; + }; + // Execute the workflow const workflow = AddGeneratedFlags(recording.recording); const interpretationInfo = await browser.interpreter.InterpretRecording( @@ -228,10 +238,28 @@ async function processRunExecution(job: Job) { plainRun.interpreterSettings ); + if (await isRunAborted()) { + logger.log('info', `Run ${data.runId} was aborted during execution, not updating status`); + + const queuedRunProcessed = await checkAndProcessQueuedRun(data.userId, plainRun.browserId); + + if (!queuedRunProcessed) { + await destroyRemoteBrowser(plainRun.browserId, data.userId); + logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`); + } + + return { success: true }; + } + // Process the results const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput); + if (await isRunAborted()) { + logger.log('info', `Run ${data.runId} was aborted while processing results, not updating status`); + return { success: true }; + } + // Update the run record with results await run.update({ ...run, @@ -322,11 +350,28 @@ async function processRunExecution(job: Job) { } catch (executionError: any) { logger.log('error', `Run execution failed for run ${data.runId}: ${executionError.message}`); - await run.update({ - status: 'failed', - finishedAt: new Date().toLocaleString(), - log: `Failed: ${executionError.message}`, - }); + const currentRun = await Run.findOne({ where: { runId: data.runId } }); + if (currentRun && (currentRun.status !== 'aborted' && currentRun.status !== 'aborting')) { + await run.update({ + status: 'failed', + finishedAt: new Date().toLocaleString(), + log: `Failed: ${executionError.message}`, + }); + + // Capture failure metrics + capture( + 'maxun-oss-run-created-manual', + { + runId: data.runId, + user_id: data.userId, + created_at: new Date().toISOString(), + status: 'failed', + error_message: executionError.message, + } + ); + } else { + logger.log('info', `Run ${data.runId} was aborted, not updating status to failed`); + } // Check for queued runs before destroying the browser const queuedRunProcessed = await checkAndProcessQueuedRun(data.userId, plainRun.browserId); @@ -340,18 +385,6 @@ async function processRunExecution(job: Job) { logger.log('warn', `Failed to clean up browser for failed run ${data.runId}: ${cleanupError.message}`); } } - - // Capture failure metrics - capture( - 'maxun-oss-run-created-manual', - { - runId: data.runId, - user_id: data.userId, - created_at: new Date().toISOString(), - status: 'failed', - error_message: executionError.message, - } - ); return { success: false }; } @@ -377,9 +410,26 @@ async function abortRun(runId: string, userId: string): Promise { return false; } + await run.update({ + status: 'aborting' + }); + const plainRun = run.toJSON(); - const browser = browserPool.getRemoteBrowser(plainRun.browserId); + const recording = await Robot.findOne({ + where: { 'recording_meta.id': plainRun.robotMetaId }, + raw: true + }); + + const robotName = recording?.recording_meta?.name || 'Unknown Robot'; + + let browser; + try { + browser = browserPool.getRemoteBrowser(plainRun.browserId); + } catch (browserError) { + logger.log('warn', `Could not get browser for run ${runId}: ${browserError}`); + browser = null; + } if (!browser) { await run.update({ @@ -388,36 +438,80 @@ async function abortRun(runId: string, userId: string): Promise { log: 'Aborted: Browser not found or already closed' }); + try { + serverIo.of(plainRun.browserId).emit('run-aborted', { + runId, + robotName: robotName, + status: 'aborted', + finishedAt: new Date().toLocaleString() + }); + } catch (socketError) { + logger.log('warn', `Failed to emit run-aborted event: ${socketError}`); + } + logger.log('warn', `Browser not found for run ${runId}`); return true; } - const currentLog = browser.interpreter.debugMessages.join('\n'); - const serializableOutput: Record = {}; - browser.interpreter.serializableData.forEach((item, index) => { - serializableOutput[`item-${index}`] = item; - }); + let currentLog = 'Run aborted by user'; + let serializableOutput: Record = {}; + let binaryOutput: Record = {}; - const binaryOutput: Record = {}; - browser.interpreter.binaryData.forEach((item, index) => { - binaryOutput[`item-${index}`] = item; - }); + try { + if (browser.interpreter) { + if (browser.interpreter.debugMessages) { + currentLog = browser.interpreter.debugMessages.join('\n') || currentLog; + } + + if (browser.interpreter.serializableData) { + browser.interpreter.serializableData.forEach((item, index) => { + serializableOutput[`item-${index}`] = item; + }); + } + + if (browser.interpreter.binaryData) { + browser.interpreter.binaryData.forEach((item, index) => { + binaryOutput[`item-${index}`] = item; + }); + } + } + } catch (interpreterError) { + logger.log('warn', `Error collecting data from interpreter: ${interpreterError}`); + } await run.update({ status: 'aborted', finishedAt: new Date().toLocaleString(), browserId: plainRun.browserId, - log: currentLog || 'Run aborted by user', + log: currentLog, serializableOutput, binaryOutput, }); - const queuedRunProcessed = await checkAndProcessQueuedRun(userId, plainRun.browserId); + try { + serverIo.of(plainRun.browserId).emit('run-aborted', { + runId, + robotName: robotName, + status: 'aborted', + finishedAt: new Date().toLocaleString() + }); + } catch (socketError) { + logger.log('warn', `Failed to emit run-aborted event: ${socketError}`); + } + + let queuedRunProcessed = false; + try { + queuedRunProcessed = await checkAndProcessQueuedRun(userId, plainRun.browserId); + } catch (queueError) { + logger.log('warn', `Error checking queued runs: ${queueError}`); + } if (!queuedRunProcessed) { try { + await new Promise(resolve => setTimeout(resolve, 500)); + await destroyRemoteBrowser(plainRun.browserId, userId); - logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`); + logger.log('info', `Browser ${plainRun.browserId} destroyed successfully after abort`); } catch (cleanupError) { logger.log('warn', `Failed to clean up browser for aborted run ${runId}: ${cleanupError}`); } From d47f696ca5fec36d9f91086d963d0199815045e1 Mon Sep 17 00:00:00 2001 From: Rohit Date: Thu, 24 Apr 2025 18:47:23 +0530 Subject: [PATCH 6/6] feat: notify on socket abort run --- src/pages/MainPage.tsx | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/pages/MainPage.tsx b/src/pages/MainPage.tsx index aa896d50..0587f2a4 100644 --- a/src/pages/MainPage.tsx +++ b/src/pages/MainPage.tsx @@ -71,7 +71,7 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps) interpretStoredRecording(runId).then(async (interpretation: boolean) => { if (!aborted) { if (interpretation) { - notify('success', t('main_page.notifications.interpretation_success', { name: runningRecordingName })); + // notify('success', t('main_page.notifications.interpretation_success', { name: runningRecordingName })); } else { notify('success', t('main_page.notifications.interpretation_failed', { name: runningRecordingName })); // destroy the created browser @@ -112,6 +112,14 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps) notify('error', t('main_page.notifications.interpretation_failed', { name: robotName })); } }); + + socket.on('run-aborted', (data) => { + setRerenderRuns(true); + + const abortedRobotName = data.robotName; + notify('success', t('main_page.notifications.abort_success', { name: abortedRobotName })); + }); + setContent('runs'); if (browserId) { notify('info', t('main_page.notifications.run_started', { name: runningRecordingName }));