diff --git a/server/src/api/record.ts b/server/src/api/record.ts index 7c665001..1f567c3e 100644 --- a/server/src/api/record.ts +++ b/server/src/api/record.ts @@ -15,8 +15,8 @@ import { AuthenticatedRequest } from "../routes/record" import {capture} from "../utils/analytics"; import { Page } from "playwright"; import { WorkflowFile } from "maxun-core"; -import { googleSheetUpdateTasks, processGoogleSheetUpdates } from "../workflow-management/integrations/gsheet"; -import { airtableUpdateTasks, processAirtableUpdates } from "../workflow-management/integrations/airtable"; +import { addGoogleSheetUpdateTask, googleSheetUpdateTasks, processGoogleSheetUpdates } from "../workflow-management/integrations/gsheet"; +import { addAirtableUpdateTask, airtableUpdateTasks, processAirtableUpdates } from "../workflow-management/integrations/airtable"; import { sendWebhook } from "../routes/webhook"; import { convertPageToHTML, convertPageToMarkdown } from '../markdownify/scrape'; @@ -557,32 +557,44 @@ async function createWorkflowAndStoreMetadata(id: string, userId: string) { } } +function withTimeout(promise: Promise, timeoutMs: number, operation: string): Promise { + return Promise.race([ + promise, + new Promise((_, reject) => + setTimeout(() => reject(new Error(`${operation} timed out after ${timeoutMs}ms`)), timeoutMs) + ) + ]); +} + async function triggerIntegrationUpdates(runId: string, robotMetaId: string): Promise { try { - googleSheetUpdateTasks[runId] = { + addGoogleSheetUpdateTask(runId, { robotId: robotMetaId, runId: runId, status: 'pending', retries: 5, - }; + }); - airtableUpdateTasks[runId] = { + addAirtableUpdateTask(runId, { robotId: robotMetaId, runId: runId, status: 'pending', retries: 5, - }; + }); - processAirtableUpdates().catch(err => logger.log('error', `Airtable update error: ${err.message}`)); - processGoogleSheetUpdates().catch(err => logger.log('error', `Google Sheets update error: ${err.message}`)); + withTimeout(processAirtableUpdates(), 65000, 'Airtable update') + .catch(err => logger.log('error', `Airtable update error: ${err.message}`)); + + withTimeout(processGoogleSheetUpdates(), 65000, 'Google Sheets update') + .catch(err => logger.log('error', `Google Sheets update error: ${err.message}`)); } catch (err: any) { logger.log('error', `Failed to update integrations for run: ${runId}: ${err.message}`); } } -async function readyForRunHandler(browserId: string, id: string, userId: string, requestedFormats?: string[]){ +async function readyForRunHandler(browserId: string, id: string, userId: string, socket: Socket){ try { - const result = await executeRun(id, userId, requestedFormats); + const result = await executeRun(id, userId); if (result && result.success) { logger.log('info', `Interpretation of ${id} succeeded`); @@ -599,6 +611,8 @@ async function readyForRunHandler(browserId: string, id: string, userId: string, logger.error(`Error during readyForRunHandler: ${error.message}`); await destroyRemoteBrowser(browserId, userId); return null; + } finally { + cleanupSocketConnection(socket, browserId, id); } } @@ -688,15 +702,23 @@ async function executeRun(id: string, userId: string, requestedFormats?: string[ let html = ''; const serializableOutput: any = {}; - // Markdown conversion + const SCRAPE_TIMEOUT = 120000; + if (formats.includes('markdown')) { - markdown = await convertPageToMarkdown(url); + const markdownPromise = convertPageToMarkdown(url); + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error(`Markdown conversion timed out after ${SCRAPE_TIMEOUT/1000}s`)), SCRAPE_TIMEOUT); + }); + markdown = await Promise.race([markdownPromise, timeoutPromise]); serializableOutput.markdown = [{ content: markdown }]; } - // HTML conversion if (formats.includes('html')) { - html = await convertPageToHTML(url); + const htmlPromise = convertPageToHTML(url); + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error(`HTML conversion timed out after ${SCRAPE_TIMEOUT/1000}s`)), SCRAPE_TIMEOUT); + }); + html = await Promise.race([htmlPromise, timeoutPromise]); serializableOutput.html = [{ content: html }]; } @@ -808,6 +830,22 @@ async function executeRun(id: string, userId: string, requestedFormats?: string[ ); } + try { + await sendWebhook(plainRun.robotMetaId, 'run_failed', { + robot_id: plainRun.robotMetaId, + run_id: plainRun.runId, + robot_name: recording.recording_meta.name, + status: 'failed', + finished_at: new Date().toLocaleString(), + error: { + message: error.message, + type: 'ConversionError' + } + }); + } catch (webhookError: any) { + logger.log('warn', `Failed to send webhook for failed API scrape run ${plainRun.runId}: ${webhookError.message}`); + } + capture("maxun-oss-run-created-api", { runId: plainRun.runId, user_id: userId, @@ -838,13 +876,24 @@ async function executeRun(id: string, userId: string, requestedFormats?: string[ browser.interpreter.setRunId(plainRun.runId); - const interpretationInfo = await browser.interpreter.InterpretRecording( + const INTERPRETATION_TIMEOUT = 600000; + + const interpretationPromise = browser.interpreter.InterpretRecording( workflow, currentPage, (newPage: Page) => currentPage = newPage, plainRun.interpreterSettings ); + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error(`Workflow interpretation timed out after ${INTERPRETATION_TIMEOUT/1000}s`)), INTERPRETATION_TIMEOUT); + }); + + const interpretationInfo = await Promise.race([interpretationPromise, timeoutPromise]); + const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput); + if (browser && browser.interpreter) { + await browser.interpreter.clearState(); + } await destroyRemoteBrowser(plainRun.browserId, userId); const updatedRun = await run.update({ @@ -854,6 +903,25 @@ async function executeRun(id: string, userId: string, requestedFormats?: string[ binaryOutput: uploadedBinaryOutput, }); + try { + const completionData = { + runId: plainRun.runId, + robotMetaId: plainRun.robotMetaId, + robotName: recording.recording_meta.name, + status: 'success', + finishedAt: new Date().toLocaleString(), + runByUserId: plainRun.runByUserId, + runByScheduleId: plainRun.runByScheduleId, + runByAPI: plainRun.runByAPI || false, + browserId: plainRun.browserId + }; + + serverIo.of('/queued-run').to(`user-${userId}`).emit('run-completed', completionData); + logger.log('info', `API run completed notification sent for run: ${plainRun.runId} to user-${userId}`); + } catch (socketError: any) { + logger.log('warn', `Failed to send run-completed notification for API run ${plainRun.runId}: ${socketError.message}`); + } + let totalSchemaItemsExtracted = 0; let totalListItemsExtracted = 0; let extractedScreenshotsCount = 0; @@ -950,6 +1018,17 @@ async function executeRun(id: string, userId: string, requestedFormats?: string[ logger.log('info', `Error while running a robot with id: ${id} - ${error.message}`); const run = await Run.findOne({ where: { runId: id } }); if (run) { + if (browser) { + try { + if (browser.interpreter) { + await browser.interpreter.clearState(); + } + await destroyRemoteBrowser(run.browserId, userId); + } catch (cleanupError: any) { + logger.error(`Failed to cleanup browser in error handler: ${cleanupError.message}`); + } + } + await run.update({ status: 'failed', finishedAt: new Date().toLocaleString(), @@ -1020,6 +1099,8 @@ async function executeRun(id: string, userId: string, requestedFormats?: string[ } export async function handleRunRecording(id: string, userId: string, requestedFormats?: string[]) { + let socket: Socket | null = null; + try { const result = await createWorkflowAndStoreMetadata(id, userId); const { browserId, runId: newRunId } = result; @@ -1028,41 +1109,79 @@ export async function handleRunRecording(id: string, userId: string, requestedFo throw new Error('browserId or runId or userId is undefined'); } - const socket = io(`${process.env.BACKEND_URL ? process.env.BACKEND_URL : 'http://localhost:8080'}/${browserId}`, { + const CONNECTION_TIMEOUT = 30000; + + socket = io(`${process.env.BACKEND_URL ? process.env.BACKEND_URL : 'http://localhost:8080'}/${browserId}`, { transports: ['websocket'], - rejectUnauthorized: false + rejectUnauthorized: false, + timeout: CONNECTION_TIMEOUT, }); - socket.on('ready-for-run', () => readyForRunHandler(browserId, newRunId, userId, requestedFormats)); + const readyHandler = () => readyForRunHandler(browserId, newRunId, userId, socket!); + + socket.on('ready-for-run', readyHandler); + + socket.on('connect_error', (error: Error) => { + logger.error(`Socket connection error for API run ${newRunId}: ${error.message}`); + cleanupSocketConnection(socket!, browserId, newRunId); + }); + + socket.on('disconnect', () => { + cleanupSocketConnection(socket!, browserId, newRunId); + }); logger.log('info', `Running Robot: ${id}`); - socket.on('disconnect', () => { - cleanupSocketListeners(socket, browserId, newRunId, userId); - }); - - // Return the runId immediately, so the client knows the run is started return newRunId; } catch (error: any) { logger.error('Error running robot:', error); + if (socket) { + cleanupSocketConnection(socket, '', ''); + } } } -function cleanupSocketListeners(socket: Socket, browserId: string, id: string, userId: string) { - socket.off('ready-for-run', () => readyForRunHandler(browserId, id, userId)); - logger.log('info', `Cleaned up listeners for browserId: ${browserId}, runId: ${id}`); +function cleanupSocketConnection(socket: Socket, browserId: string, id: string) { + try { + socket.removeAllListeners(); + socket.disconnect(); + + if (browserId) { + const namespace = serverIo.of(browserId); + namespace.removeAllListeners(); + namespace.disconnectSockets(true); + const nsps = (serverIo as any)._nsps; + if (nsps && nsps.has(`/${browserId}`)) { + nsps.delete(`/${browserId}`); + logger.log('debug', `Deleted namespace /${browserId} from io._nsps Map`); + } + } + + logger.log('info', `Cleaned up socket connection for browserId: ${browserId}, runId: ${id}`); + } catch (error: any) { + logger.error(`Error cleaning up socket connection: ${error.message}`); + } } async function waitForRunCompletion(runId: string, interval: number = 2000) { + const MAX_WAIT_TIME = 180 * 60 * 1000; + const startTime = Date.now(); + while (true) { - const run = await Run.findOne({ where: { runId }, raw: true }); + if (Date.now() - startTime > MAX_WAIT_TIME) { + throw new Error('Run completion timeout after 3 hours'); + } + + const run = await Run.findOne({ where: { runId } }); if (!run) throw new Error('Run not found'); if (run.status === 'success') { - return run; + return run.toJSON(); } else if (run.status === 'failed') { throw new Error('Run failed'); + } else if (run.status === 'aborted' || run.status === 'aborting') { + throw new Error('Run was aborted'); } await new Promise(resolve => setTimeout(resolve, interval)); diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index bb33ffe5..3545cb3a 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -15,8 +15,8 @@ import Robot from './models/Robot'; import { browserPool } from './server'; import { Page } from 'playwright'; import { capture } from './utils/analytics'; -import { googleSheetUpdateTasks, processGoogleSheetUpdates } from './workflow-management/integrations/gsheet'; -import { airtableUpdateTasks, processAirtableUpdates } from './workflow-management/integrations/airtable'; +import { addGoogleSheetUpdateTask, googleSheetUpdateTasks, processGoogleSheetUpdates } from './workflow-management/integrations/gsheet'; +import { addAirtableUpdateTask, airtableUpdateTasks, processAirtableUpdates } from './workflow-management/integrations/airtable'; import { io as serverIo } from "./server"; import { sendWebhook } from './routes/webhook'; import { BinaryOutputService } from './storage/mino'; @@ -59,7 +59,7 @@ interface AbortRunData { const pgBoss = new PgBoss({ connectionString: pgBossConnectionString, expireInHours: 23, - max: 3, + max: 5, }); /** @@ -86,26 +86,36 @@ function AddGeneratedFlags(workflow: WorkflowFile) { return copy; }; +function withTimeout(promise: Promise, timeoutMs: number, operation: string): Promise { + return Promise.race([ + promise, + new Promise((_, reject) => + setTimeout(() => reject(new Error(`${operation} timed out after ${timeoutMs}ms`)), timeoutMs) + ) + ]); +} -// Helper function to handle integration updates async function triggerIntegrationUpdates(runId: string, robotMetaId: string): Promise { try { - googleSheetUpdateTasks[runId] = { + addGoogleSheetUpdateTask(runId, { robotId: robotMetaId, runId: runId, status: 'pending', retries: 5, - }; + }); - airtableUpdateTasks[runId] = { + addAirtableUpdateTask(runId, { robotId: robotMetaId, runId: runId, status: 'pending', retries: 5, - }; + }); - processAirtableUpdates().catch(err => logger.log('error', `Airtable update error: ${err.message}`)); - processGoogleSheetUpdates().catch(err => logger.log('error', `Google Sheets update error: ${err.message}`)); + withTimeout(processAirtableUpdates(), 65000, 'Airtable update') + .catch(err => logger.log('error', `Airtable update error: ${err.message}`)); + + withTimeout(processGoogleSheetUpdates(), 65000, 'Google Sheets update') + .catch(err => logger.log('error', `Google Sheets update error: ${err.message}`)); } catch (err: any) { logger.log('error', `Failed to update integrations for run: ${runId}: ${err.message}`); } @@ -115,8 +125,8 @@ async function triggerIntegrationUpdates(runId: string, robotMetaId: string): Pr * Modified processRunExecution function - only add browser reset */ async function processRunExecution(job: Job) { - const BROWSER_INIT_TIMEOUT = 60000; - const BROWSER_PAGE_TIMEOUT = 45000; + const BROWSER_INIT_TIMEOUT = 30000; + const BROWSER_PAGE_TIMEOUT = 15000; const data = job.data; logger.log('info', `Processing run execution job for runId: ${data.runId}, browserId: ${data.browserId}`); @@ -211,15 +221,23 @@ async function processRunExecution(job: Job) { let html = ''; const serializableOutput: any = {}; - // Markdown conversion + const SCRAPE_TIMEOUT = 120000; + if (formats.includes('markdown')) { - markdown = await convertPageToMarkdown(url); + const markdownPromise = convertPageToMarkdown(url); + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error(`Markdown conversion timed out after ${SCRAPE_TIMEOUT/1000}s`)), SCRAPE_TIMEOUT); + }); + markdown = await Promise.race([markdownPromise, timeoutPromise]); serializableOutput.markdown = [{ content: markdown }]; } - // HTML conversion if (formats.includes('html')) { - html = await convertPageToHTML(url); + const htmlPromise = convertPageToHTML(url); + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error(`HTML conversion timed out after ${SCRAPE_TIMEOUT/1000}s`)), SCRAPE_TIMEOUT); + }); + html = await Promise.race([htmlPromise, timeoutPromise]); serializableOutput.html = [{ content: html }]; } @@ -375,21 +393,33 @@ async function processRunExecution(job: Job) { logger.log('warn', `Failed to send run-started notification for API run ${plainRun.runId}: ${socketError.message}`); } - // Execute the workflow - const workflow = AddGeneratedFlags(recording.recording); - browser.interpreter.setRunId(data.runId); - - const interpretationInfo = await browser.interpreter.InterpretRecording( - workflow, - currentPage, - (newPage: Page) => currentPage = newPage, - plainRun.interpreterSettings + + const INTERPRETATION_TIMEOUT = 600000; + + const interpretationPromise = browser.interpreter.InterpretRecording( + AddGeneratedFlags(recording.recording), + currentPage, + (newPage: Page) => currentPage = newPage, + plainRun.interpreterSettings, ); + + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error(`Workflow interpretation timed out after ${INTERPRETATION_TIMEOUT/1000}s`)), INTERPRETATION_TIMEOUT); + }); + + const interpretationInfo = await Promise.race([interpretationPromise, timeoutPromise]); if (await isRunAborted()) { logger.log('info', `Run ${data.runId} was aborted during execution, not updating status`); + try { + await browser.interpreter.clearState(); + logger.debug(`Cleared interpreter state for aborted run ${data.runId}`); + } catch (clearError: any) { + logger.warn(`Failed to clear interpreter state on abort: ${clearError.message}`); + } + await destroyRemoteBrowser(plainRun.browserId, data.userId); return { success: true }; @@ -635,6 +665,15 @@ async function processRunExecution(job: Job) { totalRowsExtracted: partialData?.totalSchemaItemsExtracted + partialData?.totalListItemsExtracted + partialData?.extractedScreenshotsCount || 0, }); + try { + if (browser && browser.interpreter) { + await browser.interpreter.clearState(); + logger.debug(`Cleared interpreter state for failed run ${data.runId}`); + } + } catch (clearError: any) { + logger.warn(`Failed to clear interpreter state on error: ${clearError.message}`); + } + await destroyRemoteBrowser(browserId, data.userId); logger.log('info', `Browser ${browserId} destroyed after failed run`); @@ -804,6 +843,8 @@ async function abortRun(runId: string, userId: string): Promise { const registeredUserQueues = new Map(); const registeredAbortQueues = new Map(); +const workerIntervals: NodeJS.Timeout[] = []; + async function registerWorkerForQueue(queueName: string) { if (!registeredUserQueues.has(queueName)) { await pgBoss.work(queueName, async (job: Job | Job[]) => { @@ -866,21 +907,7 @@ async function registerRunExecutionWorker() { const userQueues = activeQueues.filter(q => q.name.startsWith('execute-run-user-')); for (const queue of userQueues) { - if (!registeredUserQueues.has(queue.name)) { - await pgBoss.work(queue.name, async (job: Job | Job[]) => { - try { - const singleJob = Array.isArray(job) ? job[0] : job; - return await processRunExecution(singleJob); - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Run execution job failed in ${queue.name}: ${errorMessage}`); - throw error; - } - }); - - registeredUserQueues.set(queue.name, true); - logger.log('info', `Registered worker for queue: ${queue.name}`); - } + await registerWorkerForQueue(queue.name); } } catch (error: unknown) { const errorMessage = error instanceof Error ? error.message : String(error); @@ -890,10 +917,15 @@ async function registerRunExecutionWorker() { await checkForNewUserQueues(); - setInterval(async () => { - await checkForNewUserQueues(); + const userQueueInterval = setInterval(async () => { + try { + await checkForNewUserQueues(); + } catch (error: any) { + logger.log('error', `Error checking user queues: ${error.message}`); + } }, 10000); - + workerIntervals.push(userQueueInterval); + logger.log('info', 'Run execution worker registered successfully'); } catch (error: unknown) { const errorMessage = error instanceof Error ? error.message : String(error); @@ -903,7 +935,6 @@ async function registerRunExecutionWorker() { async function registerAbortRunWorker() { try { - const registeredAbortQueues = new Map(); const checkForNewAbortQueues = async () => { try { @@ -912,25 +943,7 @@ async function registerAbortRunWorker() { const abortQueues = activeQueues.filter(q => q.name.startsWith('abort-run-user-')); for (const queue of abortQueues) { - if (!registeredAbortQueues.has(queue.name)) { - await pgBoss.work(queue.name, async (job: Job | Job[]) => { - try { - const data = extractJobData(job); - const { userId, runId } = data; - - logger.log('info', `Processing abort request for run ${runId} by user ${userId}`); - const success = await abortRun(runId, userId); - return { success }; - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Abort run job failed in ${queue.name}: ${errorMessage}`); - throw error; - } - }); - - registeredAbortQueues.set(queue.name, true); - logger.log('info', `Registered abort worker for queue: ${queue.name}`); - } + await registerAbortWorkerForQueue(queue.name); } } catch (error: unknown) { const errorMessage = error instanceof Error ? error.message : String(error); @@ -940,9 +953,14 @@ async function registerAbortRunWorker() { await checkForNewAbortQueues(); - setInterval(async () => { - await checkForNewAbortQueues(); + const abortQueueInterval = setInterval(async () => { + try { + await checkForNewAbortQueues(); + } catch (error: any) { + logger.log('error', `Error checking abort queues: ${error.message}`); + } }, 10000); + workerIntervals.push(abortQueueInterval); logger.log('info', 'Abort run worker registration system initialized'); } catch (error: unknown) { @@ -1050,15 +1068,22 @@ pgBoss.on('error', (error) => { // Handle graceful shutdown process.on('SIGTERM', async () => { logger.log('info', 'SIGTERM received, shutting down PgBoss...'); + + logger.log('info', `Clearing ${workerIntervals.length} worker intervals...`); + workerIntervals.forEach(clearInterval); + await pgBoss.stop(); - process.exit(0); + logger.log('info', 'PgBoss stopped, waiting for main process cleanup...'); }); process.on('SIGINT', async () => { logger.log('info', 'SIGINT received, shutting down PgBoss...'); + + logger.log('info', `Clearing ${workerIntervals.length} worker intervals...`); + workerIntervals.forEach(clearInterval); + await pgBoss.stop(); - process.exit(0); + logger.log('info', 'PgBoss stopped, waiting for main process cleanup...'); }); -// For use in other files -export { pgBoss, registerWorkerForQueue, registerAbortWorkerForQueue, startWorkers }; +export { startWorkers }; diff --git a/server/src/workflow-management/scheduler/index.ts b/server/src/workflow-management/scheduler/index.ts index 470cdacb..30ed892b 100644 --- a/server/src/workflow-management/scheduler/index.ts +++ b/server/src/workflow-management/scheduler/index.ts @@ -5,7 +5,7 @@ import { io, Socket } from "socket.io-client"; import { createRemoteBrowserForRun, destroyRemoteBrowser } from '../../browser-management/controller'; import logger from '../../logger'; import { browserPool, io as serverIo } from "../../server"; -import { googleSheetUpdateTasks, processGoogleSheetUpdates } from "../integrations/gsheet"; +import { addGoogleSheetUpdateTask, googleSheetUpdateTasks, processGoogleSheetUpdates } from "../integrations/gsheet"; import Robot from "../../models/Robot"; import Run from "../../models/Run"; import { getDecryptedProxyConfig } from "../../routes/proxy"; @@ -14,7 +14,7 @@ import { capture } from "../../utils/analytics"; import { WorkflowFile } from "maxun-core"; import { Page } from "playwright"; import { sendWebhook } from "../../routes/webhook"; -import { airtableUpdateTasks, processAirtableUpdates } from "../integrations/airtable"; +import { addAirtableUpdateTask, airtableUpdateTasks, processAirtableUpdates } from "../integrations/airtable"; import { convertPageToMarkdown, convertPageToHTML } from "../../markdownify/scrape"; chromium.use(stealthPlugin()); @@ -104,24 +104,36 @@ async function createWorkflowAndStoreMetadata(id: string, userId: string) { } } +function withTimeout(promise: Promise, timeoutMs: number, operation: string): Promise { + return Promise.race([ + promise, + new Promise((_, reject) => + setTimeout(() => reject(new Error(`${operation} timed out after ${timeoutMs}ms`)), timeoutMs) + ) + ]); +} + async function triggerIntegrationUpdates(runId: string, robotMetaId: string): Promise { try { - googleSheetUpdateTasks[runId] = { + addGoogleSheetUpdateTask(runId, { robotId: robotMetaId, runId: runId, status: 'pending', retries: 5, - }; + }); - airtableUpdateTasks[runId] = { + addAirtableUpdateTask(runId, { robotId: robotMetaId, runId: runId, status: 'pending', retries: 5, - }; + }); - processAirtableUpdates().catch(err => logger.log('error', `Airtable update error: ${err.message}`)); - processGoogleSheetUpdates().catch(err => logger.log('error', `Google Sheets update error: ${err.message}`)); + withTimeout(processAirtableUpdates(), 65000, 'Airtable update') + .catch(err => logger.log('error', `Airtable update error: ${err.message}`)); + + withTimeout(processGoogleSheetUpdates(), 65000, 'Google Sheets update') + .catch(err => logger.log('error', `Google Sheets update error: ${err.message}`)); } catch (err: any) { logger.log('error', `Failed to update integrations for run: ${runId}: ${err.message}`); } @@ -250,15 +262,24 @@ async function executeRun(id: string, userId: string) { let html = ''; const serializableOutput: any = {}; + const SCRAPE_TIMEOUT = 120000; + // Markdown conversion - if (formats.includes('markdown')) { - markdown = await convertPageToMarkdown(url); + if (formats.includes("markdown")) { + const markdownPromise = convertPageToMarkdown(url); + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error(`Markdown conversion timed out after ${SCRAPE_TIMEOUT/1000}s`)), SCRAPE_TIMEOUT); + }); + markdown = await Promise.race([markdownPromise, timeoutPromise]); serializableOutput.markdown = [{ content: markdown }]; } - // HTML conversion - if (formats.includes('html')) { - html = await convertPageToHTML(url); + if (formats.includes("html")) { + const htmlPromise = convertPageToHTML(url); + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error(`HTML conversion timed out after ${SCRAPE_TIMEOUT/1000}s`)), SCRAPE_TIMEOUT); + }); + html = await Promise.race([htmlPromise, timeoutPromise]); serializableOutput.html = [{ content: html }]; } @@ -406,10 +427,18 @@ async function executeRun(id: string, userId: string) { // Set run ID for real-time data persistence browser.interpreter.setRunId(id); - const interpretationInfo = await browser.interpreter.InterpretRecording( + const INTERPRETATION_TIMEOUT = 600000; + + const interpretationPromise = browser.interpreter.InterpretRecording( workflow, currentPage, (newPage: Page) => currentPage = newPage, plainRun.interpreterSettings ); + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => reject(new Error(`Workflow interpretation timed out after ${INTERPRETATION_TIMEOUT/1000}s`)), INTERPRETATION_TIMEOUT); + }); + + const interpretationInfo = await Promise.race([interpretationPromise, timeoutPromise]); + const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput); @@ -523,9 +552,19 @@ async function executeRun(id: string, userId: string) { return true; } catch (error: any) { logger.log('info', `Error while running a robot with id: ${id} - ${error.message}`); - console.log(error.message); const run = await Run.findOne({ where: { runId: id } }); if (run) { + if (browser) { + try { + if (browser.interpreter) { + await browser.interpreter.clearState(); + } + await destroyRemoteBrowser(run.browserId, userId); + } catch (cleanupError: any) { + logger.error(`Failed to cleanup browser in error handler: ${cleanupError.message}`); + } + } + await run.update({ status: 'failed', finishedAt: new Date().toLocaleString(), @@ -586,7 +625,7 @@ async function executeRun(id: string, userId: string) { } } -async function readyForRunHandler(browserId: string, id: string, userId: string) { +async function readyForRunHandler(browserId: string, id: string, userId: string, socket: Socket) { try { const interpretation = await executeRun(id, userId); @@ -602,6 +641,8 @@ async function readyForRunHandler(browserId: string, id: string, userId: string) } catch (error: any) { logger.error(`Error during readyForRunHandler: ${error.message}`); await destroyRemoteBrowser(browserId, userId); + } finally { + cleanupSocketConnection(socket, browserId, id); } } @@ -611,6 +652,8 @@ function resetRecordingState(browserId: string, id: string) { } export async function handleRunRecording(id: string, userId: string) { + let socket: Socket | null = null; + try { const result = await createWorkflowAndStoreMetadata(id, userId); const { browserId, runId: newRunId } = result; @@ -619,27 +662,57 @@ export async function handleRunRecording(id: string, userId: string) { throw new Error('browserId or runId or userId is undefined'); } - const socket = io(`${process.env.BACKEND_URL ? process.env.BACKEND_URL : 'http://localhost:8080'}/${browserId}`, { + const CONNECTION_TIMEOUT = 30000; + + socket = io(`${process.env.BACKEND_URL ? process.env.BACKEND_URL : 'http://localhost:5000'}/${browserId}`, { transports: ['websocket'], - rejectUnauthorized: false + rejectUnauthorized: false, + timeout: CONNECTION_TIMEOUT, }); - socket.on('ready-for-run', () => readyForRunHandler(browserId, newRunId, userId)); + const readyHandler = () => readyForRunHandler(browserId, newRunId, userId, socket!); + + socket.on('ready-for-run', readyHandler); + + socket.on('connect_error', (error: Error) => { + logger.error(`Socket connection error for scheduled run ${newRunId}: ${error.message}`); + cleanupSocketConnection(socket!, browserId, newRunId); + }); + + socket.on('disconnect', () => { + cleanupSocketConnection(socket!, browserId, newRunId); + }); logger.log('info', `Running robot: ${id}`); - socket.on('disconnect', () => { - cleanupSocketListeners(socket, browserId, newRunId, userId); - }); - } catch (error: any) { logger.error('Error running recording:', error); + if (socket) { + cleanupSocketConnection(socket, '', ''); + } } } -function cleanupSocketListeners(socket: Socket, browserId: string, id: string, userId: string) { - socket.off('ready-for-run', () => readyForRunHandler(browserId, id, userId)); - logger.log('info', `Cleaned up listeners for browserId: ${browserId}, runId: ${id}`); +function cleanupSocketConnection(socket: Socket, browserId: string, id: string) { + try { + socket.removeAllListeners(); + socket.disconnect(); + + if (browserId) { + const namespace = serverIo.of(browserId); + namespace.removeAllListeners(); + namespace.disconnectSockets(true); + const nsps = (serverIo as any)._nsps; + if (nsps && nsps.has(`/${browserId}`)) { + nsps.delete(`/${browserId}`); + logger.log('debug', `Deleted namespace /${browserId} from io._nsps Map`); + } + } + + logger.log('info', `Cleaned up socket connection for browserId: ${browserId}, runId: ${id}`); + } catch (error: any) { + logger.error(`Error cleaning up socket connection: ${error.message}`); + } } export { createWorkflowAndStoreMetadata }; \ No newline at end of file