From ea6730208d76849ca6bd651908aa574aecde1478 Mon Sep 17 00:00:00 2001 From: Rohit Date: Wed, 12 Mar 2025 19:25:18 +0530 Subject: [PATCH] feat: queue robot run --- server/src/routes/storage.ts | 187 ++++++++++++++++------------------- 1 file changed, 87 insertions(+), 100 deletions(-) diff --git a/server/src/routes/storage.ts b/server/src/routes/storage.ts index aa3f4e12..ffdf0149 100644 --- a/server/src/routes/storage.ts +++ b/server/src/routes/storage.ts @@ -1,6 +1,6 @@ import { Router } from 'express'; import logger from "../logger"; -import { createRemoteBrowserForRun, destroyRemoteBrowser } from "../browser-management/controller"; +import { createRemoteBrowserForRun, destroyRemoteBrowser, getActiveBrowserIdByState } from "../browser-management/controller"; import { chromium } from 'playwright-extra'; import stealthPlugin from 'puppeteer-extra-plugin-stealth'; import { browserPool } from "../server"; @@ -22,6 +22,7 @@ import { encrypt, decrypt } from '../utils/auth'; import { WorkflowFile } from 'maxun-core'; import { Page } from 'playwright'; import { airtableUpdateTasks, processAirtableUpdates } from '../workflow-management/integrations/airtable'; +import { pgBoss } from '../pgboss-worker'; chromium.use(stealthPlugin()); export const router = Router(); @@ -494,6 +495,8 @@ router.delete('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) /** * PUT endpoint for starting a remote browser instance and saving run metadata to the storage. * Making it ready for interpretation and returning a runId. + * + * If the user has reached their browser limit, the run will be queued using PgBoss. */ router.put('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) => { try { @@ -525,35 +528,81 @@ router.put('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) => }; } - console.log(`Proxy config for run: ${JSON.stringify(proxyOptions)}`) - - const id = createRemoteBrowserForRun(req.user.id); + console.log(`Proxy config for run: ${JSON.stringify(proxyOptions)}`); + // Generate runId first const runId = uuid(); + + // Check if user has reached browser limit + const userBrowserIds = browserPool.getAllBrowserIdsForUser(req.user.id); + const canCreateBrowser = userBrowserIds.length < 2; + + if (canCreateBrowser) { + // User has available browser slots, create it directly + const id = createRemoteBrowserForRun(req.user.id); - const run = await Run.create({ - status: 'running', - name: recording.recording_meta.name, - robotId: recording.id, - robotMetaId: recording.recording_meta.id, - startedAt: new Date().toLocaleString(), - finishedAt: '', - browserId: id, - interpreterSettings: req.body, - log: '', - runId, - runByUserId: req.user.id, - serializableOutput: {}, - binaryOutput: {}, - }); + const run = await Run.create({ + status: 'running', + name: recording.recording_meta.name, + robotId: recording.id, + robotMetaId: recording.recording_meta.id, + startedAt: new Date().toLocaleString(), + finishedAt: '', + browserId: id, + interpreterSettings: req.body, + log: '', + runId, + runByUserId: req.user.id, + serializableOutput: {}, + binaryOutput: {}, + }); - const plainRun = run.toJSON(); + const plainRun = run.toJSON(); - return res.send({ - browserId: id, - runId: plainRun.runId, - robotMetaId: recording.recording_meta.id, - }); + return res.send({ + browserId: id, + runId: plainRun.runId, + robotMetaId: recording.recording_meta.id, + queued: false + }); + } else { + const browserId = getActiveBrowserIdByState(req.user.id, "run") + + if (browserId) { + // User has reached the browser limit, queue the run + try { + // Create the run record with 'queued' status + await Run.create({ + status: 'queued', + name: recording.recording_meta.name, + robotId: recording.id, + robotMetaId: recording.recording_meta.id, + startedAt: new Date().toLocaleString(), + finishedAt: '', + browserId: browserId, // Random will be updated later + interpreterSettings: req.body, + log: 'Run queued - waiting for available browser slot', + runId, + runByUserId: req.user.id, + serializableOutput: {}, + binaryOutput: {}, + }); + + return res.send({ + browserId: browserId, + runId: runId, + robotMetaId: recording.recording_meta.id, + queued: true, + }); + } catch (queueError: any) { + logger.log('error', `Failed to queue run job: ${queueError.message}`); + return res.status(503).send({ error: 'Unable to queue run, please try again later' }); + } + } else { + logger.log('info', "Browser id does not exist"); + return res.send(''); + } + } } catch (e) { const { message } = e as Error; logger.log('info', `Error while creating a run with robot id: ${req.params.id} - ${message}`); @@ -608,82 +657,20 @@ router.post('/runs/run/:id', requireSignIn, async (req: AuthenticatedRequest, re return res.status(404).send(false); } - // interpret the run in active browser - const browser = browserPool.getRemoteBrowser(plainRun.browserId); - let currentPage = browser?.getCurrentPage(); - if (browser && currentPage) { - const workflow = AddGeneratedFlags(recording.recording); - const interpretationInfo = await browser.interpreter.InterpretRecording( - workflow, currentPage, (newPage: Page) => currentPage = newPage, plainRun.interpreterSettings); - const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); - const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput); - await destroyRemoteBrowser(plainRun.browserId, req.user?.id); - await run.update({ - ...run, - status: 'success', - finishedAt: new Date().toLocaleString(), - browserId: plainRun.browserId, - log: interpretationInfo.log.join('\n'), - serializableOutput: interpretationInfo.serializableOutput, - binaryOutput: uploadedBinaryOutput, + try { + // Queue the execution job + await pgBoss.createQueue('execute-run'); + + const jobId = await pgBoss.send('execute-run', { + userId: req.user.id, + runId: req.params.id, + browserId: plainRun.browserId }); - - let totalRowsExtracted = 0; - let extractedScreenshotsCount = 0; - let extractedItemsCount = 0; - - if (run.dataValues.binaryOutput && run.dataValues.binaryOutput["item-0"]) { - extractedScreenshotsCount = 1; - } - - if (run.dataValues.serializableOutput && run.dataValues.serializableOutput["item-0"]) { - const itemsArray = run.dataValues.serializableOutput["item-0"]; - extractedItemsCount = itemsArray.length; - - totalRowsExtracted = itemsArray.reduce((total, item) => { - return total + Object.keys(item).length; - }, 0); - } - - console.log(`Extracted Items Count: ${extractedItemsCount}`); - console.log(`Extracted Screenshots Count: ${extractedScreenshotsCount}`); - console.log(`Total Rows Extracted: ${totalRowsExtracted}`); - - capture( - 'maxun-oss-run-created-manual', - { - runId: req.params.id, - user_id: req.user?.id, - created_at: new Date().toISOString(), - status: 'success', - totalRowsExtracted, - extractedItemsCount, - extractedScreenshotsCount, - } - ) - try { - googleSheetUpdateTasks[plainRun.runId] = { - robotId: plainRun.robotMetaId, - runId: plainRun.runId, - status: 'pending', - retries: 5, - }; - - airtableUpdateTasks[plainRun.runId] = { - robotId: plainRun.robotMetaId, - runId: plainRun.runId, - status: 'pending', - retries: 5, - }; - - processAirtableUpdates(); - processGoogleSheetUpdates(); - } catch (err: any) { - logger.log('error', `Failed to update Google Sheet for run: ${plainRun.runId}: ${err.message}`); - } - return res.send(true); - } else { - throw new Error('Could not destroy browser'); + + logger.log('info', `Queued run execution job with ID: ${jobId} for run: ${req.params.id}`); + } catch (queueError: any) { + logger.log('error', `Failed to queue run execution`); + } } catch (e) { const { message } = e as Error;