feat: queue and execute robot run via worker

This commit is contained in:
Rohit
2025-03-12 19:26:08 +05:30
parent ea6730208d
commit 7b4e689a7c

View File

@@ -7,8 +7,19 @@ import {
initializeRemoteBrowserForRecording,
destroyRemoteBrowser,
interpretWholeWorkflow,
stopRunningInterpretation
stopRunningInterpretation,
createRemoteBrowserForRun
} from './browser-management/controller';
import { WorkflowFile } from 'maxun-core';
import Run from './models/Run';
import Robot from './models/Robot';
import { browserPool } from './server';
import { Page } from 'playwright';
import { BinaryOutputService } from './storage/mino';
import { capture } from './utils/analytics';
import { googleSheetUpdateTasks, processGoogleSheetUpdates } from './workflow-management/integrations/gsheet';
import { airtableUpdateTasks, processAirtableUpdates } from './workflow-management/integrations/airtable';
import { RemoteBrowser } from './browser-management/classes/RemoteBrowser';
const pgBossConnectionString = `postgres://${process.env.DB_USER}:${process.env.DB_PASSWORD}@${process.env.DB_HOST}:${process.env.DB_PORT}/${process.env.DB_NAME}`;
@@ -29,6 +40,12 @@ interface DestroyBrowserData {
userId: string;
}
interface ExecuteRunData {
userId: string;
runId: string;
browserId: string;
}
const pgBoss = new PgBoss({connectionString: pgBossConnectionString, schema: 'public'});
/**
@@ -44,6 +61,323 @@ function extractJobData<T>(job: Job<T> | Job<T>[]): T {
return job.data;
}
function AddGeneratedFlags(workflow: WorkflowFile) {
const copy = JSON.parse(JSON.stringify(workflow));
for (let i = 0; i < workflow.workflow.length; i++) {
copy.workflow[i].what.unshift({
action: 'flag',
args: ['generated'],
});
}
return copy;
};
/**
* Function to reset browser state without creating a new browser
*/
async function resetBrowserState(browser: RemoteBrowser): Promise<boolean> {
try {
const currentPage = browser.getCurrentPage();
if (!currentPage) {
logger.log('error', 'No current page available to reset browser state');
return false;
}
// Navigate to blank page to reset state
await currentPage.goto('about:blank');
// Clear browser storage
await currentPage.evaluate(() => {
try {
localStorage.clear();
sessionStorage.clear();
} catch (e) {
// Ignore errors in cleanup
}
});
// Clear cookies
const context = currentPage.context();
await context.clearCookies();
return true;
} catch (error) {
logger.log('error', `Failed to reset browser state`);
return false;
}
}
/**
* Modified checkAndProcessQueuedRun function - only changes browser reset logic
*/
async function checkAndProcessQueuedRun(userId: string, browserId: string): Promise<boolean> {
try {
// Find the oldest queued run for this specific browser
const queuedRun = await Run.findOne({
where: {
browserId: browserId,
status: 'queued'
},
order: [['startedAt', 'ASC']]
});
if (!queuedRun) {
logger.log('info', `No queued runs found for browser ${browserId}`);
return false;
}
// Reset the browser state before next run
const browser = browserPool.getRemoteBrowser(browserId);
if (browser) {
logger.log('info', `Resetting browser state for browser ${browserId} before next run`);
await resetBrowserState(browser);
}
// Update the queued run to running status
await queuedRun.update({
status: 'running',
log: 'Run started - using browser from previous run'
});
// Schedule the run execution
await pgBoss.createQueue('execute-run');
const executeJobId = await pgBoss.send('execute-run', {
userId: userId,
runId: queuedRun.runId,
browserId: browserId
});
logger.log('info', `Scheduled queued run ${queuedRun.runId} to use browser ${browserId}, job ID: ${executeJobId}`);
return true;
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.log('error', `Error checking for queued runs: ${errorMessage}`);
return false;
}
}
/**
* Modified processRunExecution function - only add browser reset
*/
async function processRunExecution(job: Job<ExecuteRunData>) {
try {
const data = job.data;
logger.log('info', `Processing run execution job for runId: ${data.runId}, browserId: ${data.browserId}`);
// Find the run
const run = await Run.findOne({ where: { runId: data.runId } });
if (!run) {
logger.log('error', `Run ${data.runId} not found in database`);
return { success: false };
}
const plainRun = run.toJSON();
// Find the recording
const recording = await Robot.findOne({ where: { 'recording_meta.id': plainRun.robotMetaId }, raw: true });
if (!recording) {
logger.log('error', `Recording for run ${data.runId} not found`);
// Update run status to failed
await run.update({
status: 'failed',
finishedAt: new Date().toLocaleString(),
log: 'Failed: Recording not found',
});
// Check for queued runs even if this one failed
await checkAndProcessQueuedRun(data.userId, data.browserId);
return { success: false };
}
// Get the browser and execute the run
const browser = browserPool.getRemoteBrowser(plainRun.browserId);
let currentPage = browser?.getCurrentPage();
if (!browser || !currentPage) {
logger.log('error', `Browser or page not available for run ${data.runId}`);
// Update run status to failed
await run.update({
status: 'failed',
finishedAt: new Date().toLocaleString(),
log: 'Failed: Browser or page not available',
});
await pgBoss.fail(job.id, "Failed to get browser or page for run");
// Even if this run failed, check for queued runs
await checkAndProcessQueuedRun(data.userId, data.browserId);
return { success: false };
}
try {
// Reset the browser state before executing this run
await resetBrowserState(browser);
// Execute the workflow
const workflow = AddGeneratedFlags(recording.recording);
const interpretationInfo = await browser.interpreter.InterpretRecording(
workflow,
currentPage,
(newPage: Page) => currentPage = newPage,
plainRun.interpreterSettings
);
// Process the results
const binaryOutputService = new BinaryOutputService('maxun-run-screenshots');
const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput);
// Update the run record with results
await run.update({
...run,
status: 'success',
finishedAt: new Date().toLocaleString(),
browserId: plainRun.browserId,
log: interpretationInfo.log.join('\n'),
serializableOutput: interpretationInfo.serializableOutput,
binaryOutput: uploadedBinaryOutput,
});
// Track extraction metrics
let totalRowsExtracted = 0;
let extractedScreenshotsCount = 0;
let extractedItemsCount = 0;
if (run.dataValues.binaryOutput && run.dataValues.binaryOutput["item-0"]) {
extractedScreenshotsCount = 1;
}
if (run.dataValues.serializableOutput && run.dataValues.serializableOutput["item-0"]) {
const itemsArray = run.dataValues.serializableOutput["item-0"];
extractedItemsCount = itemsArray.length;
totalRowsExtracted = itemsArray.reduce((total, item) => {
return total + Object.keys(item).length;
}, 0);
}
console.log(`Extracted Items Count: ${extractedItemsCount}`);
console.log(`Extracted Screenshots Count: ${extractedScreenshotsCount}`);
console.log(`Total Rows Extracted: ${totalRowsExtracted}`);
// Capture metrics
capture(
'maxun-oss-run-created-manual',
{
runId: data.runId,
user_id: data.userId,
created_at: new Date().toISOString(),
status: 'success',
totalRowsExtracted,
extractedItemsCount,
extractedScreenshotsCount,
}
);
// Schedule updates for Google Sheets and Airtable
try {
googleSheetUpdateTasks[plainRun.runId] = {
robotId: plainRun.robotMetaId,
runId: plainRun.runId,
status: 'pending',
retries: 5,
};
airtableUpdateTasks[plainRun.runId] = {
robotId: plainRun.robotMetaId,
runId: plainRun.runId,
status: 'pending',
retries: 5,
};
processAirtableUpdates();
processGoogleSheetUpdates();
} catch (err: any) {
logger.log('error', `Failed to update Google Sheet for run: ${plainRun.runId}: ${err.message}`);
}
// Check for and process queued runs before destroying the browser
const queuedRunProcessed = await checkAndProcessQueuedRun(data.userId, plainRun.browserId);
// Only destroy the browser if no queued run was found
if (!queuedRunProcessed) {
await destroyRemoteBrowser(plainRun.browserId, data.userId);
logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`);
}
return { success: true };
} catch (executionError: any) {
logger.log('error', `Run execution failed for run ${data.runId}: ${executionError.message}`);
await run.update({
status: 'failed',
finishedAt: new Date().toLocaleString(),
log: `Failed: ${executionError.message}`,
});
// Check for queued runs before destroying the browser
const queuedRunProcessed = await checkAndProcessQueuedRun(data.userId, plainRun.browserId);
// Only destroy the browser if no queued run was found
if (!queuedRunProcessed) {
try {
await destroyRemoteBrowser(plainRun.browserId, data.userId);
logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`);
} catch (cleanupError: any) {
logger.log('warn', `Failed to clean up browser for failed run ${data.runId}: ${cleanupError.message}`);
}
}
// Capture failure metrics
capture(
'maxun-oss-run-created-manual',
{
runId: data.runId,
user_id: data.userId,
created_at: new Date().toISOString(),
status: 'failed',
error_message: executionError.message,
}
);
return { success: false };
}
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.log('error', `Failed to process run execution job: ${errorMessage}`);
return { success: false };
}
}
async function registerRunExecutionWorker() {
try {
// Worker for executing runs
await pgBoss.work('execute-run', async (job: Job<ExecuteRunData> | Job<ExecuteRunData>[]) => {
try {
const singleJob = Array.isArray(job) ? job[0] : job;
return await processRunExecution(singleJob);
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.log('error', `Run execution job failed: ${errorMessage}`);
throw error;
}
});
// setInterval(checkForStuckQueuedRuns, 30000);
logger.log('info', 'Run execution worker registered successfully');
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.log('error', `Failed to register run execution worker: ${errorMessage}`);
}
}
/**
* Initialize PgBoss and register all workers
*/
@@ -120,6 +454,9 @@ async function startWorkers() {
throw error;
}
});
// Register the run execution worker
await registerRunExecutionWorker();
logger.log('info', 'All recording workers registered successfully');
} catch (error: unknown) {