From be1b90feefe58173d000e1f776edcd18464dccde Mon Sep 17 00:00:00 2001 From: Rohit Date: Mon, 10 Mar 2025 16:04:12 +0530 Subject: [PATCH] feat: send jobs and wait for completion --- server/src/routes/record.ts | 256 ++++++++++++++++-------------------- 1 file changed, 110 insertions(+), 146 deletions(-) diff --git a/server/src/routes/record.ts b/server/src/routes/record.ts index 055099ab..676254be 100644 --- a/server/src/routes/record.ts +++ b/server/src/routes/record.ts @@ -17,7 +17,7 @@ import stealthPlugin from 'puppeteer-extra-plugin-stealth'; import logger from "../logger"; import { getDecryptedProxyConfig } from './proxy'; import { requireSignIn } from '../middlewares/auth'; -import { pgBoss } from '../server'; // Import pgBoss reference +import { pgBoss } from '../pgboss-worker'; export const router = Router(); chromium.use(stealthPlugin()); @@ -26,6 +26,41 @@ export interface AuthenticatedRequest extends Request { user?: any; } +async function waitForJobCompletion(jobId: string, queueName: string, timeout = 15000): Promise { + return new Promise((resolve, reject) => { + const startTime = Date.now(); + + const checkJobStatus = async () => { + if (Date.now() - startTime > timeout) { + return reject(new Error(`Timeout waiting for job ${jobId} to complete`)); + } + + try { + const job = await pgBoss.getJobById(queueName, jobId); + + if (!job) { + return reject(new Error(`Job ${jobId} not found`)); + } + + if (job.state === 'completed') { + return resolve(job.output); + } + + if (job.state === 'failed') { + return reject(new Error(`Job ${jobId} failed.`)); + } + + setTimeout(checkJobStatus, 200); + } catch (error) { + reject(error); + } + }; + + // Start checking + checkJobStatus(); + }); +} + /** * Logs information about remote browser recording session. */ @@ -34,33 +69,52 @@ router.all('/', requireSignIn, (req, res, next) => { next() // pass control to the next handler }) + /** - * GET endpoint for starting the remote browser recording session. - * returns session's id or job id + * GET endpoint for starting the remote browser recording session + * Waits for job completion */ router.get('/start', requireSignIn, async (req: AuthenticatedRequest, res: Response) => { if (!req.user) { return res.status(401).send('User not authenticated'); } + try { - const job = await pgBoss.send('initialize-browser-recording', { - userId: req.user.id + await pgBoss.createQueue('initialize-browser-recording'); + + const jobId = await pgBoss.send('initialize-browser-recording', { + userId: req.user.id, + timestamp: new Date().toISOString() }); - if (!job) { + if (!jobId) { logger.log('warn', 'pgBoss.send returned null, falling back to direct initialization'); const browserId = initializeRemoteBrowserForRecording(req.user.id); - return res.send(browserId); + return res.send( browserId ); } - logger.log('info', `Queued browser initialization job: ${job}`); - return res.send(job); + logger.log('info', `Queued browser initialization job: ${jobId}, waiting for completion...`); + + try { + const result = await waitForJobCompletion(jobId, 'initialize-browser-recording', 15000); + + if (result && result.browserId) { + logger.log('info', `Job completed with browserId: ${result.browserId}`); + return res.send(result.browserId); + } else { + logger.log('warn', 'Job completed but returned unexpected result'); + return res.send(jobId); + } + } catch (waitError: any) { + logger.log('warn', `Error waiting for job completion: ${waitError.message}`); + return res.send(jobId); + } } catch (error: any) { logger.log('error', `Failed to queue browser initialization job: ${error.message}`); try { const browserId = initializeRemoteBrowserForRecording(req.user.id); - return res.send(browserId); + return res.send( browserId ); } catch (directError: any) { logger.log('error', `Direct initialization also failed: ${directError.message}`); return res.status(500).send('Failed to start recording'); @@ -70,116 +124,54 @@ router.get('/start', requireSignIn, async (req: AuthenticatedRequest, res: Respo /** * POST endpoint for starting the remote browser recording session accepting browser launch options. - * returns session's id or job id + * returns session's id */ -router.post('/start', requireSignIn, async (req: AuthenticatedRequest, res: Response) => { +router.post('/start', requireSignIn, (req: AuthenticatedRequest, res:Response) => { if (!req.user) { return res.status(401).send('User not authenticated'); } - try { - const job = await pgBoss.send('initialize-browser-recording', { - userId: req.user.id - }); - - if (!job) { - logger.log('warn', 'pgBoss.send returned null, falling back to direct initialization'); - const browserId = initializeRemoteBrowserForRecording(req.user.id); - return res.send(browserId); - } - - logger.log('info', `Queued browser initialization job: ${job}`); - return res.send(job); - } catch (error: any) { - logger.log('error', `Failed to queue browser initialization job: ${error.message}`); - - try { - const browserId = initializeRemoteBrowserForRecording(req.user.id); - return res.send(browserId); - } catch (directError: any) { - logger.log('error', `Direct initialization also failed: ${directError.message}`); - return res.status(500).send('Failed to start recording'); - } - } -}); - -/** - * GET endpoint for getting the job status of a browser initialization job - */ -router.get('/job-status/:jobId', requireSignIn, async (req, res) => { - try { - logger.log('debug', `Checking status for job ${req.params.jobId}`); - const job = await pgBoss.getJobById("job-status", req.params.jobId); - - if (!job) { - logger.log('warn', `Job ${req.params.jobId} not found`); - return res.status(404).send('Job not found'); - } - - logger.log('debug', `Job state: ${job.state}, hasOutput: ${!!job.output}`); - - if (job.state === 'completed' && job.output) { - const output = job.output as { browserId?: string }; - if (output.browserId) { - logger.log('info', `Job completed with browserId: ${output.browserId}`); - } else { - logger.log('warn', `Job completed but missing browserId in output`); - } - return res.send(output); // Return the browser ID from the completed job - } - - return res.send({ - state: job.state, - createdAt: job.createdOn, - startedAt: job.startedOn || null - }); - } catch (error: any) { - logger.log('error', `Failed to get job status: ${error.message}`); - return res.status(500).send('Failed to get job status'); - } + const id = initializeRemoteBrowserForRecording(req.user.id); + return res.send(id); }); /** * GET endpoint for terminating the remote browser recording session. * returns whether the termination was successful */ -router.get('/stop/:browserId', requireSignIn, async (req, res) => { +router.get('/stop/:browserId', requireSignIn, async (req: AuthenticatedRequest, res) => { + if (!req.user) { + return res.status(401).send('User not authenticated'); + } + try { - if (req.params.browserId.startsWith('job_')) { - logger.log('debug', `Stopping job ${req.params.browserId}`); - - try { - const job = await pgBoss.getJobById("stop", req.params.browserId); - if (job && job.state === 'completed' && job.output) { - const output = job.output as { browserId?: string }; - if (output.browserId) { - await pgBoss.send('destroy-browser', { - browserId: output.browserId - }); - logger.log('info', `Queued destroy job for browser ${output.browserId}`); - return res.send(true); - } - } else if (job && (job.state === 'created' || job.state === 'active')) { - await pgBoss.cancel("cancel", req.params.browserId); - logger.log('info', `Cancelled job ${req.params.browserId}`); - return res.send(true); - } - } catch (jobError: any) { - logger.log('error', `Error handling job termination: ${jobError.message}`); - } - } + await pgBoss.createQueue('destroy-browser'); + const jobId = await pgBoss.send('destroy-browser', { + browserId: req.params.browserId, + timestamp: new Date().toISOString() + }); + + if (!jobId) { + logger.log('warn', 'pgBoss.send returned null, falling back to direct destruction'); + const browserId = initializeRemoteBrowserForRecording(req.user.id); + return res.send( browserId ); + } + + logger.log('info', `Queued browser destruction job: ${jobId}, waiting for completion...`); + try { - await pgBoss.send('destroy-browser', { - browserId: req.params.browserId - }); - logger.log('info', `Queued destroy job for browser ${req.params.browserId}`); - return res.send(true); - } catch (queueError: any) { - logger.log('error', `Failed to queue destroy job: ${queueError.message}`); + const result = await waitForJobCompletion(jobId, 'destroy-browser', 15000); - const success = await destroyRemoteBrowser(req.params.browserId); - logger.log('info', `Direct browser destruction result: ${success}`); - return res.send(success); + if (result) { + logger.log('info', `Browser destruction job completed with result: ${result.success}`); + return res.send(result.success); + } else { + logger.log('warn', 'Job completed but returned unexpected result'); + return res.send(false); + } + } catch (waitError: any) { + logger.log('warn', `Error waiting for job completion: ${waitError.message}`); + return res.send(false); } } catch (error: any) { logger.log('error', `Failed to stop browser: ${error.message}`); @@ -222,55 +214,27 @@ router.get('/active/tabs', requireSignIn, (req, res) => { /** * GET endpoint for starting an interpretation of the currently generated workflow. */ -router.get('/interpret', requireSignIn, async (req, res) => { +router.get('/interpret', requireSignIn, async (req: AuthenticatedRequest, res) => { try { - const job = await pgBoss.send('interpret-workflow', {}); - - if (!job) { - logger.log('warn', 'pgBoss.send returned null for interpret, falling back to direct interpretation'); - await interpretWholeWorkflow(); - return res.send('interpretation complete (direct)'); - } - - logger.log('info', `Queued interpretation job: ${job}`); - return res.send('interpretation queued'); - } catch (error: any) { - logger.log('error', `Failed to queue interpretation job: ${error.message}`); - - try { - await interpretWholeWorkflow(); - return res.send('interpretation complete (fallback)'); - } catch (directError: any) { - return res.status(500).send('interpretation failed'); + if (!req.user) { + return res.status(401).send('User not authenticated'); } + await interpretWholeWorkflow(); + return res.send('interpretation done'); + } catch (e) { + return res.send('interpretation failed'); } }); /** * GET endpoint for stopping an ongoing interpretation of the currently generated workflow. */ -router.get('/interpret/stop', requireSignIn, async (req, res) => { - try { - const job = await pgBoss.send('stop-interpretation', {}); - - if (!job) { - logger.log('warn', 'pgBoss.send returned null for stop-interpretation, falling back to direct stop'); - await stopRunningInterpretation(); - return res.send('interpretation stopped (direct)'); - } - - logger.log('info', `Queued stop interpretation job: ${job}`); - return res.send('interpretation stop queued'); - } catch (error: any) { - logger.log('error', `Failed to queue stop interpretation job: ${error.message}`); - - try { - await stopRunningInterpretation(); - return res.send('interpretation stopped (fallback)'); - } catch (directError: any) { - return res.status(500).send('interpretation stop failed'); - } +router.get('/interpret/stop', requireSignIn, async (req: AuthenticatedRequest, res) => { + if (!req.user) { + return res.status(401).send('User not authenticated'); } + await stopRunningInterpretation(); + return res.send('interpretation stopped'); }); export default router; \ No newline at end of file