feat: add orphaned run recovery

This commit is contained in:
Rohit Rajan
2025-09-10 00:22:09 +05:30
parent d619097600
commit 0b556714f1

View File

@@ -949,6 +949,17 @@ router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest,
});
}
// Immediately stop interpreter like cloud version
try {
const browser = browserPool.getRemoteBrowser(run.browserId);
if (browser && browser.interpreter) {
logger.log('info', `Immediately stopping interpreter for run ${req.params.id}`);
await browser.interpreter.stopInterpretation();
}
} catch (immediateStopError: any) {
logger.log('warn', `Failed to immediately stop interpreter: ${immediateStopError.message}`);
}
const userQueueName = `abort-run-user-${req.user.id}`;
await pgBoss.createQueue(userQueueName);
@@ -961,7 +972,7 @@ router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest,
return res.send({
success: true,
message: 'Abort signal sent',
message: 'Run stopped immediately, cleanup queued',
jobId,
isQueued: false
});
@@ -1041,4 +1052,81 @@ async function processQueuedRuns() {
}
}
/**
 * Recovers runs that were left in "running" or "scheduled" status, e.g. after
 * an instance crash. Intended to be called on server startup.
 *
 * For every such run whose browser is no longer present in the browser pool:
 *  - if it has been retried fewer than the maximum number of times, it is
 *    re-queued (status "queued", retry counter incremented, outputs reset);
 *  - otherwise it is marked "failed" with an explanatory log line.
 * Any stale browser reference is then removed from the pool (best effort).
 *
 * Runs whose browser is still registered in the pool are left untouched.
 *
 * @returns A promise that resolves once every candidate run has been examined.
 *          Failures are logged, never rethrown: one bad run does not stop the
 *          recovery of the others, and a failed query does not reject.
 */
export async function recoverOrphanedRuns(): Promise<void> {
  // Single source of truth for the retry ceiling; it is both compared against
  // and interpolated into the log messages below.
  const MAX_RETRIES = 3;

  // Thrown values are not guaranteed to be Error instances (they can even be
  // null/undefined), so never dereference `.message` blindly inside a catch:
  // doing so would throw from the handler itself and abort the whole recovery.
  const describeError = (err: unknown): string => {
    if (err instanceof Error) {
      return err.message;
    }
    if (typeof err === 'object' && err !== null && 'message' in err) {
      return String((err as { message: unknown }).message);
    }
    return String(err);
  };

  // Appends a line to an existing run log, or starts a new log when empty.
  const appendLog = (existing: string | null | undefined, line: string): string =>
    existing ? `${existing}\n${line}` : line;

  try {
    logger.log('info', 'Starting recovery of orphaned runs...');

    const orphanedRuns = await Run.findAll({
      where: {
        status: ['running', 'scheduled']
      },
      order: [['startedAt', 'ASC']]
    });

    if (orphanedRuns.length === 0) {
      logger.log('info', 'No orphaned runs found');
      return;
    }

    logger.log('info', `Found ${orphanedRuns.length} orphaned runs to recover (including scheduled runs)`);

    for (const run of orphanedRuns) {
      try {
        const runData = run.toJSON();
        logger.log('info', `Recovering orphaned run: ${runData.runId}`);

        const browser = browserPool.getRemoteBrowser(runData.browserId);
        if (browser) {
          // The pool still knows this browser, so the run is not orphaned.
          logger.log('info', `Run ${runData.runId} browser still active, not orphaned`);
          continue;
        }

        const retryCount = runData.retryCount || 0;

        if (retryCount < MAX_RETRIES) {
          const attempt = retryCount + 1;
          await run.update({
            status: 'queued',
            retryCount: attempt,
            serializableOutput: {},
            binaryOutput: {},
            // NOTE(review): Sequelize appears to skip `undefined` values when
            // building the UPDATE, so this may leave the stored browserId
            // unchanged - confirm, and check the column's nullability before
            // switching to `null`.
            browserId: undefined,
            log: appendLog(runData.log, `[RETRY ${attempt}/${MAX_RETRIES}] Re-queuing due to server crash`)
          });
          logger.log('info', `Re-queued crashed run ${runData.runId} (retry ${attempt}/${MAX_RETRIES})`);
        } else {
          const crashRecoveryMessage = `Max retries exceeded (${MAX_RETRIES}/${MAX_RETRIES}) - Run failed after multiple server crashes.`;
          await run.update({
            status: 'failed',
            finishedAt: new Date().toLocaleString(),
            log: appendLog(runData.log, crashRecoveryMessage)
          });
          logger.log('warn', `Max retries reached for run ${runData.runId}, marked as permanently failed`);
        }

        if (runData.browserId) {
          // Best effort: a failure to drop the stale reference must not undo
          // the status update that has already been persisted above.
          try {
            browserPool.deleteRemoteBrowser(runData.browserId);
            logger.log('info', `Cleaned up stale browser reference: ${runData.browserId}`);
          } catch (cleanupError: unknown) {
            logger.log('warn', `Failed to cleanup browser reference ${runData.browserId}: ${describeError(cleanupError)}`);
          }
        }
      } catch (runError: unknown) {
        // Isolate per-run failures so the remaining runs are still processed.
        logger.log('error', `Failed to recover run ${run.runId}: ${describeError(runError)}`);
      }
    }

    logger.log('info', `Orphaned run recovery completed. Processed ${orphanedRuns.length} runs.`);
  } catch (error: unknown) {
    logger.log('error', `Failed to recover orphaned runs: ${describeError(error)}`);
  }
}
export { processQueuedRuns };