feat: revamp process run execution

This commit is contained in:
Rohit
2025-06-12 11:03:39 +05:30
parent b0403aeb40
commit 67e6e0c3c1

View File

@@ -212,10 +212,12 @@ async function triggerIntegrationUpdates(runId: string, robotMetaId: string): Pr
* Modified processRunExecution function - only add browser reset
*/
async function processRunExecution(job: Job<ExecuteRunData>) {
try {
const data = job.data;
logger.log('info', `Processing run execution job for runId: ${data.runId}, browserId: ${data.browserId}`);
const BROWSER_INIT_TIMEOUT = 30000;
const data = job.data;
logger.log('info', `Processing run execution job for runId: ${data.runId}, browserId: ${data.browserId}`);
try {
// Find the run
const run = await Run.findOne({ where: { runId: data.runId } });
if (!run) {
@@ -229,64 +231,56 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
}
const plainRun = run.toJSON();
// Find the recording
const recording = await Robot.findOne({ where: { 'recording_meta.id': plainRun.robotMetaId }, raw: true });
if (!recording) {
logger.log('error', `Recording for run ${data.runId} not found`);
const currentRun = await Run.findOne({ where: { runId: data.runId } });
if (currentRun && (currentRun.status !== 'aborted' && currentRun.status !== 'aborting')) {
await run.update({
status: 'failed',
finishedAt: new Date().toLocaleString(),
log: 'Failed: Recording not found',
});
// Trigger webhooks for run failure
const failedWebhookPayload = {
robot_id: plainRun.robotMetaId,
run_id: data.runId,
robot_name: 'Unknown Robot',
status: 'failed',
started_at: plainRun.startedAt,
finished_at: new Date().toLocaleString(),
error: {
message: "Failed: Recording not found",
type: 'RecordingNotFoundError'
},
metadata: {
browser_id: plainRun.browserId,
user_id: data.userId,
}
};
try {
await sendWebhook(plainRun.robotMetaId, 'run_failed', failedWebhookPayload);
logger.log('info', `Failure webhooks sent successfully for run ${data.runId}`);
} catch (webhookError: any) {
logger.log('error', `Failed to send failure webhooks for run ${data.runId}: ${webhookError.message}`);
}
}
return { success: false };
}
// Get the browser and execute the run
const browser = browserPool.getRemoteBrowser(plainRun.browserId);
let currentPage = browser?.getCurrentPage();
const browserId = data.browserId || plainRun.browserId;
if (!browser || !currentPage) {
logger.log('error', `Browser or page not available for run ${data.runId}`);
return { success: false };
if (!browserId) {
throw new Error(`No browser ID available for run ${data.runId}`);
}
logger.log('info', `Looking for browser ${browserId} for run ${data.runId}`);
let browser = browserPool.getRemoteBrowser(browserId);
const browserWaitStart = Date.now();
while (!browser && (Date.now() - browserWaitStart) < BROWSER_INIT_TIMEOUT) {
logger.log('debug', `Browser ${browserId} not ready yet, waiting...`);
await new Promise(resolve => setTimeout(resolve, 1000));
browser = browserPool.getRemoteBrowser(browserId);
}
if (!browser) {
throw new Error(`Browser ${browserId} not found in pool after timeout`);
}
logger.log('info', `Browser ${browserId} found and ready for execution`);
try {
// Find the recording
const recording = await Robot.findOne({ where: { 'recording_meta.id': plainRun.robotMetaId }, raw: true });
if (!recording) {
throw new Error(`Recording for run ${data.runId} not found`);
}
const isRunAborted = async (): Promise<boolean> => {
const currentRun = await Run.findOne({ where: { runId: data.runId } });
return currentRun ? (currentRun.status === 'aborted' || currentRun.status === 'aborting') : false;
};
let currentPage = browser.getCurrentPage();
const pageWaitStart = Date.now();
while (!currentPage && (Date.now() - pageWaitStart) < 30000) {
logger.log('debug', `Page not ready for browser ${browserId}, waiting...`);
await new Promise(resolve => setTimeout(resolve, 1000));
currentPage = browser.getCurrentPage();
}
if (!currentPage) {
throw new Error(`No current page available for browser ${browserId} after timeout`);
}
logger.log('info', `Starting workflow execution for run ${data.runId}`);
// Execute the workflow
const workflow = AddGeneratedFlags(recording.recording);
@@ -299,23 +293,27 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
if (await isRunAborted()) {
logger.log('info', `Run ${data.runId} was aborted during execution, not updating status`);
await destroyRemoteBrowser(plainRun.browserId, data.userId);
return { success: true };
}
logger.log('info', `Workflow execution completed for run ${data.runId}`);
const binaryOutputService = new BinaryOutputService('maxun-run-screenshots');
const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput);
if (await isRunAborted()) {
logger.log('info', `Run ${data.runId} was aborted while processing results, not updating status`);
return { success: true };
}
const categorizedOutput = {
scrapeSchema: interpretationInfo.scrapeSchemaOutput || {},
scrapeList: interpretationInfo.scrapeListOutput || {}
};
if (await isRunAborted()) {
logger.log('info', `Run ${data.runId} was aborted while processing results, not updating status`);
return { success: true };
}
await run.update({
...run,
status: 'success',
@@ -330,6 +328,7 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
});
// Track extraction metrics
let totalDataPointsExtracted = 0;
let totalSchemaItemsExtracted = 0;
let totalListItemsExtracted = 0;
let extractedScreenshotsCount = 0;
@@ -337,23 +336,35 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
if (categorizedOutput.scrapeSchema) {
Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => {
if (Array.isArray(schemaResult)) {
schemaResult.forEach(obj => {
if (obj && typeof obj === 'object') {
totalDataPointsExtracted += Object.keys(obj).length;
}
});
totalSchemaItemsExtracted += schemaResult.length;
} else if (schemaResult && typeof schemaResult === 'object') {
totalDataPointsExtracted += Object.keys(schemaResult).length;
totalSchemaItemsExtracted += 1;
}
});
}
if (categorizedOutput.scrapeList) {
Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => {
if (Array.isArray(listResult)) {
listResult.forEach(obj => {
if (obj && typeof obj === 'object') {
totalDataPointsExtracted += Object.keys(obj).length;
}
});
totalListItemsExtracted += listResult.length;
}
});
}
if (uploadedBinaryOutput) {
extractedScreenshotsCount = Object.keys(uploadedBinaryOutput).length;
totalDataPointsExtracted += extractedScreenshotsCount;
}
const totalRowsExtracted = totalSchemaItemsExtracted + totalListItemsExtracted;
@@ -362,6 +373,7 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
console.log(`Extracted List Items Count: ${totalListItemsExtracted}`);
console.log(`Extracted Screenshots Count: ${extractedScreenshotsCount}`);
console.log(`Total Rows Extracted: ${totalRowsExtracted}`);
console.log(`Total Data Points Extracted: ${totalDataPointsExtracted}`);
// Capture metrics
capture(
@@ -392,7 +404,8 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
total_rows: totalRowsExtracted,
captured_texts_count: totalSchemaItemsExtracted,
captured_lists_count: totalListItemsExtracted,
screenshots_count: extractedScreenshotsCount
screenshots_count: extractedScreenshotsCount,
total_data_points_extracted: totalDataPointsExtracted,
},
metadata: {
browser_id: plainRun.browserId,
@@ -408,94 +421,206 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
}
// Schedule updates for Google Sheets and Airtable
try {
googleSheetUpdateTasks[plainRun.runId] = {
robotId: plainRun.robotMetaId,
runId: plainRun.runId,
status: 'pending',
retries: 5,
};
await triggerIntegrationUpdates(plainRun.runId, plainRun.robotMetaId);
airtableUpdateTasks[plainRun.runId] = {
robotId: plainRun.robotMetaId,
runId: plainRun.runId,
status: 'pending',
retries: 5,
};
processAirtableUpdates();
processGoogleSheetUpdates();
} catch (err: any) {
logger.log('error', `Failed to update Google Sheet for run: ${plainRun.runId}: ${err.message}`);
}
serverIo.of(plainRun.browserId).emit('run-completed', {
const completionData = {
runId: data.runId,
robotMetaId: plainRun.robotMetaId,
robotName: recording.recording_meta.name,
status: 'success',
finishedAt: new Date().toLocaleString()
});
};
serverIo.of(browserId).emit('run-completed', completionData);
serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-completed', completionData);
await destroyRemoteBrowser(browserId, data.userId);
logger.log('info', `Browser ${browserId} destroyed after successful run ${data.runId}`);
return { success: true };
} catch (executionError: any) {
logger.log('error', `Run execution failed for run ${data.runId}: ${executionError.message}`);
const currentRun = await Run.findOne({ where: { runId: data.runId } });
if (currentRun && (currentRun.status !== 'aborted' && currentRun.status !== 'aborting')) {
await run.update({
let partialDataExtracted = false;
let partialData: any = null;
let partialUpdateData: any = {
status: 'failed',
finishedAt: new Date().toLocaleString(),
log: `Failed: ${executionError.message}`,
};
try {
if (browser && browser.interpreter) {
const hasSchemaData = (browser.interpreter.serializableDataByType?.scrapeSchema ?? []).length > 0;
const hasListData = (browser.interpreter.serializableDataByType?.scrapeList ?? []).length > 0;
const hasBinaryData = (browser.interpreter.binaryData ?? []).length > 0;
if (hasSchemaData || hasListData || hasBinaryData) {
logger.log('info', `Extracting partial data from failed run ${data.runId}`);
partialData = await extractAndProcessScrapedData(browser, run);
partialUpdateData.serializableOutput = {
scrapeSchema: Object.values(partialData.categorizedOutput.scrapeSchema),
scrapeList: Object.values(partialData.categorizedOutput.scrapeList),
};
partialUpdateData.binaryOutput = partialData.uploadedBinaryOutput;
partialDataExtracted = true;
logger.log('info', `Partial data extracted for failed run ${data.runId}: ${partialData.totalDataPointsExtracted} data points`);
await triggerIntegrationUpdates(plainRun.runId, plainRun.robotMetaId);
}
}
} catch (partialDataError: any) {
logger.log('warn', `Failed to extract partial data for run ${data.runId}: ${partialDataError.message}`);
}
await run.update(partialUpdateData);
try {
const recording = await Robot.findOne({ where: { 'recording_meta.id': run.robotMetaId }, raw: true });
const failureData = {
runId: data.runId,
robotMetaId: plainRun.robotMetaId,
robotName: recording ? recording.recording_meta.name : 'Unknown Robot',
status: 'failed',
finishedAt: new Date().toLocaleString(),
log: `Failed: ${executionError.message}`,
});
// Capture failure metrics
capture(
'maxun-oss-run-created-manual',
{
runId: data.runId,
user_id: data.userId,
created_at: new Date().toISOString(),
status: 'failed',
error_message: executionError.message,
}
);
// Trigger webhooks for run failure
const failedWebhookPayload = {
robot_id: plainRun.robotMetaId,
run_id: data.runId,
robot_name: recording.recording_meta.name,
status: 'failed',
started_at: plainRun.startedAt,
finished_at: new Date().toLocaleString(),
error: {
message: executionError.message,
stack: executionError.stack,
type: executionError.name || 'ExecutionError'
},
metadata: {
browser_id: plainRun.browserId,
user_id: data.userId,
}
hasPartialData: partialDataExtracted
};
try {
await sendWebhook(plainRun.robotMetaId, 'run_failed', failedWebhookPayload);
logger.log('info', `Failure webhooks sent successfully for run ${data.runId}`);
} catch (webhookError: any) {
logger.log('error', `Failed to send failure webhooks for run ${data.runId}: ${webhookError.message}`);
}
} else {
logger.log('info', `Run ${data.runId} was aborted, not updating status to failed`);
serverIo.of(browserId).emit('run-completed', failureData);
serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-completed', failureData);
} catch (emitError: any) {
logger.log('warn', `Failed to emit failure event: ${emitError.message}`);
}
return { success: false };
const recording = await Robot.findOne({ where: { 'recording_meta.id': run.robotMetaId }, raw: true });
const failedWebhookPayload = {
robot_id: plainRun.robotMetaId,
run_id: data.runId,
robot_name: recording ? recording.recording_meta.name : 'Unknown Robot',
status: 'failed',
started_at: plainRun.startedAt,
finished_at: new Date().toLocaleString(),
error: {
message: executionError.message,
stack: executionError.stack,
type: 'ExecutionError',
},
partial_data_extracted: partialDataExtracted,
extracted_data: partialDataExtracted ? {
captured_texts: Object.values(partialUpdateData.serializableOutput?.scrapeSchema || []).flat() || [],
captured_lists: partialUpdateData.serializableOutput?.scrapeList || {},
total_data_points_extracted: partialData?.totalDataPointsExtracted || 0,
captured_texts_count: partialData?.totalSchemaItemsExtracted || 0,
captured_lists_count: partialData?.totalListItemsExtracted || 0,
screenshots_count: partialData?.extractedScreenshotsCount || 0
} : null,
metadata: {
browser_id: plainRun.browserId,
user_id: data.userId,
}
};
try {
await sendWebhook(plainRun.robotMetaId, 'run_failed', failedWebhookPayload);
logger.log('info', `Failure webhooks sent successfully for run ${data.runId}`);
} catch (webhookError: any) {
logger.log('error', `Failed to send failure webhooks for run ${data.runId}: ${webhookError.message}`);
}
try {
const failureSocketData = {
runId: data.runId,
robotMetaId: run.robotMetaId,
robotName: recording ? recording.recording_meta.name : 'Unknown Robot',
status: 'failed',
finishedAt: new Date().toLocaleString()
};
serverIo.of(run.browserId).emit('run-completed', failureSocketData);
serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-completed', failureSocketData);
} catch (socketError: any) {
logger.log('warn', `Failed to emit failure event in main catch: ${socketError.message}`);
}
capture('maxun-oss-run-created-manual', {
runId: data.runId,
user_id: data.userId,
created_at: new Date().toISOString(),
status: 'failed',
error_message: executionError.message,
partial_data_extracted: partialDataExtracted,
totalRowsExtracted: partialData?.totalSchemaItemsExtracted + partialData?.totalListItemsExtracted + partialData?.extractedScreenshotsCount || 0,
});
await destroyRemoteBrowser(browserId, data.userId);
logger.log('info', `Browser ${browserId} destroyed after failed run`);
return { success: false, partialDataExtracted };
}
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.log('error', `Failed to process run execution job: ${errorMessage}`);
try {
const run = await Run.findOne({ where: { runId: data.runId }});
if (run) {
await run.update({
status: 'failed',
finishedAt: new Date().toLocaleString(),
log: `Failed: ${errorMessage}`,
});
const recording = await Robot.findOne({ where: { 'recording_meta.id': run.robotMetaId }, raw: true });
const failedWebhookPayload = {
robot_id: run.robotMetaId,
run_id: data.runId,
robot_name: recording ? recording.recording_meta.name : 'Unknown Robot',
status: 'failed',
started_at: run.startedAt,
finished_at: new Date().toLocaleString(),
error: {
message: errorMessage,
},
metadata: {
browser_id: run.browserId,
user_id: data.userId,
}
};
try {
await sendWebhook(run.robotMetaId, 'run_failed', failedWebhookPayload);
logger.log('info', `Failure webhooks sent successfully for run ${data.runId}`);
} catch (webhookError: any) {
logger.log('error', `Failed to send failure webhooks for run ${data.runId}: ${webhookError.message}`);
}
try {
const failureSocketData = {
runId: data.runId,
robotMetaId: run.robotMetaId,
robotName: recording ? recording.recording_meta.name : 'Unknown Robot',
status: 'failed',
finishedAt: new Date().toLocaleString()
};
serverIo.of(run.browserId).emit('run-completed', failureSocketData);
serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-completed', failureSocketData);
} catch (socketError: any) {
logger.log('warn', `Failed to emit failure event in main catch: ${socketError.message}`);
}
}
} catch (updateError: any) {
logger.log('error', `Failed to update run status: ${updateError.message}`);
}
return { success: false };
}
}