From 5657abb21ea1438ad9c0190cfecdbf45d236c4aa Mon Sep 17 00:00:00 2001 From: Rohit Date: Wed, 12 Mar 2025 23:08:51 +0530 Subject: [PATCH 1/2] feat: user specific queue for run --- server/src/routes/storage.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/routes/storage.ts b/server/src/routes/storage.ts index ffdf0149..357c199f 100644 --- a/server/src/routes/storage.ts +++ b/server/src/routes/storage.ts @@ -658,10 +658,12 @@ router.post('/runs/run/:id', requireSignIn, async (req: AuthenticatedRequest, re } try { + const userQueueName = `execute-run-user-${req.user.id}`; + // Queue the execution job - await pgBoss.createQueue('execute-run'); + await pgBoss.createQueue(userQueueName); - const jobId = await pgBoss.send('execute-run', { + const jobId = await pgBoss.send(userQueueName, { userId: req.user.id, runId: req.params.id, browserId: plainRun.browserId From d6b75757d1bf6c96863fa82983d228b7b4e14095 Mon Sep 17 00:00:00 2001 From: Rohit Date: Wed, 12 Mar 2025 23:09:48 +0530 Subject: [PATCH 2/2] feat: user specific queue run execution --- server/src/pgboss-worker.ts | 43 +++++++++++++++++++++++++++++++++---- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index 3566b4f8..64bfb351 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -117,6 +117,7 @@ async function checkAndProcessQueuedRun(userId: string, browserId: string): Prom const queuedRun = await Run.findOne({ where: { browserId: browserId, + runByUserId: userId, status: 'queued' }, order: [['startedAt', 'ASC']] @@ -140,9 +141,12 @@ async function checkAndProcessQueuedRun(userId: string, browserId: string): Prom log: 'Run started - using browser from previous run' }); + // Use user-specific queue + const userQueueName = `execute-run-user-${userId}`; + // Schedule the run execution - await pgBoss.createQueue('execute-run'); - const executeJobId = await pgBoss.send('execute-run', { + await pgBoss.createQueue(userQueueName); + const executeJobId = await pgBoss.send(userQueueName, { userId: userId, runId: queuedRun.runId, browserId: browserId @@ -358,7 +362,9 @@ async function processRunExecution(job: Job) { async function registerRunExecutionWorker() { try { - // Worker for executing runs + const registeredUserQueues = new Map(); + + // Worker for executing runs (Legacy) await pgBoss.work('execute-run', async (job: Job | Job[]) => { try { const singleJob = Array.isArray(job) ? job[0] : job; @@ -370,7 +376,36 @@ async function registerRunExecutionWorker() { } }); - // setInterval(checkForStuckQueuedRuns, 30000); + const checkForNewUserQueues = async () => { + try { + const activeQueues = await pgBoss.getQueues(); + + const userQueues = activeQueues.filter(q => q.name.startsWith('execute-run-user-')); + + for (const queue of userQueues) { + if (!registeredUserQueues.has(queue.name)) { + await pgBoss.work(queue.name, async (job: Job | Job[]) => { + try { + const singleJob = Array.isArray(job) ? job[0] : job; + return await processRunExecution(singleJob); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Run execution job failed in ${queue.name}: ${errorMessage}`); + throw error; + } + }); + + registeredUserQueues.set(queue.name, true); + logger.log('info', `Registered worker for queue: ${queue.name}`); + } + } + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to check for new user queues: ${errorMessage}`); + } + }; + + await checkForNewUserQueues(); logger.log('info', 'Run execution worker registered successfully'); } catch (error: unknown) {