From 10e1e8a87c096afe3caf4b57962646b99760bb22 Mon Sep 17 00:00:00 2001 From: Rohit Date: Tue, 11 Mar 2025 21:56:12 +0530 Subject: [PATCH] feat: queue stop and interpret workflow jobs --- server/src/routes/record.ts | 83 ++++++++++++++++++++++++++++++++++--- 1 file changed, 77 insertions(+), 6 deletions(-) diff --git a/server/src/routes/record.ts b/server/src/routes/record.ts index de468ef7..17fd56d2 100644 --- a/server/src/routes/record.ts +++ b/server/src/routes/record.ts @@ -148,6 +148,7 @@ router.get('/stop/:browserId', requireSignIn, async (req: AuthenticatedRequest, const jobId = await pgBoss.send('destroy-browser', { browserId: req.params.browserId, + userId: req.user.id, timestamp: new Date().toISOString() }); @@ -224,14 +225,84 @@ router.get('/active/tabs', requireSignIn, (req: AuthenticatedRequest, res) => { * GET endpoint for starting an interpretation of the currently generated workflow. */ router.get('/interpret', requireSignIn, async (req: AuthenticatedRequest, res) => { + if (!req.user) { + return res.status(401).send('User not authenticated'); + } + try { - if (!req.user) { - return res.status(401).send('User not authenticated'); + await pgBoss.createQueue('interpret-workflow'); + + const jobId = await pgBoss.send('interpret-workflow', { + userId: req.user.id, + timestamp: new Date().toISOString() + }); + + if (!jobId) { + logger.log('warn', 'pgBoss.send returned null, falling back to direct destruction'); + await interpretWholeWorkflow(req.user?.id); + return res.send('interpretation done'); } - await interpretWholeWorkflow(req.user?.id); - return res.send('interpretation done'); - } catch (e) { - return res.send('interpretation failed'); + + logger.log('info', `Queued interpret workflow job: ${jobId}, waiting for completion...`); + + try { + const result = await waitForJobCompletion(jobId, 'interpret-workflow', 15000); + + if (result) { + logger.log('info', `Browser destruction job completed with result: ${result.success}`); + return res.send('interpretation done'); + } else { + logger.log('warn', 'Job completed but returned unexpected result'); + return res.send('interpretation failed'); + } + } catch (waitError: any) { + logger.log('warn', `Error waiting for job completion: ${waitError.message}`); + return res.send('interpretation failed'); + } + } catch (error: any) { + logger.log('error', `Failed to stop browser: ${error.message}`); + return res.status(500).send('interpretation failed'); + } +}); + +router.get('/interpret/stop', requireSignIn, async (req: AuthenticatedRequest, res) => { + if (!req.user) { + return res.status(401).send('User not authenticated'); + } + + try { + await pgBoss.createQueue('stop-interpretation'); + + const jobId = await pgBoss.send('stop-interpretation', { + userId: req.user.id, + timestamp: new Date().toISOString() + }); + + if (!jobId) { + logger.log('warn', 'pgBoss.send returned null, falling back to direct destruction'); + await interpretWholeWorkflow(req.user?.id); + return res.send('interpretation done'); + } + + logger.log('info', `Queued stop interpret workflow job: ${jobId}, waiting for completion...`); + + try { + const result = await waitForJobCompletion(jobId, 'stop-interpretation', 15000); + + if (result) { + logger.log('info', `Browser destruction job completed with result: ${result.success}`); + return res.send('interpretation stopped'); + } else { + logger.log('warn', 'Job completed but returned unexpected result'); + return res.send('interpretation failed to stop'); + } + } catch (waitError: any) { + logger.log('warn', `Error waiting for job completion: ${waitError.message}`); + return res.send('interpretation failed to stop'); + } + } catch (error: any) { + logger.log('error', `Failed to stop browser: ${error.message}`); + return res.status(500).send('interpretation failed to stop'); } });