From 612fee7196a7c4f89bf1231d18749b41fcdf67bf Mon Sep 17 00:00:00 2001 From: Rohit Date: Tue, 3 Jun 2025 19:20:13 +0530 Subject: [PATCH] feat: add queued runs execution logic --- server/src/routes/storage.ts | 79 ++++++++++++++++++++++++++++++++---- 1 file changed, 71 insertions(+), 8 deletions(-) diff --git a/server/src/routes/storage.ts b/server/src/routes/storage.ts index 92807d31..0ebe8492 100644 --- a/server/src/routes/storage.ts +++ b/server/src/routes/storage.ts @@ -575,11 +575,9 @@ router.put('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) => await pgBoss.createQueue(userQueueName); const jobId = await pgBoss.send(userQueueName, { - emailId: req.user.email, userId: req.user.id, runId: runId, browserId: browserId, - interpreterSettings: req.body }); logger.log('info', `Queued run execution job with ID: ${jobId} for run: ${runId}`); @@ -635,12 +633,7 @@ router.put('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) => } } catch (e) { const { message } = e as Error; - logger.log('error', `Error while creating a run with robot id: ${req.params.id} - ${message}`); - - if (message.includes('invalid input syntax for type uuid')) { - return res.status(400).send({ error: 'Invalid UUID format detected' }); - } - + logger.log('error', `Error while creating a run with robot id: ${req.params.id} - ${message}`); return res.status(500).send({ error: 'Internal server error' }); } }); @@ -952,3 +945,73 @@ router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest, return res.send(false); } }); + +async function processQueuedRuns() { + try { + const queuedRun = await Run.findOne({ + where: { status: 'queued' }, + order: [['startedAt', 'ASC']] + }); + + if (!queuedRun) return; + + const userId = queuedRun.runByUserId; + + const canCreateBrowser = await browserPool.hasAvailableBrowserSlots(userId, "run"); + + if (canCreateBrowser) { + logger.log('info', `Processing queued run ${queuedRun.runId} for user ${userId}`); + + const recording = await Robot.findOne({ + where: { + 'recording_meta.id': queuedRun.robotMetaId + }, + raw: true + }); + + if (!recording) { + await queuedRun.update({ + status: 'failed', + finishedAt: new Date().toLocaleString(), + log: 'Recording not found' + }); + return; + } + + try { + const newBrowserId = await createRemoteBrowserForRun(userId); + + logger.log('info', `Created and initialized browser ${newBrowserId} for queued run ${queuedRun.runId}`); + + await queuedRun.update({ + status: 'running', + browserId: newBrowserId, + log: 'Browser created and ready for execution' + }); + + const userQueueName = `execute-run-user-${userId}`; + await pgBoss.createQueue(userQueueName); + + const jobId = await pgBoss.send(userQueueName, { + userId: userId, + runId: queuedRun.runId, + browserId: newBrowserId, + }); + + logger.log('info', `Queued execution for run ${queuedRun.runId} with ready browser ${newBrowserId}, job ID: ${jobId}`); + + } catch (browserError: any) { + logger.log('error', `Failed to create browser for queued run: ${browserError.message}`); + await queuedRun.update({ + status: 'failed', + finishedAt: new Date().toLocaleString(), + log: `Failed to create browser: ${browserError.message}` + }); + } + } + } catch (error: any) { + logger.log('error', `Error processing queued runs: ${error.message}`); + } +} + +export { processQueuedRuns };