diff --git a/public/locales/de.json b/public/locales/de.json index 4e26dc35..8141d873 100644 --- a/public/locales/de.json +++ b/public/locales/de.json @@ -61,7 +61,8 @@ "notifications": { "delete_warning": "Der Roboter hat zugehörige Ausführungen. Löschen Sie zuerst die Ausführungen, um den Roboter zu löschen", "delete_success": "Roboter erfolgreich gelöscht", - "auth_success": "Roboter erfolgreich authentifiziert" + "auth_success": "Roboter erfolgreich authentifiziert", + "browser_limit_warning": "Remote-Browser sind derzeit ausgelastet. Bitte warten Sie einige Minuten und versuchen Sie es erneut" } }, "mainmenu": { diff --git a/public/locales/en.json b/public/locales/en.json index e4f15c52..5e0925a1 100644 --- a/public/locales/en.json +++ b/public/locales/en.json @@ -68,7 +68,8 @@ "notifications": { "delete_warning": "The robot has associated runs. First delete runs to delete the robot", "delete_success": "Robot deleted successfully", - "auth_success": "Robot successfully authenticated" + "auth_success": "Robot successfully authenticated", + "browser_limit_warning": "Remote browsers are currently busy. Please wait for a few minutes and try again" } }, "mainmenu":{ diff --git a/public/locales/es.json b/public/locales/es.json index 35b1fba1..e6953f38 100644 --- a/public/locales/es.json +++ b/public/locales/es.json @@ -62,7 +62,8 @@ "notifications": { "delete_warning": "El robot tiene ejecuciones asociadas. Primero elimine las ejecuciones para eliminar el robot", "delete_success": "Robot eliminado exitosamente", - "auth_success": "Robot autenticado exitosamente" + "auth_success": "Robot autenticado exitosamente", + "browser_limit_warning": "Los navegadores remotos están ocupados actualmente. Por favor, espere unos minutos e inténtelo de nuevo" } }, "mainmenu": { diff --git a/public/locales/ja.json b/public/locales/ja.json index b9e1174a..9d237674 100644 --- a/public/locales/ja.json +++ b/public/locales/ja.json @@ -62,7 +62,8 @@ "notifications": { "delete_warning": "ロボットには関連する実行があります。ロボットを削除するには、まず実行を削除してください", "delete_success": "ロボットが正常に削除されました", - "auth_success": "ロボットの認証に成功しました" + "auth_success": "ロボットの認証に成功しました", + "browser_limit_warning": "リモートブラウザは現在ビジー状態です。数分お待ちいただいてから再度お試しください" } }, "mainmenu": { diff --git a/public/locales/zh.json b/public/locales/zh.json index 6ac76ed9..949faf4d 100644 --- a/public/locales/zh.json +++ b/public/locales/zh.json @@ -62,7 +62,8 @@ "notifications": { "delete_warning": "该机器人有关联的运行记录。请先删除运行记录才能删除机器人", "delete_success": "机器人删除成功", - "auth_success": "机器人认证成功" + "auth_success": "机器人认证成功", + "browser_limit_warning": "远程浏览器当前繁忙。请稍等几分钟后重试" } }, "mainmenu": { diff --git a/server/src/browser-management/classes/BrowserPool.ts b/server/src/browser-management/classes/BrowserPool.ts index c1f0f557..e6dcf6b8 100644 --- a/server/src/browser-management/classes/BrowserPool.ts +++ b/server/src/browser-management/classes/BrowserPool.ts @@ -14,7 +14,7 @@ interface BrowserPoolInfo { /** * The instance of remote browser. */ - browser: RemoteBrowser, + browser: RemoteBrowser | null, /** * States if the browser's instance is being actively used. * Helps to persist the progress on the frontend when the application has been reloaded. @@ -31,6 +31,11 @@ interface BrowserPoolInfo { * @default "recording" */ state: BrowserState, + /** + * The status of the browser instance. + * Can be "reserved", "initializing", "ready" or "failed". + */ + status?: "reserved" | "initializing" | "ready" | "failed", } /** @@ -205,8 +210,18 @@ export class BrowserPool { * @returns remote browser instance or undefined if it does not exist in the pool */ public getRemoteBrowser = (id: string): RemoteBrowser | undefined => { - logger.log('debug', `Remote browser with id: ${id} retrieved from the pool`); - return this.pool[id]?.browser; + const poolInfo = this.pool[id]; + if (!poolInfo) { + return undefined; + } + + // Return undefined for reserved slots (browser is null) + if (poolInfo.status === "reserved") { + logger.log('debug', `Browser ${id} is reserved but not yet ready`); + return undefined; + } + + return poolInfo.browser || undefined; }; /** @@ -506,6 +521,29 @@ export class BrowserPool { return browserIds.length > 0 ? browserIds[0] : null; }; + /** + * Checks if there are available browser slots for a user. + * Returns true if user has available slots AND none of their active browsers are in "recording" state. + * @param userId the user ID to check browser slots for + * @returns {boolean} true if user has available slots and no recording browsers, false otherwise + */ + public hasAvailableBrowserSlots = (userId: string, state?: BrowserState): boolean => { + const userBrowserIds = this.userToBrowserMap.get(userId) || []; + + if (userBrowserIds.length >= 2) { + return false; + } + + if (state === "recording") { + const hasBrowserInState = userBrowserIds.some(browserId => + this.pool[browserId] && this.pool[browserId].state === "recording" + ); + return !hasBrowserInState; + } + + return true; + }; + /** * Returns the first active browser's instance id from the pool. * If there is no active browser, it returns null. @@ -524,4 +562,71 @@ export class BrowserPool { // logger.log('warn', `No active browser in the pool`); return null; }; + + /** + * Reserves a browser slot immediately without creating the actual browser. + * This ensures slot counting is accurate for rapid successive requests. + * + * @param id browser ID to reserve + * @param userId user ID that owns this reservation + * @param state browser state ("recording" or "run") + * @returns true if slot was reserved, false if user has reached limit + */ + public reserveBrowserSlot = (id: string, userId: string, state: BrowserState = "run"): boolean => { + // Check if user has available slots first + if (!this.hasAvailableBrowserSlots(userId, state)) { + logger.log('debug', `Cannot reserve slot for user ${userId}: no available slots`); + return false; + } + + // Reserve the slot with null browser + this.pool[id] = { + browser: null, + active: false, + userId, + state, + status: "reserved" + }; + + // Update the user-to-browser mapping + let userBrowserIds = this.userToBrowserMap.get(userId) || []; + if (!userBrowserIds.includes(id)) { + userBrowserIds.push(id); + this.userToBrowserMap.set(userId, userBrowserIds); + } + + logger.log('info', `Reserved browser slot ${id} for user ${userId} in state ${state}`); + return true; + }; + + /** + * Upgrades a reserved slot to an actual browser instance. + * + * @param id browser ID that was previously reserved + * @param browser the actual RemoteBrowser instance + * @returns true if successful, false if slot wasn't reserved + */ + public upgradeBrowserSlot = (id: string, browser: RemoteBrowser): boolean => { + if (!this.pool[id] || this.pool[id].status !== "reserved") { + logger.log('warn', `Cannot upgrade browser ${id}: slot not reserved`); + return false; + } + + this.pool[id].browser = browser; + this.pool[id].status = "ready"; + logger.log('info', `Upgraded browser slot ${id} to ready state`); + return true; + }; + + /** + * Marks a reserved slot as failed and removes it. + * + * @param id browser ID to mark as failed + */ + public failBrowserSlot = (id: string): void => { + if (this.pool[id]) { + logger.log('info', `Marking browser slot ${id} as failed`); + this.deleteRemoteBrowser(id); + } + }; } \ No newline at end of file diff --git a/server/src/browser-management/controller.ts b/server/src/browser-management/controller.ts index 3da388a0..0589b68c 100644 --- a/server/src/browser-management/controller.ts +++ b/server/src/browser-management/controller.ts @@ -54,20 +54,23 @@ export const initializeRemoteBrowserForRecording = (userId: string): string => { * @category BrowserManagement-Controller */ export const createRemoteBrowserForRun = (userId: string): string => { - const id = uuid(); + if (!userId) { + logger.log('error', 'createRemoteBrowserForRun: Missing required parameter userId'); + throw new Error('userId is required'); + } + + const id = uuid(); + + const slotReserved = browserPool.reserveBrowserSlot(id, userId, "run"); + if (!slotReserved) { + logger.log('warn', `Cannot create browser for user ${userId}: no available slots`); + throw new Error('User has reached maximum browser limit'); + } + + logger.log('info', `createRemoteBrowserForRun: Reserved slot ${id} for user ${userId}`); + + initializeBrowserAsync(id, userId); - createSocketConnectionForRun( - io.of(id), - async (socket: Socket) => { - try { - const browserSession = new RemoteBrowser(socket, userId, id); - await browserSession.initialize(userId); - browserPool.addRemoteBrowser(id, browserSession, userId, false, "run"); - socket.emit('ready-for-run'); - } catch (error: any) { - logger.error(`Error initializing browser: ${error.message}`); - } - }); return id; }; @@ -135,6 +138,19 @@ export const getActiveBrowserIdByState = (userId: string, state: "recording" | " return browserPool.getActiveBrowserId(userId, state); }; +/** + * Checks if there are available browser slots for a user. + * Wrapper around {@link browserPool.hasAvailableBrowserSlots()} function. + * If state is provided, also checks that none of their active browsers are in that state. + * @param userId the user ID to check browser slots for + * @param state optional state to check - if provided, ensures no browser is in this state + * @returns {boolean} true if user has available slots (and no browsers in specified state if state is provided) + * @category BrowserManagement-Controller + */ +export const canCreateBrowserInState = (userId: string, state?: "recording" | "run"): boolean => { + return browserPool.hasAvailableBrowserSlots(userId, state); +}; + /** * Returns the url string from a remote browser if exists in the browser pool. * @param id instance id of the remote browser @@ -198,3 +214,87 @@ export const stopRunningInterpretation = async (userId: string) => { logger.log('error', 'Cannot stop interpretation: No active browser or generator.'); } }; + +const initializeBrowserAsync = async (id: string, userId: string) => { + try { + const namespace = io.of(id); + let clientConnected = false; + let connectionTimeout: NodeJS.Timeout; + + const waitForConnection = new Promise((resolve) => { + namespace.on('connection', (socket: Socket) => { + clientConnected = true; + clearTimeout(connectionTimeout); + logger.log('info', `Frontend connected to browser ${id} via socket ${socket.id}`); + resolve(socket); + }); + + connectionTimeout = setTimeout(() => { + if (!clientConnected) { + logger.log('warn', `No client connected to browser ${id} within timeout, proceeding with dummy socket`); + resolve(null); + } + }, 10000); + }); + + namespace.on('error', (error: any) => { + logger.log('error', `Socket namespace error for browser ${id}: ${error.message}`); + clearTimeout(connectionTimeout); + browserPool.failBrowserSlot(id); + }); + + const socket = await waitForConnection; + + try { + let browserSession: RemoteBrowser; + + if (socket) { + logger.log('info', `Using real socket for browser ${id}`); + browserSession = new RemoteBrowser(socket, userId, id); + } else { + logger.log('info', `Using dummy socket for browser ${id}`); + const dummySocket = { + emit: (event: string, data?: any) => { + logger.log('debug', `Browser ${id} dummy socket emitted ${event}:`, data); + }, + on: () => {}, + id: `dummy-${id}`, + } as any; + + browserSession = new RemoteBrowser(dummySocket, userId, id); + } + + await browserSession.initialize(userId); + + const upgraded = browserPool.upgradeBrowserSlot(id, browserSession); + if (!upgraded) { + throw new Error('Failed to upgrade reserved browser slot'); + } + + if (socket) { + socket.emit('ready-for-run'); + } else { + setTimeout(async () => { + try { + logger.log('info', `Starting execution for browser ${id} with dummy socket`); + } catch (error: any) { + logger.log('error', `Error executing run for browser ${id}: ${error.message}`); + } + }, 100); + } + + logger.log('info', `Browser ${id} successfully initialized for run with ${socket ? 'real' : 'dummy'} socket`); + + } catch (error: any) { + logger.log('error', `Error initializing browser ${id}: ${error.message}`); + browserPool.failBrowserSlot(id); + if (socket) { + socket.emit('error', { message: error.message }); + } + } + + } catch (error: any) { + logger.log('error', `Error setting up browser ${id}: ${error.message}`); + browserPool.failBrowserSlot(id); + } +}; diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index 1a32f79b..374c24fa 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -82,102 +82,13 @@ function AddGeneratedFlags(workflow: WorkflowFile) { return copy; }; -/** - * Function to reset browser state without creating a new browser - */ -async function resetBrowserState(browser: RemoteBrowser): Promise { - try { - const currentPage = browser.getCurrentPage(); - if (!currentPage) { - logger.log('error', 'No current page available to reset browser state'); - return false; - } - - // Navigate to blank page to reset state - await currentPage.goto('about:blank', { waitUntil: 'networkidle', timeout: 10000 }); - - // Clear browser storage - await currentPage.evaluate(() => { - try { - localStorage.clear(); - sessionStorage.clear(); - } catch (e) { - // Ignore errors in cleanup - } - }); - - // Clear cookies - const context = currentPage.context(); - await context.clearCookies(); - - return true; - } catch (error) { - logger.log('error', `Failed to reset browser state`); - return false; - } -} - -/** - * Modified checkAndProcessQueuedRun function - only changes browser reset logic - */ -async function checkAndProcessQueuedRun(userId: string, browserId: string): Promise { - try { - // Find the oldest queued run for this specific browser - const queuedRun = await Run.findOne({ - where: { - browserId: browserId, - runByUserId: userId, - status: 'queued' - }, - order: [['startedAt', 'ASC']] - }); - - if (!queuedRun) { - logger.log('info', `No queued runs found for browser ${browserId}`); - return false; - } - - // Reset the browser state before next run - const browser = browserPool.getRemoteBrowser(browserId); - if (browser) { - logger.log('info', `Resetting browser state for browser ${browserId} before next run`); - await resetBrowserState(browser); - } - - // Update the queued run to running status - await queuedRun.update({ - status: 'running', - log: 'Run started - using browser from previous run' - }); - - // Use user-specific queue - const userQueueName = `execute-run-user-${userId}`; - - // Schedule the run execution - await pgBoss.createQueue(userQueueName); - const executeJobId = await pgBoss.send(userQueueName, { - userId: userId, - runId: queuedRun.runId, - browserId: browserId - }); - - logger.log('info', `Scheduled queued run ${queuedRun.runId} to use browser ${browserId}, job ID: ${executeJobId}`); - return true; - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Error checking for queued runs: ${errorMessage}`); - return false; - } -} - -/** - * Modified processRunExecution function - only add browser reset - */ async function processRunExecution(job: Job) { - try { - const data = job.data; - logger.log('info', `Processing run execution job for runId: ${data.runId}, browserId: ${data.browserId}`); - + const BROWSER_INIT_TIMEOUT = 30000; + + const data = job.data; + logger.log('info', `Processing run execution job for runId: ${data.runId}`); + + try { // Find the run const run = await Run.findOne({ where: { runId: data.runId } }); if (!run) { @@ -191,6 +102,11 @@ async function processRunExecution(job: Job) { } const plainRun = run.toJSON(); + const browserId = data.browserId || plainRun.browserId; + + if (!browserId) { + throw new Error(`No browser ID available for run ${data.runId}`); + } // Find the recording const recording = await Robot.findOne({ where: { 'recording_meta.id': plainRun.robotMetaId }, raw: true }); @@ -231,33 +147,47 @@ async function processRunExecution(job: Job) { } } - // Check for queued runs even if this one failed - await checkAndProcessQueuedRun(data.userId, data.browserId); - return { success: false }; } + logger.log('info', `Looking for browser ${browserId} for run ${data.runId}`); + // Get the browser and execute the run - const browser = browserPool.getRemoteBrowser(plainRun.browserId); - let currentPage = browser?.getCurrentPage(); + let browser = browserPool.getRemoteBrowser(browserId); + const browserWaitStart = Date.now(); - if (!browser || !currentPage) { - logger.log('error', `Browser or page not available for run ${data.runId}`); - - // Even if this run failed, check for queued runs - await checkAndProcessQueuedRun(data.userId, data.browserId); - - return { success: false }; + while (!browser && (Date.now() - browserWaitStart) < BROWSER_INIT_TIMEOUT) { + logger.log('debug', `Browser ${browserId} not ready yet, waiting...`); + await new Promise(resolve => setTimeout(resolve, 1000)); + browser = browserPool.getRemoteBrowser(browserId); } + if (!browser) { + throw new Error(`Browser ${browserId} not found in pool after timeout`); + } + + logger.log('info', `Browser ${browserId} found and ready for execution`); + try { - // Reset the browser state before executing this run - await resetBrowserState(browser); - const isRunAborted = async (): Promise => { const currentRun = await Run.findOne({ where: { runId: data.runId } }); return currentRun ? (currentRun.status === 'aborted' || currentRun.status === 'aborting') : false; }; + + let currentPage = browser.getCurrentPage(); + + const pageWaitStart = Date.now(); + while (!currentPage && (Date.now() - pageWaitStart) < 30000) { + logger.log('debug', `Page not ready for browser ${browserId}, waiting...`); + await new Promise(resolve => setTimeout(resolve, 1000)); + currentPage = browser.getCurrentPage(); + } + + if (!currentPage) { + throw new Error(`No current page available for browser ${browserId} after timeout`); + } + + logger.log('info', `Starting workflow execution for run ${data.runId}`); // Execute the workflow const workflow = AddGeneratedFlags(recording.recording); @@ -271,12 +201,7 @@ async function processRunExecution(job: Job) { if (await isRunAborted()) { logger.log('info', `Run ${data.runId} was aborted during execution, not updating status`); - const queuedRunProcessed = await checkAndProcessQueuedRun(data.userId, plainRun.browserId); - - if (!queuedRunProcessed) { - await destroyRemoteBrowser(plainRun.browserId, data.userId); - logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`); - } + await destroyRemoteBrowser(plainRun.browserId, data.userId); return { success: true }; } @@ -415,14 +340,8 @@ async function processRunExecution(job: Job) { finishedAt: new Date().toLocaleString() }); - // Check for and process queued runs before destroying the browser - const queuedRunProcessed = await checkAndProcessQueuedRun(data.userId, plainRun.browserId); - - // Only destroy the browser if no queued run was found - if (!queuedRunProcessed) { - await destroyRemoteBrowser(plainRun.browserId, data.userId); - logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`); - } + await destroyRemoteBrowser(plainRun.browserId, data.userId); + logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`); return { success: true }; } catch (executionError: any) { @@ -477,18 +396,7 @@ async function processRunExecution(job: Job) { logger.log('info', `Run ${data.runId} was aborted, not updating status to failed`); } - // Check for queued runs before destroying the browser - const queuedRunProcessed = await checkAndProcessQueuedRun(data.userId, plainRun.browserId); - - // Only destroy the browser if no queued run was found - if (!queuedRunProcessed) { - try { - await destroyRemoteBrowser(plainRun.browserId, data.userId); - logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`); - } catch (cleanupError: any) { - logger.log('warn', `Failed to clean up browser for failed run ${data.runId}: ${cleanupError.message}`); - } - } + await destroyRemoteBrowser(plainRun.browserId, data.userId); return { success: false }; } @@ -607,23 +515,14 @@ async function abortRun(runId: string, userId: string): Promise { } catch (socketError) { logger.log('warn', `Failed to emit run-aborted event: ${socketError}`); } - - let queuedRunProcessed = false; - try { - queuedRunProcessed = await checkAndProcessQueuedRun(userId, plainRun.browserId); - } catch (queueError) { - logger.log('warn', `Error checking queued runs: ${queueError}`); - } - if (!queuedRunProcessed) { - try { - await new Promise(resolve => setTimeout(resolve, 500)); - - await destroyRemoteBrowser(plainRun.browserId, userId); - logger.log('info', `Browser ${plainRun.browserId} destroyed successfully after abort`); - } catch (cleanupError) { - logger.log('warn', `Failed to clean up browser for aborted run ${runId}: ${cleanupError}`); - } + try { + await new Promise(resolve => setTimeout(resolve, 500)); + + await destroyRemoteBrowser(plainRun.browserId, userId); + logger.log('info', `Browser ${plainRun.browserId} destroyed successfully after abort`); + } catch (cleanupError) { + logger.log('warn', `Failed to clean up browser for aborted run ${runId}: ${cleanupError}`); } return true; diff --git a/server/src/routes/record.ts b/server/src/routes/record.ts index 374f837a..8a589811 100644 --- a/server/src/routes/record.ts +++ b/server/src/routes/record.ts @@ -11,6 +11,7 @@ import { getRemoteBrowserCurrentTabs, getActiveBrowserIdByState, destroyRemoteBrowser, + canCreateBrowserInState, } from '../browser-management/controller'; import { chromium } from 'playwright-extra'; import stealthPlugin from 'puppeteer-extra-plugin-stealth'; @@ -181,6 +182,18 @@ router.get('/active', requireSignIn, (req: AuthenticatedRequest, res) => { return res.send(id); }); +/** + * GET endpoint for checking if the user can create a new remote browser. + */ +router.get('/can-create/:state', requireSignIn, (req: AuthenticatedRequest, res) => { + if (!req.user) { + return res.status(401).send('User not authenticated'); + } + const state = req.params.state as "recording" | "run"; + const canCreate = canCreateBrowserInState(req.user.id, state); + return res.json({ canCreate }); +}); + /** * GET endpoint for getting the current url of the active remote browser. */ diff --git a/server/src/routes/storage.ts b/server/src/routes/storage.ts index b4e8cdfd..0ebe8492 100644 --- a/server/src/routes/storage.ts +++ b/server/src/routes/storage.ts @@ -1,6 +1,6 @@ import { Router } from 'express'; import logger from "../logger"; -import { createRemoteBrowserForRun, getActiveBrowserIdByState } from "../browser-management/controller"; +import { createRemoteBrowserForRun, destroyRemoteBrowser, getActiveBrowserIdByState } from "../browser-management/controller"; import { chromium } from 'playwright-extra'; import stealthPlugin from 'puppeteer-extra-plugin-stealth'; import { browserPool } from "../server"; @@ -517,98 +517,124 @@ router.put('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) => return res.status(401).send({ error: 'Unauthorized' }); } - const proxyConfig = await getDecryptedProxyConfig(req.user.id); - let proxyOptions: any = {}; - - if (proxyConfig.proxy_url) { - proxyOptions = { - server: proxyConfig.proxy_url, - ...(proxyConfig.proxy_username && proxyConfig.proxy_password && { - username: proxyConfig.proxy_username, - password: proxyConfig.proxy_password, - }), - }; - } - - console.log(`Proxy config for run: ${JSON.stringify(proxyOptions)}`); - // Generate runId first const runId = uuid(); - // Check if user has reached browser limit - const userBrowserIds = browserPool.getAllBrowserIdsForUser(req.user.id); - const canCreateBrowser = userBrowserIds.length < 2; - - if (canCreateBrowser) { - // User has available browser slots, create it directly - const id = createRemoteBrowserForRun(req.user.id); + const canCreateBrowser = await browserPool.hasAvailableBrowserSlots(req.user.id, "run"); - const run = await Run.create({ - status: 'running', + if (canCreateBrowser) { + let browserId: string; + + try { + browserId = await createRemoteBrowserForRun(req.user.id); + + if (!browserId || browserId.trim() === '') { + throw new Error('Failed to generate valid browser ID'); + } + + logger.log('info', `Created browser ${browserId} for run ${runId}`); + + } catch (browserError: any) { + logger.log('error', `Failed to create browser: ${browserError.message}`); + return res.status(500).send({ error: 'Failed to create browser instance' }); + } + + try { + await Run.create({ + status: 'running', + name: recording.recording_meta.name, + robotId: recording.id, + robotMetaId: recording.recording_meta.id, + startedAt: new Date().toLocaleString(), + finishedAt: '', + browserId: browserId, + interpreterSettings: req.body, + log: '', + runId, + runByUserId: req.user.id, + serializableOutput: {}, + binaryOutput: {}, + }); + + logger.log('info', `Created run ${runId} with browser ${browserId}`); + + } catch (dbError: any) { + logger.log('error', `Database error creating run: ${dbError.message}`); + + try { + await destroyRemoteBrowser(browserId, req.user.id); + } catch (cleanupError: any) { + logger.log('warn', `Failed to cleanup browser after run creation failure: ${cleanupError.message}`); + } + + return res.status(500).send({ error: 'Failed to create run record' }); + } + + try { + const userQueueName = `execute-run-user-${req.user.id}`; + await pgBoss.createQueue(userQueueName); + + const jobId = await pgBoss.send(userQueueName, { + userId: req.user.id, + runId: runId, + browserId: browserId, + }); + + logger.log('info', `Queued run execution job with ID: ${jobId} for run: ${runId}`); + } catch (queueError: any) { + logger.log('error', `Failed to queue run execution: ${queueError.message}`); + + try { + await Run.update({ + status: 'failed', + finishedAt: new Date().toLocaleString(), + log: 'Failed to queue execution job' + }, { where: { runId: runId } }); + + await destroyRemoteBrowser(browserId, req.user.id); + } catch (cleanupError: any) { + logger.log('warn', `Failed to cleanup after queue error: ${cleanupError.message}`); + } + + return res.status(503).send({ error: 'Unable to queue run, please try again later' }); + } + + return res.send({ + browserId: browserId, + runId: runId, + robotMetaId: recording.recording_meta.id, + queued: false + }); + } else { + const browserId = uuid(); + + await Run.create({ + status: 'queued', name: recording.recording_meta.name, robotId: recording.id, robotMetaId: recording.recording_meta.id, startedAt: new Date().toLocaleString(), finishedAt: '', - browserId: id, + browserId, interpreterSettings: req.body, - log: '', + log: 'Run queued - waiting for available browser slot', runId, runByUserId: req.user.id, serializableOutput: {}, binaryOutput: {}, }); - - const plainRun = run.toJSON(); - + return res.send({ - browserId: id, - runId: plainRun.runId, + browserId: browserId, + runId: runId, robotMetaId: recording.recording_meta.id, - queued: false + queued: true }); - } else { - const browserId = getActiveBrowserIdByState(req.user.id, "run") - - if (browserId) { - // User has reached the browser limit, queue the run - try { - // Create the run record with 'queued' status - await Run.create({ - status: 'queued', - name: recording.recording_meta.name, - robotId: recording.id, - robotMetaId: recording.recording_meta.id, - startedAt: new Date().toLocaleString(), - finishedAt: '', - browserId: browserId, // Random will be updated later - interpreterSettings: req.body, - log: 'Run queued - waiting for available browser slot', - runId, - runByUserId: req.user.id, - serializableOutput: {}, - binaryOutput: {}, - }); - - return res.send({ - browserId: browserId, - runId: runId, - robotMetaId: recording.recording_meta.id, - queued: true, - }); - } catch (queueError: any) { - logger.log('error', `Failed to queue run job: ${queueError.message}`); - return res.status(503).send({ error: 'Unable to queue run, please try again later' }); - } - } else { - logger.log('info', "Browser id does not exist"); - return res.send(''); - } } } catch (e) { const { message } = e as Error; - logger.log('info', `Error while creating a run with robot id: ${req.params.id} - ${message}`); - return res.send(''); + logger.log('error', `Error while creating a run with robot id: ${req.params.id} - ${message}`); + return res.status(500).send({ error: 'Internal server error' }); } }); @@ -919,3 +945,73 @@ router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest, return res.send(false); } }); + +async function processQueuedRuns() { + try { + const queuedRun = await Run.findOne({ + where: { status: 'queued' }, + order: [['startedAt', 'ASC']] + }); + + if (!queuedRun) return; + + const userId = queuedRun.runByUserId; + + const canCreateBrowser = await browserPool.hasAvailableBrowserSlots(userId, "run"); + + if (canCreateBrowser) { + logger.log('info', `Processing queued run ${queuedRun.runId} for user ${userId}`); + + const recording = await Robot.findOne({ + where: { + 'recording_meta.id': queuedRun.robotMetaId + }, + raw: true + }); + + if (!recording) { + await queuedRun.update({ + status: 'failed', + finishedAt: new Date().toLocaleString(), + log: 'Recording not found' + }); + return; + } + + try { + const newBrowserId = await createRemoteBrowserForRun(userId); + + logger.log('info', `Created and initialized browser ${newBrowserId} for queued run ${queuedRun.runId}`); + + await queuedRun.update({ + status: 'running', + browserId: newBrowserId, + log: 'Browser created and ready for execution' + }); + + const userQueueName = `execute-run-user-${userId}`; + await pgBoss.createQueue(userQueueName); + + const jobId = await pgBoss.send(userQueueName, { + userId: userId, + runId: queuedRun.runId, + browserId: newBrowserId, + }); + + logger.log('info', `Queued execution for run ${queuedRun.runId} with ready browser ${newBrowserId}, job ID: ${jobId}`); + + } catch (browserError: any) { + logger.log('error', `Failed to create browser for queued run: ${browserError.message}`); + await queuedRun.update({ + status: 'failed', + finishedAt: new Date().toLocaleString(), + log: `Failed to create browser: ${browserError.message}` + }); + } + } + } catch (error: any) { + logger.log('error', `Error processing queued runs: ${error.message}`); + } +} + +export { processQueuedRuns }; diff --git a/server/src/server.ts b/server/src/server.ts index bd4a1697..c03aaaa7 100644 --- a/server/src/server.ts +++ b/server/src/server.ts @@ -20,6 +20,7 @@ import connectPgSimple from 'connect-pg-simple'; import pg from 'pg'; import session from 'express-session'; import Run from './models/Run'; +import { processQueuedRuns } from './routes/storage'; const app = express(); app.use(cors({ @@ -163,6 +164,10 @@ app.use((req, res, next) => { next(); }); +setInterval(() => { + processQueuedRuns(); +}, 5000); + server.listen(SERVER_PORT, '0.0.0.0', async () => { try { await connectDB(); diff --git a/src/api/recording.ts b/src/api/recording.ts index 6b816001..ce07753a 100644 --- a/src/api/recording.ts +++ b/src/api/recording.ts @@ -35,6 +35,19 @@ export const getActiveBrowserId = async(): Promise => { } }; +export const canCreateBrowserInState = async(state: "recording" | "run"): Promise => { + try { + const response = await axios.get(`${apiUrl}/record/can-create/${state}`, { withCredentials: true }); + if (response.status === 200) { + return response.data.canCreate; + } else { + return false; + } + } catch(error: any) { + return false; + } +}; + export const interpretCurrentRecording = async(): Promise => { try { const response = await axios.get(`${apiUrl}/record/interpret`); diff --git a/src/api/storage.ts b/src/api/storage.ts index 295f340c..02558c72 100644 --- a/src/api/storage.ts +++ b/src/api/storage.ts @@ -154,6 +154,27 @@ export const editRecordingFromStorage = async (browserId: string, id: string): P } }; +export interface CreateRunResponseWithQueue extends CreateRunResponse { + queued?: boolean; +} + +export const createAndRunRecording = async (id: string, settings: RunSettings): Promise => { + try { + const response = await axios.put( + `${apiUrl}/storage/runs/${id}`, + { ...settings, withCredentials: true } + ); + if (response.status === 200) { + return response.data; + } else { + throw new Error(`Couldn't create and run recording ${id}`); + } + } catch (error: any) { + console.log(error); + return { browserId: '', runId: '', robotMetaId: '', queued: false }; + } +} + export const createRunForStoredRecording = async (id: string, settings: RunSettings): Promise => { try { const response = await axios.put( diff --git a/src/components/robot/RecordingsTable.tsx b/src/components/robot/RecordingsTable.tsx index 6a517a13..6ea86f74 100644 --- a/src/components/robot/RecordingsTable.tsx +++ b/src/components/robot/RecordingsTable.tsx @@ -39,7 +39,7 @@ import { useGlobalInfoStore } from "../../context/globalInfo"; import { checkRunsForRecording, deleteRecordingFromStorage, getStoredRecordings } from "../../api/storage"; import { Add } from "@mui/icons-material"; import { useNavigate } from 'react-router-dom'; -import { getActiveBrowserId, stopRecording } from "../../api/recording"; +import { canCreateBrowserInState, getActiveBrowserId, stopRecording } from "../../api/recording"; import { GenericModal } from '../ui/GenericModal'; declare global { @@ -274,11 +274,16 @@ export const RecordingsTable = ({ }, [setRecordings, notify, t]); const handleNewRecording = useCallback(async () => { - const activeBrowserId = await getActiveBrowserId(); + const canCreateRecording = await canCreateBrowserInState("recording"); - if (activeBrowserId) { - setActiveBrowserId(activeBrowserId); - setWarningModalOpen(true); + if (!canCreateRecording) { + const activeBrowserId = await getActiveBrowserId(); + if (activeBrowserId) { + setActiveBrowserId(activeBrowserId); + setWarningModalOpen(true); + } else { + notify('warning', t('recordingtable.notifications.browser_limit_warning')); + } } else { setModalOpen(true); } @@ -314,7 +319,6 @@ export const RecordingsTable = ({ }; const handleRetrainRobot = useCallback(async (id: string, name: string) => { - const activeBrowserId = await getActiveBrowserId(); const robot = rows.find(row => row.id === id); let targetUrl; @@ -340,11 +344,18 @@ export const RecordingsTable = ({ window.sessionStorage.setItem('initialUrl', targetUrl); } - if (activeBrowserId) { - setActiveBrowserId(activeBrowserId); - setWarningModalOpen(true); + const canCreateRecording = await canCreateBrowserInState("recording"); + + if (!canCreateRecording) { + const activeBrowserId = await getActiveBrowserId(); + if (activeBrowserId) { + setActiveBrowserId(activeBrowserId); + setWarningModalOpen(true); + } else { + notify('warning', t('recordingtable.notifications.browser_limit_warning')); + } } else { - startRetrainRecording(id, name, targetUrl); + startRetrainRecording(id, name, targetUrl); } }, [rows, setInitialUrl, setRecordingUrl]); diff --git a/src/pages/MainPage.tsx b/src/pages/MainPage.tsx index 8d1a623c..02dd503c 100644 --- a/src/pages/MainPage.tsx +++ b/src/pages/MainPage.tsx @@ -1,4 +1,4 @@ -import React, { useCallback, useEffect } from 'react'; +import React, { useCallback, useContext, useEffect } from 'react'; import { useTranslation } from 'react-i18next'; import { MainMenu } from "../components/dashboard/MainMenu"; import { Stack } from "@mui/material"; @@ -7,13 +7,14 @@ import { Runs } from "../components/run/Runs"; import ProxyForm from '../components/proxy/ProxyForm'; import ApiKey from '../components/api/ApiKey'; import { useGlobalInfoStore } from "../context/globalInfo"; -import { createRunForStoredRecording, interpretStoredRecording, notifyAboutAbort, scheduleStoredRecording } from "../api/storage"; +import { createAndRunRecording, createRunForStoredRecording, CreateRunResponseWithQueue, interpretStoredRecording, notifyAboutAbort, scheduleStoredRecording } from "../api/storage"; import { io, Socket } from "socket.io-client"; import { stopRecording } from "../api/recording"; import { RunSettings } from "../components/run/RunSettings"; import { ScheduleSettings } from "../components/robot/ScheduleSettings"; import { apiUrl } from "../apiConfig"; import { useNavigate } from 'react-router-dom'; +import { AuthContext } from '../context/auth'; interface MainPageProps { handleEditRecording: (id: string, fileName: string) => void; @@ -43,12 +44,16 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps) runId: '', robotMetaId: '' }); + const [queuedRuns, setQueuedRuns] = React.useState>(new Set()); let aborted = false; const { notify, setRerenderRuns, setRecordingId } = useGlobalInfoStore(); const navigate = useNavigate(); + const { state } = useContext(AuthContext); + const { user } = state; + const abortRunHandler = (runId: string) => { aborted = true; notifyAboutAbort(runId).then(async (response) => { @@ -90,48 +95,109 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps) }, [currentInterpretationLog]) const handleRunRecording = useCallback((settings: RunSettings) => { - createRunForStoredRecording(runningRecordingId, settings).then(({ browserId, runId, robotMetaId }: CreateRunResponse) => { + createAndRunRecording(runningRecordingId, settings).then((response: CreateRunResponseWithQueue) => { + const { browserId, runId, robotMetaId, queued } = response; + setIds({ browserId, runId, robotMetaId }); navigate(`/runs/${robotMetaId}/run/${runId}`); - const socket = - io(`${apiUrl}/${browserId}`, { + + if (queued) { + console.log('Creating queue socket for queued run:', runId); + + setQueuedRuns(prev => new Set([...prev, runId])); + + const queueSocket = io(`${apiUrl}/queued-run`, { + transports: ["websocket"], + rejectUnauthorized: false, + query: { userId: user?.id } + }); + + queueSocket.on('connect', () => { + console.log('Queue socket connected for user:', user?.id); + }); + + queueSocket.on('connect_error', (error) => { + console.log('Queue socket connection error:', error); + }); + + queueSocket.on('run-completed', (completionData) => { + if (completionData.runId === runId) { + setRunningRecordingName(''); + setCurrentInterpretationLog(''); + setRerenderRuns(true); + + setQueuedRuns(prev => { + const newSet = new Set(prev); + newSet.delete(runId); + return newSet; + }); + + const robotName = completionData.robotName || runningRecordingName; + + if (completionData.status === 'success') { + notify('success', t('main_page.notifications.interpretation_success', { name: robotName })); + } else { + notify('error', t('main_page.notifications.interpretation_failed', { name: robotName })); + } + + queueSocket.disconnect(); + } + }); + + setSockets(sockets => [...sockets, queueSocket]); + + notify('info', `Run queued: ${runningRecordingName}`); + } else { + const socket = io(`${apiUrl}/${browserId}`, { transports: ["websocket"], rejectUnauthorized: false }); - setSockets(sockets => [...sockets, socket]); - socket.on('ready-for-run', () => readyForRunHandler(browserId, runId)); - socket.on('debugMessage', debugMessageHandler); - socket.on('run-completed', (data) => { - setRerenderRuns(true); - const robotName = data.robotName; + setSockets(sockets => [...sockets, socket]); - if (data.status === 'success') { - notify('success', t('main_page.notifications.interpretation_success', { name: robotName })); + socket.on('debugMessage', debugMessageHandler); + socket.on('run-completed', (data) => { + setRunningRecordingName(''); + setCurrentInterpretationLog(''); + setRerenderRuns(true); + + const robotName = data.robotName; + + if (data.status === 'success') { + notify('success', t('main_page.notifications.interpretation_success', { name: robotName })); + } else { + notify('error', t('main_page.notifications.interpretation_failed', { name: robotName })); + } + }); + + socket.on('connect_error', (error) => { + console.log('error', `Failed to connect to browser ${browserId}: ${error}`); + notify('error', t('main_page.notifications.connection_failed', { name: runningRecordingName })); + }); + + socket.on('disconnect', (reason) => { + console.log('warn', `Disconnected from browser ${browserId}: ${reason}`); + }); + + if (runId) { + notify('info', t('main_page.notifications.run_started', { name: runningRecordingName })); } else { - notify('error', t('main_page.notifications.interpretation_failed', { name: robotName })); + notify('error', t('main_page.notifications.run_start_failed', { name: runningRecordingName })); } - }); - - socket.on('run-aborted', (data) => { - setRerenderRuns(true); - - const abortedRobotName = data.robotName; - notify('success', t('main_page.notifications.abort_success', { name: abortedRobotName })); - }); - - setContent('runs'); - if (browserId) { - notify('info', t('main_page.notifications.run_started', { name: runningRecordingName })); - } else { - notify('error', t('main_page.notifications.run_start_failed', { name: runningRecordingName })); } - }) - return (socket: Socket, browserId: string, runId: string) => { - socket.off('ready-for-run', () => readyForRunHandler(browserId, runId)); + + setContent('runs'); + }).catch((error: any) => { + console.error('Error in createAndRunRecording:', error); // ✅ Debug log + }); + + return (socket: Socket) => { socket.off('debugMessage', debugMessageHandler); + socket.off('run-completed'); + socket.off('connect_error'); + socket.off('disconnect'); } - }, [runningRecordingName, sockets, ids, readyForRunHandler, debugMessageHandler]) + }, [runningRecordingName, sockets, ids, debugMessageHandler, user?.id, t, notify, setRerenderRuns, setQueuedRuns, navigate, setContent, setIds]); const handleScheduleRecording = (settings: ScheduleSettings) => { scheduleStoredRecording(runningRecordingId, settings)