From 0d9092367cfd20950c411c753491128cc83c730d Mon Sep 17 00:00:00 2001 From: Rohit Date: Thu, 6 Mar 2025 16:37:22 +0530 Subject: [PATCH] feat: add jobs for recording --- server/src/routes/record.ts | 206 ++++++++++++++++++++++++++++++------ 1 file changed, 174 insertions(+), 32 deletions(-) diff --git a/server/src/routes/record.ts b/server/src/routes/record.ts index 51d3ff92..055099ab 100644 --- a/server/src/routes/record.ts +++ b/server/src/routes/record.ts @@ -9,18 +9,19 @@ import { getActiveBrowserId, interpretWholeWorkflow, stopRunningInterpretation, - getRemoteBrowserCurrentUrl, getRemoteBrowserCurrentTabs, -} from '../browser-management/controller' + getRemoteBrowserCurrentUrl, + getRemoteBrowserCurrentTabs, +} from '../browser-management/controller'; import { chromium } from 'playwright-extra'; import stealthPlugin from 'puppeteer-extra-plugin-stealth'; import logger from "../logger"; import { getDecryptedProxyConfig } from './proxy'; import { requireSignIn } from '../middlewares/auth'; +import { pgBoss } from '../server'; // Import pgBoss reference export const router = Router(); chromium.use(stealthPlugin()); - export interface AuthenticatedRequest extends Request { user?: any; } @@ -35,41 +36,106 @@ router.all('/', requireSignIn, (req, res, next) => { /** * GET endpoint for starting the remote browser recording session. - * returns session's id + * returns session's id or job id */ router.get('/start', requireSignIn, async (req: AuthenticatedRequest, res: Response) => { if (!req.user) { return res.status(401).send('User not authenticated'); } - const proxyConfig = await getDecryptedProxyConfig(req.user.id); - // Prepare the proxy options dynamically based on the user's proxy configuration - let proxyOptions: any = {}; // Default to no proxy - - if (proxyConfig.proxy_url) { - // Set the server, and if username & password exist, set those as well - proxyOptions = { - server: proxyConfig.proxy_url, - ...(proxyConfig.proxy_username && proxyConfig.proxy_password && { - username: proxyConfig.proxy_username, - password: proxyConfig.proxy_password, - }), - }; + try { + const job = await pgBoss.send('initialize-browser-recording', { + userId: req.user.id + }); + + if (!job) { + logger.log('warn', 'pgBoss.send returned null, falling back to direct initialization'); + const browserId = initializeRemoteBrowserForRecording(req.user.id); + return res.send(browserId); + } + + logger.log('info', `Queued browser initialization job: ${job}`); + return res.send(job); + } catch (error: any) { + logger.log('error', `Failed to queue browser initialization job: ${error.message}`); + + try { + const browserId = initializeRemoteBrowserForRecording(req.user.id); + return res.send(browserId); + } catch (directError: any) { + logger.log('error', `Direct initialization also failed: ${directError.message}`); + return res.status(500).send('Failed to start recording'); + } } - - const id = initializeRemoteBrowserForRecording(req.user.id); - return res.send(id); }); /** * POST endpoint for starting the remote browser recording session accepting browser launch options. - * returns session's id + * returns session's id or job id */ -router.post('/start', requireSignIn, (req: AuthenticatedRequest, res:Response) => { +router.post('/start', requireSignIn, async (req: AuthenticatedRequest, res: Response) => { if (!req.user) { return res.status(401).send('User not authenticated'); } - const id = initializeRemoteBrowserForRecording(req.user.id); - return res.send(id); + try { + const job = await pgBoss.send('initialize-browser-recording', { + userId: req.user.id + }); + + if (!job) { + logger.log('warn', 'pgBoss.send returned null, falling back to direct initialization'); + const browserId = initializeRemoteBrowserForRecording(req.user.id); + return res.send(browserId); + } + + logger.log('info', `Queued browser initialization job: ${job}`); + return res.send(job); + } catch (error: any) { + logger.log('error', `Failed to queue browser initialization job: ${error.message}`); + + try { + const browserId = initializeRemoteBrowserForRecording(req.user.id); + return res.send(browserId); + } catch (directError: any) { + logger.log('error', `Direct initialization also failed: ${directError.message}`); + return res.status(500).send('Failed to start recording'); + } + } +}); + +/** + * GET endpoint for getting the job status of a browser initialization job + */ +router.get('/job-status/:jobId', requireSignIn, async (req, res) => { + try { + logger.log('debug', `Checking status for job ${req.params.jobId}`); + const job = await pgBoss.getJobById("job-status", req.params.jobId); + + if (!job) { + logger.log('warn', `Job ${req.params.jobId} not found`); + return res.status(404).send('Job not found'); + } + + logger.log('debug', `Job state: ${job.state}, hasOutput: ${!!job.output}`); + + if (job.state === 'completed' && job.output) { + const output = job.output as { browserId?: string }; + if (output.browserId) { + logger.log('info', `Job completed with browserId: ${output.browserId}`); + } else { + logger.log('warn', `Job completed but missing browserId in output`); + } + return res.send(output); // Return the browser ID from the completed job + } + + return res.send({ + state: job.state, + createdAt: job.createdOn, + startedAt: job.startedOn || null + }); + } catch (error: any) { + logger.log('error', `Failed to get job status: ${error.message}`); + return res.status(500).send('Failed to get job status'); + } }); /** @@ -77,8 +143,48 @@ router.post('/start', requireSignIn, (req: AuthenticatedRequest, res:Response) = * returns whether the termination was successful */ router.get('/stop/:browserId', requireSignIn, async (req, res) => { - const success = await destroyRemoteBrowser(req.params.browserId); - return res.send(success); + try { + if (req.params.browserId.startsWith('job_')) { + logger.log('debug', `Stopping job ${req.params.browserId}`); + + try { + const job = await pgBoss.getJobById("stop", req.params.browserId); + if (job && job.state === 'completed' && job.output) { + const output = job.output as { browserId?: string }; + if (output.browserId) { + await pgBoss.send('destroy-browser', { + browserId: output.browserId + }); + logger.log('info', `Queued destroy job for browser ${output.browserId}`); + return res.send(true); + } + } else if (job && (job.state === 'created' || job.state === 'active')) { + await pgBoss.cancel("cancel", req.params.browserId); + logger.log('info', `Cancelled job ${req.params.browserId}`); + return res.send(true); + } + } catch (jobError: any) { + logger.log('error', `Error handling job termination: ${jobError.message}`); + } + } + + try { + await pgBoss.send('destroy-browser', { + browserId: req.params.browserId + }); + logger.log('info', `Queued destroy job for browser ${req.params.browserId}`); + return res.send(true); + } catch (queueError: any) { + logger.log('error', `Failed to queue destroy job: ${queueError.message}`); + + const success = await destroyRemoteBrowser(req.params.browserId); + logger.log('info', `Direct browser destruction result: ${success}`); + return res.send(success); + } + } catch (error: any) { + logger.log('error', `Failed to stop browser: ${error.message}`); + return res.status(500).send(false); + } }); /** @@ -118,10 +224,25 @@ router.get('/active/tabs', requireSignIn, (req, res) => { */ router.get('/interpret', requireSignIn, async (req, res) => { try { - await interpretWholeWorkflow(); - return res.send('interpretation done'); - } catch (e) { - return res.send('interpretation failed'); + const job = await pgBoss.send('interpret-workflow', {}); + + if (!job) { + logger.log('warn', 'pgBoss.send returned null for interpret, falling back to direct interpretation'); + await interpretWholeWorkflow(); + return res.send('interpretation complete (direct)'); + } + + logger.log('info', `Queued interpretation job: ${job}`); + return res.send('interpretation queued'); + } catch (error: any) { + logger.log('error', `Failed to queue interpretation job: ${error.message}`); + + try { + await interpretWholeWorkflow(); + return res.send('interpretation complete (fallback)'); + } catch (directError: any) { + return res.status(500).send('interpretation failed'); + } } }); @@ -129,6 +250,27 @@ router.get('/interpret', requireSignIn, async (req, res) => { * GET endpoint for stopping an ongoing interpretation of the currently generated workflow. */ router.get('/interpret/stop', requireSignIn, async (req, res) => { - await stopRunningInterpretation(); - return res.send('interpretation stopped'); + try { + const job = await pgBoss.send('stop-interpretation', {}); + + if (!job) { + logger.log('warn', 'pgBoss.send returned null for stop-interpretation, falling back to direct stop'); + await stopRunningInterpretation(); + return res.send('interpretation stopped (direct)'); + } + + logger.log('info', `Queued stop interpretation job: ${job}`); + return res.send('interpretation stop queued'); + } catch (error: any) { + logger.log('error', `Failed to queue stop interpretation job: ${error.message}`); + + try { + await stopRunningInterpretation(); + return res.send('interpretation stopped (fallback)'); + } catch (directError: any) { + return res.status(500).send('interpretation stop failed'); + } + } }); + +export default router; \ No newline at end of file