diff --git a/server/src/browser-management/controller.ts b/server/src/browser-management/controller.ts index d18c4534..ff5b5734 100644 --- a/server/src/browser-management/controller.ts +++ b/server/src/browser-management/controller.ts @@ -81,13 +81,39 @@ export const createRemoteBrowserForRun = (userId: string): string => { * @category BrowserManagement-Controller */ export const destroyRemoteBrowser = async (id: string, userId: string): Promise => { - const browserSession = browserPool.getRemoteBrowser(id); - if (browserSession) { + try { + const browserSession = browserPool.getRemoteBrowser(id); + if (!browserSession) { + logger.log('info', `Browser with id: ${id} not found, may have already been destroyed`); + return true; + } + logger.log('debug', `Switching off the browser with id: ${id}`); - await browserSession.stopCurrentInterpretation(); - await browserSession.switchOff(); + + try { + await browserSession.stopCurrentInterpretation(); + } catch (stopError) { + logger.log('warn', `Error stopping interpretation for browser ${id}: ${stopError}`); + } + + try { + await browserSession.switchOff(); + } catch (switchOffError) { + logger.log('warn', `Error switching off browser ${id}: ${switchOffError}`); + } + + return browserPool.deleteRemoteBrowser(id); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to destroy browser ${id}: ${errorMessage}`); + + try { + return browserPool.deleteRemoteBrowser(id); + } catch (deleteError) { + logger.log('error', `Failed to delete browser ${id} from pool: ${deleteError}`); + return false; + } } - return browserPool.deleteRemoteBrowser(id); }; /** diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index 43dfbe73..3ac993a7 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -50,6 +50,11 @@ interface ExecuteRunData { browserId: string; } +interface AbortRunData { + userId: string; + runId: string; +} + const pgBoss = new PgBoss({connectionString: pgBossConnectionString }); /** @@ -179,6 +184,11 @@ async function processRunExecution(job: Job) { return { success: false }; } + if (run.status === 'aborted' || run.status === 'aborting') { + logger.log('info', `Run ${data.runId} has status ${run.status}, skipping execution`); + return { success: true }; + } + const plainRun = run.toJSON(); // Find the recording @@ -186,12 +196,14 @@ async function processRunExecution(job: Job) { if (!recording) { logger.log('error', `Recording for run ${data.runId} not found`); - // Update run status to failed - await run.update({ - status: 'failed', - finishedAt: new Date().toLocaleString(), - log: 'Failed: Recording not found', - }); + const currentRun = await Run.findOne({ where: { runId: data.runId } }); + if (currentRun && (currentRun.status !== 'aborted' && currentRun.status !== 'aborting')) { + await run.update({ + status: 'failed', + finishedAt: new Date().toLocaleString(), + log: 'Failed: Recording not found', + }); + } // Check for queued runs even if this one failed await checkAndProcessQueuedRun(data.userId, data.browserId); @@ -206,8 +218,6 @@ async function processRunExecution(job: Job) { if (!browser || !currentPage) { logger.log('error', `Browser or page not available for run ${data.runId}`); - await pgBoss.fail(job.id, "Failed to get browser or page for run"); - // Even if this run failed, check for queued runs await checkAndProcessQueuedRun(data.userId, data.browserId); @@ -218,6 +228,11 @@ async function processRunExecution(job: Job) { // Reset the browser state before executing this run await resetBrowserState(browser); + const isRunAborted = async (): Promise => { + const currentRun = await Run.findOne({ where: { runId: data.runId } }); + return currentRun ? (currentRun.status === 'aborted' || currentRun.status === 'aborting') : false; + }; + // Execute the workflow const workflow = AddGeneratedFlags(recording.recording); const interpretationInfo = await browser.interpreter.InterpretRecording( @@ -227,10 +242,28 @@ async function processRunExecution(job: Job) { plainRun.interpreterSettings ); + if (await isRunAborted()) { + logger.log('info', `Run ${data.runId} was aborted during execution, not updating status`); + + const queuedRunProcessed = await checkAndProcessQueuedRun(data.userId, plainRun.browserId); + + if (!queuedRunProcessed) { + await destroyRemoteBrowser(plainRun.browserId, data.userId); + logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`); + } + + return { success: true }; + } + // Process the results const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput); + if (await isRunAborted()) { + logger.log('info', `Run ${data.runId} was aborted while processing results, not updating status`); + return { success: true }; + } + // Update the run record with results await run.update({ ...run, @@ -321,11 +354,28 @@ async function processRunExecution(job: Job) { } catch (executionError: any) { logger.log('error', `Run execution failed for run ${data.runId}: ${executionError.message}`); - await run.update({ - status: 'failed', - finishedAt: new Date().toLocaleString(), - log: `Failed: ${executionError.message}`, - }); + const currentRun = await Run.findOne({ where: { runId: data.runId } }); + if (currentRun && (currentRun.status !== 'aborted' && currentRun.status !== 'aborting')) { + await run.update({ + status: 'failed', + finishedAt: new Date().toLocaleString(), + log: `Failed: ${executionError.message}`, + }); + + // Capture failure metrics + capture( + 'maxun-oss-run-created-manual', + { + runId: data.runId, + user_id: data.userId, + created_at: new Date().toISOString(), + status: 'failed', + error_message: executionError.message, + } + ); + } else { + logger.log('info', `Run ${data.runId} was aborted, not updating status to failed`); + } // Check for queued runs before destroying the browser const queuedRunProcessed = await checkAndProcessQueuedRun(data.userId, plainRun.browserId); @@ -339,18 +389,6 @@ async function processRunExecution(job: Job) { logger.log('warn', `Failed to clean up browser for failed run ${data.runId}: ${cleanupError.message}`); } } - - // Capture failure metrics - capture( - 'maxun-oss-run-created-manual', - { - runId: data.runId, - user_id: data.userId, - created_at: new Date().toISOString(), - status: 'failed', - error_message: executionError.message, - } - ); return { success: false }; } @@ -362,6 +400,135 @@ async function processRunExecution(job: Job) { } } +async function abortRun(runId: string, userId: string): Promise { + try { + const run = await Run.findOne({ + where: { + runId: runId, + runByUserId: userId + } + }); + + if (!run) { + logger.log('warn', `Run ${runId} not found or does not belong to user ${userId}`); + return false; + } + + await run.update({ + status: 'aborting' + }); + + const plainRun = run.toJSON(); + + const recording = await Robot.findOne({ + where: { 'recording_meta.id': plainRun.robotMetaId }, + raw: true + }); + + const robotName = recording?.recording_meta?.name || 'Unknown Robot'; + + let browser; + try { + browser = browserPool.getRemoteBrowser(plainRun.browserId); + } catch (browserError) { + logger.log('warn', `Could not get browser for run ${runId}: ${browserError}`); + browser = null; + } + + if (!browser) { + await run.update({ + status: 'aborted', + finishedAt: new Date().toLocaleString(), + log: 'Aborted: Browser not found or already closed' + }); + + try { + serverIo.of(plainRun.browserId).emit('run-aborted', { + runId, + robotName: robotName, + status: 'aborted', + finishedAt: new Date().toLocaleString() + }); + } catch (socketError) { + logger.log('warn', `Failed to emit run-aborted event: ${socketError}`); + } + + logger.log('warn', `Browser not found for run ${runId}`); + return true; + } + + let currentLog = 'Run aborted by user'; + let serializableOutput: Record = {}; + let binaryOutput: Record = {}; + + try { + if (browser.interpreter) { + if (browser.interpreter.debugMessages) { + currentLog = browser.interpreter.debugMessages.join('\n') || currentLog; + } + + if (browser.interpreter.serializableData) { + browser.interpreter.serializableData.forEach((item, index) => { + serializableOutput[`item-${index}`] = item; + }); + } + + if (browser.interpreter.binaryData) { + browser.interpreter.binaryData.forEach((item, index) => { + binaryOutput[`item-${index}`] = item; + }); + } + } + } catch (interpreterError) { + logger.log('warn', `Error collecting data from interpreter: ${interpreterError}`); + } + + await run.update({ + status: 'aborted', + finishedAt: new Date().toLocaleString(), + browserId: plainRun.browserId, + log: currentLog, + serializableOutput, + binaryOutput, + }); + + try { + serverIo.of(plainRun.browserId).emit('run-aborted', { + runId, + robotName: robotName, + status: 'aborted', + finishedAt: new Date().toLocaleString() + }); + } catch (socketError) { + logger.log('warn', `Failed to emit run-aborted event: ${socketError}`); + } + + let queuedRunProcessed = false; + try { + queuedRunProcessed = await checkAndProcessQueuedRun(userId, plainRun.browserId); + } catch (queueError) { + logger.log('warn', `Error checking queued runs: ${queueError}`); + } + + if (!queuedRunProcessed) { + try { + await new Promise(resolve => setTimeout(resolve, 500)); + + await destroyRemoteBrowser(plainRun.browserId, userId); + logger.log('info', `Browser ${plainRun.browserId} destroyed successfully after abort`); + } catch (cleanupError) { + logger.log('warn', `Failed to clean up browser for aborted run ${runId}: ${cleanupError}`); + } + } + + return true; + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to abort run ${runId}: ${errorMessage}`); + return false; + } +} + async function registerRunExecutionWorker() { try { const registeredUserQueues = new Map(); @@ -416,6 +583,52 @@ async function registerRunExecutionWorker() { } } +async function registerAbortRunWorker() { + try { + const registeredAbortQueues = new Map(); + + const checkForNewAbortQueues = async () => { + try { + const activeQueues = await pgBoss.getQueues(); + + const abortQueues = activeQueues.filter(q => q.name.startsWith('abort-run-user-')); + + for (const queue of abortQueues) { + if (!registeredAbortQueues.has(queue.name)) { + await pgBoss.work(queue.name, async (job: Job | Job[]) => { + try { + const data = extractJobData(job); + const { userId, runId } = data; + + logger.log('info', `Processing abort request for run ${runId} by user ${userId}`); + const success = await abortRun(runId, userId); + return { success }; + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Abort run job failed in ${queue.name}: ${errorMessage}`); + throw error; + } + }); + + registeredAbortQueues.set(queue.name, true); + logger.log('info', `Registered abort worker for queue: ${queue.name}`); + } + } + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to check for new abort queues: ${errorMessage}`); + } + }; + + await checkForNewAbortQueues(); + + logger.log('info', 'Abort run worker registration system initialized'); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to initialize abort run worker system: ${errorMessage}`); + } +} + /** * Initialize PgBoss and register all workers @@ -497,6 +710,9 @@ async function startWorkers() { // Register the run execution worker await registerRunExecutionWorker(); + // Register the abort run worker + await registerAbortRunWorker(); + logger.log('info', 'All recording workers registered successfully'); } catch (error: unknown) { const errorMessage = error instanceof Error ? error.message : String(error); diff --git a/server/src/routes/storage.ts b/server/src/routes/storage.ts index b8c9a1ab..e7e3939c 100644 --- a/server/src/routes/storage.ts +++ b/server/src/routes/storage.ts @@ -904,42 +904,32 @@ router.delete('/schedule/:id', requireSignIn, async (req: AuthenticatedRequest, router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest, res) => { try { if (!req.user) { return res.status(401).send({ error: 'Unauthorized' }); } - const run = await Run.findOne({ where: { + + const run = await Run.findOne({ where: { runId: req.params.id, runByUserId: req.user.id, } }); + if (!run) { return res.status(404).send(false); } - const plainRun = run.toJSON(); - - const browser = browserPool.getRemoteBrowser(plainRun.browserId); - const currentLog = browser?.interpreter.debugMessages.join('/n'); - const serializableOutput = browser?.interpreter.serializableData.reduce((reducedObject, item, index) => { - return { - [`item-${index}`]: item, - ...reducedObject, - } - }, {}); - const binaryOutput = browser?.interpreter.binaryData.reduce((reducedObject, item, index) => { - return { - [`item-${index}`]: item, - ...reducedObject, - } - }, {}); - await run.update({ - ...run, - status: 'aborted', - finishedAt: new Date().toLocaleString(), - browserId: plainRun.browserId, - log: currentLog, - serializableOutput, - binaryOutput, + + const userQueueName = `abort-run-user-${req.user.id}`; + await pgBoss.createQueue(userQueueName); + + await pgBoss.send(userQueueName, { + userId: req.user.id, + runId: req.params.id }); + + await run.update({ + status: 'aborting' + }); + return res.send(true); } catch (e) { const { message } = e as Error; - logger.log('info', `Error while running a robot with name: ${req.params.fileName}_${req.params.runId}.json`); + logger.log('info', `Error while aborting run with id: ${req.params.id} - ${message}`); return res.send(false); } }); diff --git a/src/pages/MainPage.tsx b/src/pages/MainPage.tsx index aa896d50..0587f2a4 100644 --- a/src/pages/MainPage.tsx +++ b/src/pages/MainPage.tsx @@ -71,7 +71,7 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps) interpretStoredRecording(runId).then(async (interpretation: boolean) => { if (!aborted) { if (interpretation) { - notify('success', t('main_page.notifications.interpretation_success', { name: runningRecordingName })); + // notify('success', t('main_page.notifications.interpretation_success', { name: runningRecordingName })); } else { notify('success', t('main_page.notifications.interpretation_failed', { name: runningRecordingName })); // destroy the created browser @@ -112,6 +112,14 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps) notify('error', t('main_page.notifications.interpretation_failed', { name: robotName })); } }); + + socket.on('run-aborted', (data) => { + setRerenderRuns(true); + + const abortedRobotName = data.robotName; + notify('success', t('main_page.notifications.abort_success', { name: abortedRobotName })); + }); + setContent('runs'); if (browserId) { notify('info', t('main_page.notifications.run_started', { name: runningRecordingName }));