diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index 0771cc27..f197489d 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -46,6 +46,11 @@ interface ExecuteRunData { browserId: string; } +interface AbortRunData { + userId: string; + runId: string; +} + const pgBoss = new PgBoss({connectionString: pgBossConnectionString }); /** @@ -358,6 +363,78 @@ async function processRunExecution(job: Job) { } } +async function abortRun(runId: string, userId: string): Promise { + try { + const run = await Run.findOne({ + where: { + runId: runId, + runByUserId: userId + } + }); + + if (!run) { + logger.log('warn', `Run ${runId} not found or does not belong to user ${userId}`); + return false; + } + + const plainRun = run.toJSON(); + + const browser = browserPool.getRemoteBrowser(plainRun.browserId); + + if (!browser) { + await run.update({ + status: 'aborted', + finishedAt: new Date().toLocaleString(), + log: 'Aborted: Browser not found or already closed' + }); + + logger.log('warn', `Browser not found for run ${runId}`); + return true; + } + + const currentLog = browser.interpreter.debugMessages.join('\n'); + const serializableOutput = browser.interpreter.serializableData.reduce((reducedObject, item, index) => { + return { + [`item-${index}`]: item, + ...reducedObject, + } + }, {}); + + const binaryOutput = browser.interpreter.binaryData.reduce((reducedObject, item, index) => { + return { + [`item-${index}`]: item, + ...reducedObject, + } + }, {}); + + await run.update({ + status: 'aborted', + finishedAt: new Date().toLocaleString(), + browserId: plainRun.browserId, + log: currentLog || 'Run aborted by user', + serializableOutput, + binaryOutput, + }); + + const queuedRunProcessed = await checkAndProcessQueuedRun(userId, plainRun.browserId); + + if (!queuedRunProcessed) { + try { + await destroyRemoteBrowser(plainRun.browserId, userId); + logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`); + } catch (cleanupError) { + logger.log('warn', `Failed to clean up browser for aborted run ${runId}: ${cleanupError}`); + } + } + + return true; + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to abort run ${runId}: ${errorMessage}`); + return false; + } +} + async function registerRunExecutionWorker() { try { const registeredUserQueues = new Map(); @@ -412,6 +489,52 @@ async function registerRunExecutionWorker() { } } +async function registerAbortRunWorker() { + try { + const registeredAbortQueues = new Map(); + + const checkForNewAbortQueues = async () => { + try { + const activeQueues = await pgBoss.getQueues(); + + const abortQueues = activeQueues.filter(q => q.name.startsWith('abort-run-user-')); + + for (const queue of abortQueues) { + if (!registeredAbortQueues.has(queue.name)) { + await pgBoss.work(queue.name, async (job: Job | Job[]) => { + try { + const data = extractJobData(job); + const { userId, runId } = data; + + logger.log('info', `Processing abort request for run ${runId} by user ${userId}`); + const success = await abortRun(runId, userId); + return { success }; + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Abort run job failed in ${queue.name}: ${errorMessage}`); + throw error; + } + }); + + registeredAbortQueues.set(queue.name, true); + logger.log('info', `Registered abort worker for queue: ${queue.name}`); + } + } + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to check for new abort queues: ${errorMessage}`); + } + }; + + await checkForNewAbortQueues(); + + logger.log('info', 'Abort run worker registration system initialized'); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to initialize abort run worker system: ${errorMessage}`); + } +} + /** * Initialize PgBoss and register all workers @@ -493,6 +616,9 @@ async function startWorkers() { // Register the run execution worker await registerRunExecutionWorker(); + // Register the abort run worker + await registerAbortRunWorker(); + logger.log('info', 'All recording workers registered successfully'); } catch (error: unknown) { const errorMessage = error instanceof Error ? error.message : String(error);