feat: add execution timeouts for API, scheduled, and manual runs

Rohit Rajan
2025-11-29 12:43:19 +05:30
parent 0e6bc22dce
commit 2a1d461357
3 changed files with 340 additions and 123 deletions


@@ -15,8 +15,8 @@ import Robot from './models/Robot';
import { browserPool } from './server';
import { Page } from 'playwright';
import { capture } from './utils/analytics';
import { googleSheetUpdateTasks, processGoogleSheetUpdates } from './workflow-management/integrations/gsheet';
import { airtableUpdateTasks, processAirtableUpdates } from './workflow-management/integrations/airtable';
import { addGoogleSheetUpdateTask, googleSheetUpdateTasks, processGoogleSheetUpdates } from './workflow-management/integrations/gsheet';
import { addAirtableUpdateTask, airtableUpdateTasks, processAirtableUpdates } from './workflow-management/integrations/airtable';
import { io as serverIo } from "./server";
import { sendWebhook } from './routes/webhook';
import { BinaryOutputService } from './storage/mino';
@@ -59,7 +59,7 @@ interface AbortRunData {
const pgBoss = new PgBoss({
connectionString: pgBossConnectionString,
expireInHours: 23,
max: 3,
max: 5,
});
/**
@@ -86,26 +86,36 @@ function AddGeneratedFlags(workflow: WorkflowFile) {
return copy;
};
function withTimeout<T>(promise: Promise<T>, timeoutMs: number, operation: string): Promise<T> {
return Promise.race([
promise,
new Promise<T>((_, reject) =>
setTimeout(() => reject(new Error(`${operation} timed out after ${timeoutMs}ms`)), timeoutMs)
)
]);
}
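A minimal usage sketch of this helper (fetchRowsWithDeadline and fetchRows are hypothetical stand-ins, not part of the commit). Note that Promise.race does not cancel the losing promise: on timeout the caller stops waiting, but the wrapped operation keeps running in the background.

async function fetchRowsWithDeadline(): Promise<string[]> {
  // fetchRows is a placeholder for any slow async operation.
  const fetchRows = async (): Promise<string[]> => ['a', 'b'];
  // Rejects with "row fetch timed out after 5000ms" if fetchRows
  // takes longer than five seconds; resolves normally otherwise.
  return withTimeout(fetchRows(), 5000, 'row fetch');
}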
// Helper function to handle integration updates
async function triggerIntegrationUpdates(runId: string, robotMetaId: string): Promise<void> {
try {
googleSheetUpdateTasks[runId] = {
addGoogleSheetUpdateTask(runId, {
robotId: robotMetaId,
runId: runId,
status: 'pending',
retries: 5,
};
});
airtableUpdateTasks[runId] = {
addAirtableUpdateTask(runId, {
robotId: robotMetaId,
runId: runId,
status: 'pending',
retries: 5,
};
});
processAirtableUpdates().catch(err => logger.log('error', `Airtable update error: ${err.message}`));
processGoogleSheetUpdates().catch(err => logger.log('error', `Google Sheets update error: ${err.message}`));
withTimeout(processAirtableUpdates(), 65000, 'Airtable update')
.catch(err => logger.log('error', `Airtable update error: ${err.message}`));
withTimeout(processGoogleSheetUpdates(), 65000, 'Google Sheets update')
.catch(err => logger.log('error', `Google Sheets update error: ${err.message}`));
} catch (err: any) {
logger.log('error', `Failed to update integrations for run: ${runId}: ${err.message}`);
}
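The addGoogleSheetUpdateTask and addAirtableUpdateTask helpers are imported above but not defined in the shown hunks. A minimal sketch of what one could look like, inferred from the task shape the removed inline writes used (robotId, runId, status, retries); the interface and the non-'pending' statuses are assumptions.

interface UpdateTask {
  robotId: string;
  runId: string;
  status: 'pending' | 'completed' | 'failed'; // only 'pending' appears in this diff
  retries: number;
}

const googleSheetUpdateTasks: Record<string, UpdateTask> = {};

function addGoogleSheetUpdateTask(runId: string, task: UpdateTask): void {
  // Centralizing the write keeps callers from mutating the map directly.
  googleSheetUpdateTasks[runId] = task;
}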
@@ -115,8 +125,8 @@ async function triggerIntegrationUpdates(runId: string, robotMetaId: string): Pr
* Modified processRunExecution function - only add browser reset
*/
async function processRunExecution(job: Job<ExecuteRunData>) {
const BROWSER_INIT_TIMEOUT = 60000;
const BROWSER_PAGE_TIMEOUT = 45000;
const BROWSER_INIT_TIMEOUT = 30000;
const BROWSER_PAGE_TIMEOUT = 15000;
const data = job.data;
logger.log('info', `Processing run execution job for runId: ${data.runId}, browserId: ${data.browserId}`);
@@ -211,15 +221,23 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
let html = '';
const serializableOutput: any = {};
// Markdown conversion
const SCRAPE_TIMEOUT = 120000;
if (formats.includes('markdown')) {
markdown = await convertPageToMarkdown(url);
const markdownPromise = convertPageToMarkdown(url);
const timeoutPromise = new Promise<never>((_, reject) => {
setTimeout(() => reject(new Error(`Markdown conversion timed out after ${SCRAPE_TIMEOUT/1000}s`)), SCRAPE_TIMEOUT);
});
markdown = await Promise.race([markdownPromise, timeoutPromise]);
serializableOutput.markdown = [{ content: markdown }];
}
// HTML conversion
if (formats.includes('html')) {
html = await convertPageToHTML(url);
const htmlPromise = convertPageToHTML(url);
const timeoutPromise = new Promise<never>((_, reject) => {
setTimeout(() => reject(new Error(`HTML conversion timed out after ${SCRAPE_TIMEOUT/1000}s`)), SCRAPE_TIMEOUT);
});
html = await Promise.race([htmlPromise, timeoutPromise]);
serializableOutput.html = [{ content: html }];
}
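Both conversion branches inline the same race. Purely for illustration, the markdown branch expressed through the withTimeout helper defined earlier in this file (a sketch with a slightly different timeout message than the inline version, not what the commit ships):

if (formats.includes('markdown')) {
  // Same deadline behavior as the inline Promise.race above.
  markdown = await withTimeout(convertPageToMarkdown(url), SCRAPE_TIMEOUT, 'Markdown conversion');
  serializableOutput.markdown = [{ content: markdown }];
}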
@@ -375,21 +393,33 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
logger.log('warn', `Failed to send run-started notification for API run ${plainRun.runId}: ${socketError.message}`);
}
// Execute the workflow
const workflow = AddGeneratedFlags(recording.recording);
browser.interpreter.setRunId(data.runId);
const interpretationInfo = await browser.interpreter.InterpretRecording(
workflow,
currentPage,
(newPage: Page) => currentPage = newPage,
plainRun.interpreterSettings
const INTERPRETATION_TIMEOUT = 600000;
const interpretationPromise = browser.interpreter.InterpretRecording(
AddGeneratedFlags(recording.recording),
currentPage,
(newPage: Page) => currentPage = newPage,
plainRun.interpreterSettings,
);
const timeoutPromise = new Promise<never>((_, reject) => {
setTimeout(() => reject(new Error(`Workflow interpretation timed out after ${INTERPRETATION_TIMEOUT/1000}s`)), INTERPRETATION_TIMEOUT);
});
const interpretationInfo = await Promise.race([interpretationPromise, timeoutPromise]);
if (await isRunAborted()) {
logger.log('info', `Run ${data.runId} was aborted during execution, not updating status`);
try {
await browser.interpreter.clearState();
logger.debug(`Cleared interpreter state for aborted run ${data.runId}`);
} catch (clearError: any) {
logger.warn(`Failed to clear interpreter state on abort: ${clearError.message}`);
}
await destroyRemoteBrowser(plainRun.browserId, data.userId);
return { success: true };
@@ -635,6 +665,15 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
totalRowsExtracted: partialData?.totalSchemaItemsExtracted + partialData?.totalListItemsExtracted + partialData?.extractedScreenshotsCount || 0,
});
try {
if (browser && browser.interpreter) {
await browser.interpreter.clearState();
logger.debug(`Cleared interpreter state for failed run ${data.runId}`);
}
} catch (clearError: any) {
logger.warn(`Failed to clear interpreter state on error: ${clearError.message}`);
}
await destroyRemoteBrowser(browserId, data.userId);
logger.log('info', `Browser ${browserId} destroyed after failed run`);
@@ -804,6 +843,8 @@ async function abortRun(runId: string, userId: string): Promise<boolean> {
const registeredUserQueues = new Map();
const registeredAbortQueues = new Map();
const workerIntervals: NodeJS.Timeout[] = [];
async function registerWorkerForQueue(queueName: string) {
if (!registeredUserQueues.has(queueName)) {
await pgBoss.work(queueName, async (job: Job<ExecuteRunData> | Job<ExecuteRunData>[]) => {
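The hunk cuts off before the helper's body. A plausible completion of the whole helper, restating the opening lines above and reconstructing the rest from the inline registration the next hunk removes; a sketch, not the committed code.

async function registerWorkerForQueue(queueName: string) {
  if (!registeredUserQueues.has(queueName)) {
    await pgBoss.work(queueName, async (job: Job<ExecuteRunData> | Job<ExecuteRunData>[]) => {
      try {
        // pg-boss may hand workers a batch; this worker processes one job at a time.
        const singleJob = Array.isArray(job) ? job[0] : job;
        return await processRunExecution(singleJob);
      } catch (error: unknown) {
        const errorMessage = error instanceof Error ? error.message : String(error);
        logger.log('error', `Run execution job failed in ${queueName}: ${errorMessage}`);
        throw error;
      }
    });
    registeredUserQueues.set(queueName, true);
    logger.log('info', `Registered worker for queue: ${queueName}`);
  }
}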
@@ -866,21 +907,7 @@ async function registerRunExecutionWorker() {
const userQueues = activeQueues.filter(q => q.name.startsWith('execute-run-user-'));
for (const queue of userQueues) {
if (!registeredUserQueues.has(queue.name)) {
await pgBoss.work(queue.name, async (job: Job<ExecuteRunData> | Job<ExecuteRunData>[]) => {
try {
const singleJob = Array.isArray(job) ? job[0] : job;
return await processRunExecution(singleJob);
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.log('error', `Run execution job failed in ${queue.name}: ${errorMessage}`);
throw error;
}
});
registeredUserQueues.set(queue.name, true);
logger.log('info', `Registered worker for queue: ${queue.name}`);
}
await registerWorkerForQueue(queue.name);
}
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
@@ -890,10 +917,15 @@ async function registerRunExecutionWorker() {
await checkForNewUserQueues();
setInterval(async () => {
await checkForNewUserQueues();
const userQueueInterval = setInterval(async () => {
try {
await checkForNewUserQueues();
} catch (error: any) {
logger.log('error', `Error checking user queues: ${error.message}`);
}
}, 10000);
workerIntervals.push(userQueueInterval);
logger.log('info', 'Run execution worker registered successfully');
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
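Both polling loops now share one shape: the interval callback wraps its body in try/catch so a failed poll cannot surface as an unhandled rejection, and the handle is pushed onto workerIntervals so the shutdown handlers can clear it. The pattern in isolation, with startGuardedPoll as a hypothetical name:

function startGuardedPoll(name: string, poll: () => Promise<void>, ms: number): NodeJS.Timeout {
  const handle = setInterval(async () => {
    try {
      await poll(); // errors stay inside the timer callback
    } catch (error: any) {
      logger.log('error', `Error in ${name} poll: ${error.message}`);
    }
  }, ms);
  workerIntervals.push(handle); // tracked so SIGTERM/SIGINT can clearInterval it
  return handle;
}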
@@ -903,7 +935,6 @@ async function registerRunExecutionWorker() {
async function registerAbortRunWorker() {
try {
const registeredAbortQueues = new Map();
const checkForNewAbortQueues = async () => {
try {
@@ -912,25 +943,7 @@ async function registerAbortRunWorker() {
const abortQueues = activeQueues.filter(q => q.name.startsWith('abort-run-user-'));
for (const queue of abortQueues) {
if (!registeredAbortQueues.has(queue.name)) {
await pgBoss.work(queue.name, async (job: Job<AbortRunData> | Job<AbortRunData>[]) => {
try {
const data = extractJobData(job);
const { userId, runId } = data;
logger.log('info', `Processing abort request for run ${runId} by user ${userId}`);
const success = await abortRun(runId, userId);
return { success };
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.log('error', `Abort run job failed in ${queue.name}: ${errorMessage}`);
throw error;
}
});
registeredAbortQueues.set(queue.name, true);
logger.log('info', `Registered abort worker for queue: ${queue.name}`);
}
await registerAbortWorkerForQueue(queue.name);
}
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
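As with the user queues, inline registration gives way to a helper. registerAbortWorkerForQueue's definition is not among the shown hunks; a reconstruction from the code removed above, offered as a sketch.

async function registerAbortWorkerForQueue(queueName: string) {
  if (!registeredAbortQueues.has(queueName)) {
    await pgBoss.work(queueName, async (job: Job<AbortRunData> | Job<AbortRunData>[]) => {
      try {
        const { userId, runId } = extractJobData(job);
        logger.log('info', `Processing abort request for run ${runId} by user ${userId}`);
        const success = await abortRun(runId, userId);
        return { success };
      } catch (error: unknown) {
        const errorMessage = error instanceof Error ? error.message : String(error);
        logger.log('error', `Abort run job failed in ${queueName}: ${errorMessage}`);
        throw error;
      }
    });
    registeredAbortQueues.set(queueName, true);
    logger.log('info', `Registered abort worker for queue: ${queueName}`);
  }
}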
@@ -940,9 +953,14 @@ async function registerAbortRunWorker() {
await checkForNewAbortQueues();
setInterval(async () => {
await checkForNewAbortQueues();
const abortQueueInterval = setInterval(async () => {
try {
await checkForNewAbortQueues();
} catch (error: any) {
logger.log('error', `Error checking abort queues: ${error.message}`);
}
}, 10000);
workerIntervals.push(abortQueueInterval);
logger.log('info', 'Abort run worker registration system initialized');
} catch (error: unknown) {
@@ -1050,15 +1068,22 @@ pgBoss.on('error', (error) => {
// Handle graceful shutdown
process.on('SIGTERM', async () => {
logger.log('info', 'SIGTERM received, shutting down PgBoss...');
logger.log('info', `Clearing ${workerIntervals.length} worker intervals...`);
workerIntervals.forEach(clearInterval);
await pgBoss.stop();
process.exit(0);
logger.log('info', 'PgBoss stopped, waiting for main process cleanup...');
});
process.on('SIGINT', async () => {
logger.log('info', 'SIGINT received, shutting down PgBoss...');
logger.log('info', `Clearing ${workerIntervals.length} worker intervals...`);
workerIntervals.forEach(clearInterval);
await pgBoss.stop();
process.exit(0);
logger.log('info', 'PgBoss stopped, waiting for main process cleanup...');
});
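With process.exit(0) removed from both handlers, the worker module now only clears its intervals and stops PgBoss; the final exit is presumably owned by the main server process. A hypothetical sketch of that split (shutdownHttpServer is an assumed placeholder; Node runs SIGTERM listeners in registration order):

process.on('SIGTERM', async () => {
  // Hypothetical main-process handler, not part of this diff.
  await shutdownHttpServer(); // assumed placeholder: close the HTTP server, flush logs
  process.exit(0);            // single exit point, after every module has cleaned up
});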
// For use in other files
export { pgBoss, registerWorkerForQueue, registerAbortWorkerForQueue, startWorkers };
export { startWorkers };