diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index ce46e830..7e568712 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -608,9 +608,52 @@ async function abortRun(runId: string, userId: string): Promise { } } +// Track registered queues globally for individual queue registration +const registeredUserQueues = new Map(); +const registeredAbortQueues = new Map(); + +async function registerWorkerForQueue(queueName: string) { + if (!registeredUserQueues.has(queueName)) { + await pgBoss.work(queueName, async (job: Job | Job[]) => { + try { + const singleJob = Array.isArray(job) ? job[0] : job; + return await processRunExecution(singleJob); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Run execution job failed in ${queueName}: ${errorMessage}`); + throw error; + } + }); + + registeredUserQueues.set(queueName, true); + logger.log('info', `Registered worker for queue: ${queueName}`); + } +} + +async function registerAbortWorkerForQueue(queueName: string) { + if (!registeredAbortQueues.has(queueName)) { + await pgBoss.work(queueName, async (job: Job | Job[]) => { + try { + const data = extractJobData(job); + const { userId, runId } = data; + + logger.log('info', `Processing abort request for run ${runId} by user ${userId}`); + const success = await abortRun(runId, userId); + return { success }; + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Abort run job failed in ${queueName}: ${errorMessage}`); + throw error; + } + }); + + registeredAbortQueues.set(queueName, true); + logger.log('info', `Registered abort worker for queue: ${queueName}`); + } +} + async function registerRunExecutionWorker() { try { - const registeredUserQueues = new Map(); // Worker for executing runs (Legacy) await pgBoss.work('execute-run', async (job: Job | Job[]) => { @@ -826,4 +869,4 @@ process.on('SIGINT', async () => { }); // For use in other files -export { pgBoss, startWorkers }; +export { pgBoss, registerWorkerForQueue, registerAbortWorkerForQueue, startWorkers };