From d6b75757d1bf6c96863fa82983d228b7b4e14095 Mon Sep 17 00:00:00 2001 From: Rohit Date: Wed, 12 Mar 2025 23:09:48 +0530 Subject: [PATCH] feat: user specific queue run execution --- server/src/pgboss-worker.ts | 43 +++++++++++++++++++++++++++++++++---- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index 3566b4f8..64bfb351 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -117,6 +117,7 @@ async function checkAndProcessQueuedRun(userId: string, browserId: string): Prom const queuedRun = await Run.findOne({ where: { browserId: browserId, + runByUserId: userId, status: 'queued' }, order: [['startedAt', 'ASC']] @@ -140,9 +141,12 @@ async function checkAndProcessQueuedRun(userId: string, browserId: string): Prom log: 'Run started - using browser from previous run' }); + // Use user-specific queue + const userQueueName = `execute-run-user-${userId}`; + // Schedule the run execution - await pgBoss.createQueue('execute-run'); - const executeJobId = await pgBoss.send('execute-run', { + await pgBoss.createQueue(userQueueName); + const executeJobId = await pgBoss.send(userQueueName, { userId: userId, runId: queuedRun.runId, browserId: browserId @@ -358,7 +362,9 @@ async function processRunExecution(job: Job) { async function registerRunExecutionWorker() { try { - // Worker for executing runs + const registeredUserQueues = new Map(); + + // Worker for executing runs (Legacy) await pgBoss.work('execute-run', async (job: Job | Job[]) => { try { const singleJob = Array.isArray(job) ? job[0] : job; @@ -370,7 +376,36 @@ async function registerRunExecutionWorker() { } }); - // setInterval(checkForStuckQueuedRuns, 30000); + const checkForNewUserQueues = async () => { + try { + const activeQueues = await pgBoss.getQueues(); + + const userQueues = activeQueues.filter(q => q.name.startsWith('execute-run-user-')); + + for (const queue of userQueues) { + if (!registeredUserQueues.has(queue.name)) { + await pgBoss.work(queue.name, async (job: Job | Job[]) => { + try { + const singleJob = Array.isArray(job) ? job[0] : job; + return await processRunExecution(singleJob); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Run execution job failed in ${queue.name}: ${errorMessage}`); + throw error; + } + }); + + registeredUserQueues.set(queue.name, true); + logger.log('info', `Registered worker for queue: ${queue.name}`); + } + } + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to check for new user queues: ${errorMessage}`); + } + }; + + await checkForNewUserQueues(); logger.log('info', 'Run execution worker registered successfully'); } catch (error: unknown) {