diff --git a/server/src/routes/storage.ts b/server/src/routes/storage.ts
index 35491f8c..3b68ec22 100644
--- a/server/src/routes/storage.ts
+++ b/server/src/routes/storage.ts
@@ -949,6 +949,17 @@ router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest,
     });
   }
 
+  // Immediately stop the interpreter, mirroring the cloud version's behavior
+  try {
+    const browser = browserPool.getRemoteBrowser(run.browserId);
+    if (browser && browser.interpreter) {
+      logger.log('info', `Immediately stopping interpreter for run ${req.params.id}`);
+      await browser.interpreter.stopInterpretation();
+    }
+  } catch (immediateStopError: any) {
+    logger.log('warn', `Failed to immediately stop interpreter: ${immediateStopError.message}`);
+  }
+
   const userQueueName = `abort-run-user-${req.user.id}`;
   await pgBoss.createQueue(userQueueName);
 
@@ -961,7 +972,7 @@ router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest,
 
   return res.send({
     success: true,
-    message: 'Abort signal sent',
+    message: 'Run stopped immediately, cleanup queued',
     jobId,
     isQueued: false
   });
@@ -1041,4 +1052,81 @@ async function processQueuedRuns() {
   }
 }
 
+/**
+ * Recovers orphaned runs left in "running" or "scheduled" status after an instance crash.
+ * Runs on server startup to ensure data reliability.
+ */
+export async function recoverOrphanedRuns() {
+  try {
+    logger.log('info', 'Starting recovery of orphaned runs...');
+
+    const orphanedRuns = await Run.findAll({
+      where: {
+        status: ['running', 'scheduled']
+      },
+      order: [['startedAt', 'ASC']]
+    });
+
+    if (orphanedRuns.length === 0) {
+      logger.log('info', 'No orphaned runs found');
+      return;
+    }
+
+    logger.log('info', `Found ${orphanedRuns.length} orphaned runs to recover (including scheduled runs)`);
+
+    for (const run of orphanedRuns) {
+      try {
+        const runData = run.toJSON();
+        logger.log('info', `Recovering orphaned run: ${runData.runId}`);
+
+        const browser = browserPool.getRemoteBrowser(runData.browserId);
+
+        if (!browser) {
+          const retryCount = runData.retryCount || 0;
+
+          if (retryCount < 3) {
+            await run.update({
+              status: 'queued',
+              retryCount: retryCount + 1,
+              serializableOutput: {},
+              binaryOutput: {},
+              browserId: null,
+              log: runData.log ? `${runData.log}\n[RETRY ${retryCount + 1}/3] Re-queuing due to server crash` : `[RETRY ${retryCount + 1}/3] Re-queuing due to server crash`
+            });
+
+            logger.log('info', `Re-queued crashed run ${runData.runId} (retry ${retryCount + 1}/3)`);
+          } else {
+            const crashRecoveryMessage = `Max retries exceeded (3/3) - Run failed after multiple server crashes.`;
+
+            await run.update({
+              status: 'failed',
+              finishedAt: new Date().toLocaleString(),
+              log: runData.log ? `${runData.log}\n${crashRecoveryMessage}` : crashRecoveryMessage
+            });
+
+            logger.log('warn', `Max retries reached for run ${runData.runId}, marked as permanently failed`);
+          }
+
+          if (runData.browserId) {
+            try {
+              browserPool.deleteRemoteBrowser(runData.browserId);
+              logger.log('info', `Cleaned up stale browser reference: ${runData.browserId}`);
+            } catch (cleanupError: any) {
+              logger.log('warn', `Failed to cleanup browser reference ${runData.browserId}: ${cleanupError.message}`);
+            }
+          }
+        } else {
+          logger.log('info', `Run ${runData.runId} browser still active, not orphaned`);
+        }
+      } catch (runError: any) {
+        logger.log('error', `Failed to recover run ${run.runId}: ${runError.message}`);
+      }
+    }
+
+    logger.log('info', `Orphaned run recovery completed. Processed ${orphanedRuns.length} runs.`);
+  } catch (error: any) {
+    logger.log('error', `Failed to recover orphaned runs: ${error.message}`);
+  }
+}
+
 export { processQueuedRuns };
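
The abort route now does the fast part inline (stopping the interpreter) and defers the heavier teardown to the per-user pg-boss queue. The consumer for that queue is not part of this diff; a minimal sketch of what it might look like, assuming pg-boss v10 (where `work()` handlers receive a batch of jobs), a hypothetical `{ runId, browserId }` job payload, an `'aborted'` status value, and a hypothetical `registerAbortWorker` helper:

```ts
// Hypothetical consumer for the per-user abort queue; the real handler lives elsewhere.
async function registerAbortWorker(userId: string) {
  const userQueueName = `abort-run-user-${userId}`;

  // pg-boss v10: work() handlers receive an array of jobs.
  await pgBoss.work(userQueueName, async ([job]) => {
    // Payload shape is an assumption, not shown in this patch.
    const { runId, browserId } = job.data as { runId: string; browserId: string };

    // Deferred cleanup: release the browser and persist the final run state.
    browserPool.deleteRemoteBrowser(browserId);
    await Run.update(
      { status: 'aborted', finishedAt: new Date().toLocaleString() },
      { where: { runId } }
    );
  });
}
```

This split is why the response message changed: the caller gets `'Run stopped immediately, cleanup queued'` as soon as the interpreter halts, without waiting for the queued teardown to finish.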
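
The new `recoverOrphanedRuns` export is documented as running on server startup, but the diff does not show where it is wired in. A minimal sketch of that bootstrap call, assuming a hypothetical `server/src/server.ts` entry point (the file path and bootstrap shape are assumptions, not part of this patch):

```ts
// server/src/server.ts — hypothetical entry point; the real bootstrap file may differ.
import { recoverOrphanedRuns, processQueuedRuns } from './routes/storage';

async function bootstrap() {
  // Reconcile stale "running"/"scheduled" rows from a previous crash before
  // accepting new work, so each orphaned run is re-queued or marked failed first.
  await recoverOrphanedRuns();

  // Pick up queued runs, including any just re-queued by the recovery pass.
  await processQueuedRuns();

  // ...start the HTTP server, pg-boss workers, etc.
}

bootstrap().catch((err: Error) => {
  console.error(`Server bootstrap failed: ${err.message}`);
  process.exit(1);
});
```

Ordering matters here: recovery may flip runs back to `queued`, and calling it before `processQueuedRuns()` ensures those retries are picked up on the same startup pass.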