Merge branch 'develop' into abort-fix

This commit is contained in:
Rohit
2025-06-12 23:30:23 +05:30
committed by GitHub
12 changed files with 471 additions and 220 deletions

View File

@@ -47,6 +47,7 @@ interface InterpreterOptions {
activeId: (id: number) => void, activeId: (id: number) => void,
debugMessage: (msg: string) => void, debugMessage: (msg: string) => void,
setActionType: (type: string) => void, setActionType: (type: string) => void,
incrementScrapeListIndex: () => void,
}> }>
} }
@@ -475,6 +476,11 @@ export default class Interpreter extends EventEmitter {
} }
await this.ensureScriptsLoaded(page); await this.ensureScriptsLoaded(page);
if (this.options.debugChannel?.incrementScrapeListIndex) {
this.options.debugChannel.incrementScrapeListIndex();
}
if (!config.pagination) { if (!config.pagination) {
const scrapeResults: Record<string, any>[] = await page.evaluate((cfg) => window.scrapeList(cfg), config); const scrapeResults: Record<string, any>[] = await page.evaluate((cfg) => window.scrapeList(cfg), config);
await this.options.serializableCallback(scrapeResults); await this.options.serializableCallback(scrapeResults);
@@ -624,6 +630,8 @@ export default class Interpreter extends EventEmitter {
}); });
allResults = allResults.concat(newResults); allResults = allResults.concat(newResults);
debugLog("Results collected:", allResults.length); debugLog("Results collected:", allResults.length);
await this.options.serializableCallback(allResults);
}; };
const checkLimit = () => { const checkLimit = () => {

View File

@@ -474,7 +474,8 @@
"schedule_success": "Roboter {{name}} erfolgreich geplant", "schedule_success": "Roboter {{name}} erfolgreich geplant",
"schedule_failed": "Planen des Roboters {{name}} fehlgeschlagen", "schedule_failed": "Planen des Roboters {{name}} fehlgeschlagen",
"abort_success": "Interpretation des Roboters {{name}} erfolgreich abgebrochen", "abort_success": "Interpretation des Roboters {{name}} erfolgreich abgebrochen",
"abort_failed": "Abbrechen der Interpretation des Roboters {{name}} fehlgeschlagen" "abort_failed": "Abbrechen der Interpretation des Roboters {{name}} fehlgeschlagen",
"abort_initiated": "Interpretation des Roboters {{name}} wird abgebrochen"
}, },
"menu": { "menu": {
"recordings": "Roboter", "recordings": "Roboter",

View File

@@ -487,7 +487,8 @@
"schedule_success": "Robot {{name}} scheduled successfully", "schedule_success": "Robot {{name}} scheduled successfully",
"schedule_failed": "Failed to schedule robot {{name}}", "schedule_failed": "Failed to schedule robot {{name}}",
"abort_success": "Interpretation of robot {{name}} aborted successfully", "abort_success": "Interpretation of robot {{name}} aborted successfully",
"abort_failed": "Failed to abort the interpretation of robot {{name}}" "abort_failed": "Failed to abort the interpretation of robot {{name}}",
"abort_initiated": "Aborting the interpretation of robot {{name}}"
}, },
"menu": { "menu": {
"recordings": "Robots", "recordings": "Robots",

View File

@@ -475,7 +475,8 @@
"schedule_success": "Robot {{name}} programado exitosamente", "schedule_success": "Robot {{name}} programado exitosamente",
"schedule_failed": "Error al programar el robot {{name}}", "schedule_failed": "Error al programar el robot {{name}}",
"abort_success": "Interpretación del robot {{name}} abortada exitosamente", "abort_success": "Interpretación del robot {{name}} abortada exitosamente",
"abort_failed": "Error al abortar la interpretación del robot {{name}}" "abort_failed": "Error al abortar la interpretación del robot {{name}}",
"abort_initiated": "Cancelando la interpretación del robot {{name}}"
}, },
"menu": { "menu": {
"recordings": "Robots", "recordings": "Robots",

View File

@@ -475,7 +475,8 @@
"schedule_success": "ロボット{{name}}のスケジュールが正常に設定されました", "schedule_success": "ロボット{{name}}のスケジュールが正常に設定されました",
"schedule_failed": "ロボット{{name}}のスケジュール設定に失敗しました", "schedule_failed": "ロボット{{name}}のスケジュール設定に失敗しました",
"abort_success": "ロボット{{name}}の解釈を中止しました", "abort_success": "ロボット{{name}}の解釈を中止しました",
"abort_failed": "ロボット{{name}}の解釈中止に失敗しました" "abort_failed": "ロボット{{name}}の解釈中止に失敗しました",
"abort_initiated": "ロボット {{name}} の解釈を中止しています"
}, },
"menu": { "menu": {
"recordings": "ロボット", "recordings": "ロボット",

View File

@@ -475,7 +475,8 @@
"schedule_success": "机器人{{name}}调度成功", "schedule_success": "机器人{{name}}调度成功",
"schedule_failed": "机器人{{name}}调度失败", "schedule_failed": "机器人{{name}}调度失败",
"abort_success": "成功中止机器人{{name}}的解释", "abort_success": "成功中止机器人{{name}}的解释",
"abort_failed": "中止机器人{{name}}的解释失败" "abort_failed": "中止机器人{{name}}的解释失败",
"abort_initiated": "正在中止机器人 {{name}} 的解释"
}, },
"menu": { "menu": {
"recordings": "机器人", "recordings": "机器人",

View File

@@ -82,11 +82,140 @@ function AddGeneratedFlags(workflow: WorkflowFile) {
return copy; return copy;
}; };
/**
* Helper function to extract and process scraped data from browser interpreter
*/
async function extractAndProcessScrapedData(
browser: RemoteBrowser,
run: any
): Promise<{
categorizedOutput: any;
uploadedBinaryOutput: any;
totalDataPointsExtracted: number;
totalSchemaItemsExtracted: number;
totalListItemsExtracted: number;
extractedScreenshotsCount: number;
}> {
let categorizedOutput: {
scrapeSchema: Record<string, any>;
scrapeList: Record<string, any>;
} = {
scrapeSchema: {},
scrapeList: {}
};
if ((browser?.interpreter?.serializableDataByType?.scrapeSchema ?? []).length > 0) {
browser?.interpreter?.serializableDataByType?.scrapeSchema?.forEach((schemaItem: any, index: any) => {
categorizedOutput.scrapeSchema[`schema-${index}`] = schemaItem;
});
}
if ((browser?.interpreter?.serializableDataByType?.scrapeList ?? []).length > 0) {
browser?.interpreter?.serializableDataByType?.scrapeList?.forEach((listItem: any, index: any) => {
categorizedOutput.scrapeList[`list-${index}`] = listItem;
});
}
const binaryOutput = browser?.interpreter?.binaryData?.reduce(
(reducedObject: Record<string, any>, item: any, index: number): Record<string, any> => {
return {
[`item-${index}`]: item,
...reducedObject,
};
},
{}
) || {};
let totalDataPointsExtracted = 0;
let totalSchemaItemsExtracted = 0;
let totalListItemsExtracted = 0;
let extractedScreenshotsCount = 0;
if (categorizedOutput.scrapeSchema) {
Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => {
if (Array.isArray(schemaResult)) {
schemaResult.forEach(obj => {
if (obj && typeof obj === 'object') {
totalDataPointsExtracted += Object.keys(obj).length;
}
});
totalSchemaItemsExtracted += schemaResult.length;
} else if (schemaResult && typeof schemaResult === 'object') {
totalDataPointsExtracted += Object.keys(schemaResult).length;
totalSchemaItemsExtracted += 1;
}
});
}
if (categorizedOutput.scrapeList) {
Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => {
if (Array.isArray(listResult)) {
listResult.forEach(obj => {
if (obj && typeof obj === 'object') {
totalDataPointsExtracted += Object.keys(obj).length;
}
});
totalListItemsExtracted += listResult.length;
}
});
}
if (binaryOutput) {
extractedScreenshotsCount = Object.keys(binaryOutput).length;
totalDataPointsExtracted += extractedScreenshotsCount;
}
const binaryOutputService = new BinaryOutputService('maxun-run-screenshots');
const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(
run,
binaryOutput
);
return {
categorizedOutput: {
scrapeSchema: categorizedOutput.scrapeSchema || {},
scrapeList: categorizedOutput.scrapeList || {}
},
uploadedBinaryOutput,
totalDataPointsExtracted,
totalSchemaItemsExtracted,
totalListItemsExtracted,
extractedScreenshotsCount
};
}
// Helper function to handle integration updates
async function triggerIntegrationUpdates(runId: string, robotMetaId: string): Promise<void> {
try {
googleSheetUpdateTasks[runId] = {
robotId: robotMetaId,
runId: runId,
status: 'pending',
retries: 5,
};
airtableUpdateTasks[runId] = {
robotId: robotMetaId,
runId: runId,
status: 'pending',
retries: 5,
};
processAirtableUpdates();
processGoogleSheetUpdates();
} catch (err: any) {
logger.log('error', `Failed to update integrations for run: ${runId}: ${err.message}`);
}
}
/**
* Modified processRunExecution function - only add browser reset
*/
async function processRunExecution(job: Job<ExecuteRunData>) { async function processRunExecution(job: Job<ExecuteRunData>) {
const BROWSER_INIT_TIMEOUT = 30000; const BROWSER_INIT_TIMEOUT = 30000;
const data = job.data; const data = job.data;
logger.log('info', `Processing run execution job for runId: ${data.runId}`); logger.log('info', `Processing run execution job for runId: ${data.runId}, browserId: ${data.browserId}`);
try { try {
// Find the run // Find the run
@@ -108,51 +237,8 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
throw new Error(`No browser ID available for run ${data.runId}`); throw new Error(`No browser ID available for run ${data.runId}`);
} }
// Find the recording
const recording = await Robot.findOne({ where: { 'recording_meta.id': plainRun.robotMetaId }, raw: true });
if (!recording) {
logger.log('error', `Recording for run ${data.runId} not found`);
const currentRun = await Run.findOne({ where: { runId: data.runId } });
if (currentRun && (currentRun.status !== 'aborted' && currentRun.status !== 'aborting')) {
await run.update({
status: 'failed',
finishedAt: new Date().toLocaleString(),
log: 'Failed: Recording not found',
});
// Trigger webhooks for run failure
const failedWebhookPayload = {
robot_id: plainRun.robotMetaId,
run_id: data.runId,
robot_name: 'Unknown Robot',
status: 'failed',
started_at: plainRun.startedAt,
finished_at: new Date().toLocaleString(),
error: {
message: "Failed: Recording not found",
type: 'RecordingNotFoundError'
},
metadata: {
browser_id: plainRun.browserId,
user_id: data.userId,
}
};
try {
await sendWebhook(plainRun.robotMetaId, 'run_failed', failedWebhookPayload);
logger.log('info', `Failure webhooks sent successfully for run ${data.runId}`);
} catch (webhookError: any) {
logger.log('error', `Failed to send failure webhooks for run ${data.runId}: ${webhookError.message}`);
}
}
return { success: false };
}
logger.log('info', `Looking for browser ${browserId} for run ${data.runId}`); logger.log('info', `Looking for browser ${browserId} for run ${data.runId}`);
// Get the browser and execute the run
let browser = browserPool.getRemoteBrowser(browserId); let browser = browserPool.getRemoteBrowser(browserId);
const browserWaitStart = Date.now(); const browserWaitStart = Date.now();
@@ -168,7 +254,14 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
logger.log('info', `Browser ${browserId} found and ready for execution`); logger.log('info', `Browser ${browserId} found and ready for execution`);
try { try {
// Find the recording
const recording = await Robot.findOne({ where: { 'recording_meta.id': plainRun.robotMetaId }, raw: true });
if (!recording) {
throw new Error(`Recording for run ${data.runId} not found`);
}
const isRunAborted = async (): Promise<boolean> => { const isRunAborted = async (): Promise<boolean> => {
const currentRun = await Run.findOne({ where: { runId: data.runId } }); const currentRun = await Run.findOne({ where: { runId: data.runId } });
return currentRun ? (currentRun.status === 'aborted' || currentRun.status === 'aborting') : false; return currentRun ? (currentRun.status === 'aborted' || currentRun.status === 'aborting') : false;
@@ -182,7 +275,7 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
await new Promise(resolve => setTimeout(resolve, 1000)); await new Promise(resolve => setTimeout(resolve, 1000));
currentPage = browser.getCurrentPage(); currentPage = browser.getCurrentPage();
} }
if (!currentPage) { if (!currentPage) {
throw new Error(`No current page available for browser ${browserId} after timeout`); throw new Error(`No current page available for browser ${browserId} after timeout`);
} }
@@ -200,25 +293,27 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
if (await isRunAborted()) { if (await isRunAborted()) {
logger.log('info', `Run ${data.runId} was aborted during execution, not updating status`); logger.log('info', `Run ${data.runId} was aborted during execution, not updating status`);
await destroyRemoteBrowser(plainRun.browserId, data.userId); await destroyRemoteBrowser(plainRun.browserId, data.userId);
return { success: true }; return { success: true };
} }
logger.log('info', `Workflow execution completed for run ${data.runId}`);
const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); const binaryOutputService = new BinaryOutputService('maxun-run-screenshots');
const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput); const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput);
if (await isRunAborted()) {
logger.log('info', `Run ${data.runId} was aborted while processing results, not updating status`);
return { success: true };
}
const categorizedOutput = { const categorizedOutput = {
scrapeSchema: interpretationInfo.scrapeSchemaOutput || {}, scrapeSchema: interpretationInfo.scrapeSchemaOutput || {},
scrapeList: interpretationInfo.scrapeListOutput || {} scrapeList: interpretationInfo.scrapeListOutput || {}
}; };
if (await isRunAborted()) {
logger.log('info', `Run ${data.runId} was aborted while processing results, not updating status`);
return { success: true };
}
await run.update({ await run.update({
...run, ...run,
status: 'success', status: 'success',
@@ -233,6 +328,7 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
}); });
// Track extraction metrics // Track extraction metrics
let totalDataPointsExtracted = 0;
let totalSchemaItemsExtracted = 0; let totalSchemaItemsExtracted = 0;
let totalListItemsExtracted = 0; let totalListItemsExtracted = 0;
let extractedScreenshotsCount = 0; let extractedScreenshotsCount = 0;
@@ -240,23 +336,35 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
if (categorizedOutput.scrapeSchema) { if (categorizedOutput.scrapeSchema) {
Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => { Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => {
if (Array.isArray(schemaResult)) { if (Array.isArray(schemaResult)) {
schemaResult.forEach(obj => {
if (obj && typeof obj === 'object') {
totalDataPointsExtracted += Object.keys(obj).length;
}
});
totalSchemaItemsExtracted += schemaResult.length; totalSchemaItemsExtracted += schemaResult.length;
} else if (schemaResult && typeof schemaResult === 'object') { } else if (schemaResult && typeof schemaResult === 'object') {
totalDataPointsExtracted += Object.keys(schemaResult).length;
totalSchemaItemsExtracted += 1; totalSchemaItemsExtracted += 1;
} }
}); });
} }
if (categorizedOutput.scrapeList) { if (categorizedOutput.scrapeList) {
Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => { Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => {
if (Array.isArray(listResult)) { if (Array.isArray(listResult)) {
listResult.forEach(obj => {
if (obj && typeof obj === 'object') {
totalDataPointsExtracted += Object.keys(obj).length;
}
});
totalListItemsExtracted += listResult.length; totalListItemsExtracted += listResult.length;
} }
}); });
} }
if (uploadedBinaryOutput) { if (uploadedBinaryOutput) {
extractedScreenshotsCount = Object.keys(uploadedBinaryOutput).length; extractedScreenshotsCount = Object.keys(uploadedBinaryOutput).length;
totalDataPointsExtracted += extractedScreenshotsCount;
} }
const totalRowsExtracted = totalSchemaItemsExtracted + totalListItemsExtracted; const totalRowsExtracted = totalSchemaItemsExtracted + totalListItemsExtracted;
@@ -265,6 +373,7 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
console.log(`Extracted List Items Count: ${totalListItemsExtracted}`); console.log(`Extracted List Items Count: ${totalListItemsExtracted}`);
console.log(`Extracted Screenshots Count: ${extractedScreenshotsCount}`); console.log(`Extracted Screenshots Count: ${extractedScreenshotsCount}`);
console.log(`Total Rows Extracted: ${totalRowsExtracted}`); console.log(`Total Rows Extracted: ${totalRowsExtracted}`);
console.log(`Total Data Points Extracted: ${totalDataPointsExtracted}`);
// Capture metrics // Capture metrics
capture( capture(
@@ -295,7 +404,8 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
total_rows: totalRowsExtracted, total_rows: totalRowsExtracted,
captured_texts_count: totalSchemaItemsExtracted, captured_texts_count: totalSchemaItemsExtracted,
captured_lists_count: totalListItemsExtracted, captured_lists_count: totalListItemsExtracted,
screenshots_count: extractedScreenshotsCount screenshots_count: extractedScreenshotsCount,
total_data_points_extracted: totalDataPointsExtracted,
}, },
metadata: { metadata: {
browser_id: plainRun.browserId, browser_id: plainRun.browserId,
@@ -311,111 +421,213 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
} }
// Schedule updates for Google Sheets and Airtable // Schedule updates for Google Sheets and Airtable
try { await triggerIntegrationUpdates(plainRun.runId, plainRun.robotMetaId);
googleSheetUpdateTasks[plainRun.runId] = {
robotId: plainRun.robotMetaId,
runId: plainRun.runId,
status: 'pending',
retries: 5,
};
airtableUpdateTasks[plainRun.runId] = { const completionData = {
robotId: plainRun.robotMetaId,
runId: plainRun.runId,
status: 'pending',
retries: 5,
};
processAirtableUpdates();
processGoogleSheetUpdates();
} catch (err: any) {
logger.log('error', `Failed to update Google Sheet for run: ${plainRun.runId}: ${err.message}`);
}
serverIo.of(plainRun.browserId).emit('run-completed', {
runId: data.runId, runId: data.runId,
robotMetaId: plainRun.robotMetaId, robotMetaId: plainRun.robotMetaId,
robotName: recording.recording_meta.name, robotName: recording.recording_meta.name,
status: 'success', status: 'success',
finishedAt: new Date().toLocaleString() finishedAt: new Date().toLocaleString()
}); };
await destroyRemoteBrowser(plainRun.browserId, data.userId); serverIo.of(browserId).emit('run-completed', completionData);
logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`); serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-completed', completionData);
await destroyRemoteBrowser(browserId, data.userId);
logger.log('info', `Browser ${browserId} destroyed after successful run ${data.runId}`);
return { success: true }; return { success: true };
} catch (executionError: any) { } catch (executionError: any) {
logger.log('error', `Run execution failed for run ${data.runId}: ${executionError.message}`); logger.log('error', `Run execution failed for run ${data.runId}: ${executionError.message}`);
const currentRun = await Run.findOne({ where: { runId: data.runId } }); let partialDataExtracted = false;
if (currentRun && (currentRun.status !== 'aborted' && currentRun.status !== 'aborting')) { let partialData: any = null;
await run.update({ let partialUpdateData: any = {
status: 'failed',
finishedAt: new Date().toLocaleString(),
log: `Failed: ${executionError.message}`,
};
try {
if (browser && browser.interpreter) {
const hasSchemaData = (browser.interpreter.serializableDataByType?.scrapeSchema ?? []).length > 0;
const hasListData = (browser.interpreter.serializableDataByType?.scrapeList ?? []).length > 0;
const hasBinaryData = (browser.interpreter.binaryData ?? []).length > 0;
if (hasSchemaData || hasListData || hasBinaryData) {
logger.log('info', `Extracting partial data from failed run ${data.runId}`);
partialData = await extractAndProcessScrapedData(browser, run);
partialUpdateData.serializableOutput = {
scrapeSchema: Object.values(partialData.categorizedOutput.scrapeSchema),
scrapeList: Object.values(partialData.categorizedOutput.scrapeList),
};
partialUpdateData.binaryOutput = partialData.uploadedBinaryOutput;
partialDataExtracted = true;
logger.log('info', `Partial data extracted for failed run ${data.runId}: ${partialData.totalDataPointsExtracted} data points`);
await triggerIntegrationUpdates(plainRun.runId, plainRun.robotMetaId);
}
}
} catch (partialDataError: any) {
logger.log('warn', `Failed to extract partial data for run ${data.runId}: ${partialDataError.message}`);
}
await run.update(partialUpdateData);
try {
const recording = await Robot.findOne({ where: { 'recording_meta.id': run.robotMetaId }, raw: true });
const failureData = {
runId: data.runId,
robotMetaId: plainRun.robotMetaId,
robotName: recording ? recording.recording_meta.name : 'Unknown Robot',
status: 'failed', status: 'failed',
finishedAt: new Date().toLocaleString(), finishedAt: new Date().toLocaleString(),
log: `Failed: ${executionError.message}`, hasPartialData: partialDataExtracted
});
// Capture failure metrics
capture(
'maxun-oss-run-created-manual',
{
runId: data.runId,
user_id: data.userId,
created_at: new Date().toISOString(),
status: 'failed',
error_message: executionError.message,
}
);
// Trigger webhooks for run failure
const failedWebhookPayload = {
robot_id: plainRun.robotMetaId,
run_id: data.runId,
robot_name: recording.recording_meta.name,
status: 'failed',
started_at: plainRun.startedAt,
finished_at: new Date().toLocaleString(),
error: {
message: executionError.message,
stack: executionError.stack,
type: executionError.name || 'ExecutionError'
},
metadata: {
browser_id: plainRun.browserId,
user_id: data.userId,
}
}; };
try { serverIo.of(browserId).emit('run-completed', failureData);
await sendWebhook(plainRun.robotMetaId, 'run_failed', failedWebhookPayload); serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-completed', failureData);
logger.log('info', `Failure webhooks sent successfully for run ${data.runId}`); } catch (emitError: any) {
} catch (webhookError: any) { logger.log('warn', `Failed to emit failure event: ${emitError.message}`);
logger.log('error', `Failed to send failure webhooks for run ${data.runId}: ${webhookError.message}`);
}
} else {
logger.log('info', `Run ${data.runId} was aborted, not updating status to failed`);
} }
await destroyRemoteBrowser(plainRun.browserId, data.userId); const recording = await Robot.findOne({ where: { 'recording_meta.id': run.robotMetaId }, raw: true });
return { success: false }; const failedWebhookPayload = {
robot_id: plainRun.robotMetaId,
run_id: data.runId,
robot_name: recording ? recording.recording_meta.name : 'Unknown Robot',
status: 'failed',
started_at: plainRun.startedAt,
finished_at: new Date().toLocaleString(),
error: {
message: executionError.message,
stack: executionError.stack,
type: 'ExecutionError',
},
partial_data_extracted: partialDataExtracted,
extracted_data: partialDataExtracted ? {
captured_texts: Object.values(partialUpdateData.serializableOutput?.scrapeSchema || []).flat() || [],
captured_lists: partialUpdateData.serializableOutput?.scrapeList || {},
total_data_points_extracted: partialData?.totalDataPointsExtracted || 0,
captured_texts_count: partialData?.totalSchemaItemsExtracted || 0,
captured_lists_count: partialData?.totalListItemsExtracted || 0,
screenshots_count: partialData?.extractedScreenshotsCount || 0
} : null,
metadata: {
browser_id: plainRun.browserId,
user_id: data.userId,
}
};
try {
await sendWebhook(plainRun.robotMetaId, 'run_failed', failedWebhookPayload);
logger.log('info', `Failure webhooks sent successfully for run ${data.runId}`);
} catch (webhookError: any) {
logger.log('error', `Failed to send failure webhooks for run ${data.runId}: ${webhookError.message}`);
}
try {
const failureSocketData = {
runId: data.runId,
robotMetaId: run.robotMetaId,
robotName: recording ? recording.recording_meta.name : 'Unknown Robot',
status: 'failed',
finishedAt: new Date().toLocaleString()
};
serverIo.of(run.browserId).emit('run-completed', failureSocketData);
serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-completed', failureSocketData);
} catch (socketError: any) {
logger.log('warn', `Failed to emit failure event in main catch: ${socketError.message}`);
}
capture('maxun-oss-run-created-manual', {
runId: data.runId,
user_id: data.userId,
created_at: new Date().toISOString(),
status: 'failed',
error_message: executionError.message,
partial_data_extracted: partialDataExtracted,
totalRowsExtracted: partialData?.totalSchemaItemsExtracted + partialData?.totalListItemsExtracted + partialData?.extractedScreenshotsCount || 0,
});
await destroyRemoteBrowser(browserId, data.userId);
logger.log('info', `Browser ${browserId} destroyed after failed run`);
return { success: false, partialDataExtracted };
} }
} catch (error: unknown) { } catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error); const errorMessage = error instanceof Error ? error.message : String(error);
logger.log('error', `Failed to process run execution job: ${errorMessage}`); logger.log('error', `Failed to process run execution job: ${errorMessage}`);
try {
const run = await Run.findOne({ where: { runId: data.runId }});
if (run) {
await run.update({
status: 'failed',
finishedAt: new Date().toLocaleString(),
log: `Failed: ${errorMessage}`,
});
const recording = await Robot.findOne({ where: { 'recording_meta.id': run.robotMetaId }, raw: true });
const failedWebhookPayload = {
robot_id: run.robotMetaId,
run_id: data.runId,
robot_name: recording ? recording.recording_meta.name : 'Unknown Robot',
status: 'failed',
started_at: run.startedAt,
finished_at: new Date().toLocaleString(),
error: {
message: errorMessage,
},
metadata: {
browser_id: run.browserId,
user_id: data.userId,
}
};
try {
await sendWebhook(run.robotMetaId, 'run_failed', failedWebhookPayload);
logger.log('info', `Failure webhooks sent successfully for run ${data.runId}`);
} catch (webhookError: any) {
logger.log('error', `Failed to send failure webhooks for run ${data.runId}: ${webhookError.message}`);
}
try {
const failureSocketData = {
runId: data.runId,
robotMetaId: run.robotMetaId,
robotName: recording ? recording.recording_meta.name : 'Unknown Robot',
status: 'failed',
finishedAt: new Date().toLocaleString()
};
serverIo.of(run.browserId).emit('run-completed', failureSocketData);
serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-completed', failureSocketData);
} catch (socketError: any) {
logger.log('warn', `Failed to emit failure event in main catch: ${socketError.message}`);
}
}
} catch (updateError: any) {
logger.log('error', `Failed to update run status: ${updateError.message}`);
}
return { success: false }; return { success: false };
} }
} }
async function abortRun(runId: string, userId: string): Promise<boolean> { async function abortRun(runId: string, userId: string): Promise<boolean> {
try { try {
const run = await Run.findOne({ const run = await Run.findOne({ where: { runId: runId } });
where: {
runId: runId,
runByUserId: userId
}
});
if (!run) { if (!run) {
logger.log('warn', `Run ${runId} not found or does not belong to user ${userId}`); logger.log('warn', `Run ${runId} not found or does not belong to user ${userId}`);
@@ -466,32 +678,9 @@ async function abortRun(runId: string, userId: string): Promise<boolean> {
} }
let currentLog = 'Run aborted by user'; let currentLog = 'Run aborted by user';
let categorizedOutput = { const extractedData = await extractAndProcessScrapedData(browser, run);
scrapeSchema: {},
scrapeList: {}, console.log(`Total Data Points Extracted in aborted run: ${extractedData.totalDataPointsExtracted}`);
};
let binaryOutput: Record<string, any> = {};
try {
if (browser.interpreter) {
if (browser.interpreter.debugMessages) {
currentLog = browser.interpreter.debugMessages.join('\n') || currentLog;
}
if (browser.interpreter.serializableDataByType) {
categorizedOutput = {
scrapeSchema: collectDataByType(browser.interpreter.serializableDataByType.scrapeSchema || []),
scrapeList: collectDataByType(browser.interpreter.serializableDataByType.scrapeList || []),
};
}
if (browser.interpreter.binaryData) {
binaryOutput = collectBinaryData(browser.interpreter.binaryData);
}
}
} catch (interpreterError) {
logger.log('warn', `Error collecting data from interpreter: ${interpreterError}`);
}
await run.update({ await run.update({
status: 'aborted', status: 'aborted',
@@ -499,12 +688,16 @@ async function abortRun(runId: string, userId: string): Promise<boolean> {
browserId: plainRun.browserId, browserId: plainRun.browserId,
log: currentLog, log: currentLog,
serializableOutput: { serializableOutput: {
scrapeSchema: Object.values(categorizedOutput.scrapeSchema), scrapeSchema: Object.values(extractedData.categorizedOutput.scrapeSchema),
scrapeList: Object.values(categorizedOutput.scrapeList), scrapeList: Object.values(extractedData.categorizedOutput.scrapeList),
}, },
binaryOutput, binaryOutput: extractedData.uploadedBinaryOutput,
}); });
if (extractedData.totalDataPointsExtracted > 0) {
await triggerIntegrationUpdates(runId, plainRun.robotMetaId);
}
try { try {
serverIo.of(plainRun.browserId).emit('run-aborted', { serverIo.of(plainRun.browserId).emit('run-aborted', {
runId, runId,
@@ -515,7 +708,7 @@ async function abortRun(runId: string, userId: string): Promise<boolean> {
} catch (socketError) { } catch (socketError) {
logger.log('warn', `Failed to emit run-aborted event: ${socketError}`); logger.log('warn', `Failed to emit run-aborted event: ${socketError}`);
} }
try { try {
await new Promise(resolve => setTimeout(resolve, 500)); await new Promise(resolve => setTimeout(resolve, 500));
@@ -533,30 +726,6 @@ async function abortRun(runId: string, userId: string): Promise<boolean> {
} }
} }
/**
* Helper function to collect data from arrays into indexed objects
* @param dataArray Array of data to be transformed into an object with indexed keys
* @returns Object with indexed keys
*/
function collectDataByType(dataArray: any[]): Record<string, any> {
return dataArray.reduce((result: Record<string, any>, item, index) => {
result[`item-${index}`] = item;
return result;
}, {});
}
/**
* Helper function to collect binary data (like screenshots)
* @param binaryDataArray Array of binary data objects to be transformed
* @returns Object with indexed keys
*/
function collectBinaryData(binaryDataArray: { mimetype: string, data: string, type?: string }[]): Record<string, any> {
return binaryDataArray.reduce((result: Record<string, any>, item, index) => {
result[`item-${index}`] = item;
return result;
}, {});
}
async function registerRunExecutionWorker() { async function registerRunExecutionWorker() {
try { try {
const registeredUserQueues = new Map(); const registeredUserQueues = new Map();

View File

@@ -952,21 +952,40 @@ router.delete('/schedule/:id', requireSignIn, async (req: AuthenticatedRequest,
*/ */
router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest, res) => { router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest, res) => {
try { try {
if (!req.user) { if (!req.user) { return res.status(401).send({ error: 'Unauthorized' }); }
return res.status(401).send({ error: 'Unauthorized' });
} const run = await Run.findOne({ where: { runId: req.params.id } });
const run = await Run.findOne({
where: {
runId: req.params.id,
runByUserId: req.user.id,
}
});
if (!run) { if (!run) {
return res.status(404).send({ error: 'Run not found' }); return res.status(404).send({ error: 'Run not found' });
} }
if (!['running', 'queued'].includes(run.status)) {
return res.status(400).send({
error: `Cannot abort run with status: ${run.status}`
});
}
const isQueued = run.status === 'queued';
await run.update({
status: 'aborting'
});
if (isQueued) {
await run.update({
status: 'aborted',
finishedAt: new Date().toLocaleString(),
log: 'Run aborted while queued'
});
return res.send({
success: true,
message: 'Queued run aborted',
isQueued: true
});
}
if (!['running', 'queued'].includes(run.status)) { if (!['running', 'queued'].includes(run.status)) {
return res.status(400).send({ return res.status(400).send({
error: `Cannot abort run with status: ${run.status}` error: `Cannot abort run with status: ${run.status}`
@@ -997,12 +1016,13 @@ router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest,
logger.log('info', `Abort signal sent for run ${req.params.id}, job ID: ${jobId}`); logger.log('info', `Abort signal sent for run ${req.params.id}, job ID: ${jobId}`);
return res.send({ return res.send({
success: true, success: true,
message: 'Abort signal sent', message: 'Abort signal sent',
jobId jobId,
}); isQueued: false
});
} catch (e) { } catch (e) {
const { message } = e as Error; const { message } = e as Error;
logger.log('error', `Error aborting run ${req.params.id}: ${message}`); logger.log('error', `Error aborting run ${req.params.id}: ${message}`);

View File

@@ -6,6 +6,7 @@ import logger from './logger';
import Robot from './models/Robot'; import Robot from './models/Robot';
import { handleRunRecording } from './workflow-management/scheduler'; import { handleRunRecording } from './workflow-management/scheduler';
import { computeNextRun } from './utils/schedule'; import { computeNextRun } from './utils/schedule';
import { v4 as uuid } from "uuid";
if (!process.env.DB_USER || !process.env.DB_PASSWORD || !process.env.DB_HOST || !process.env.DB_PORT || !process.env.DB_NAME) { if (!process.env.DB_USER || !process.env.DB_PASSWORD || !process.env.DB_HOST || !process.env.DB_PORT || !process.env.DB_NAME) {
throw new Error('One or more required environment variables are missing.'); throw new Error('One or more required environment variables are missing.');
@@ -32,7 +33,7 @@ interface ScheduledWorkflowData {
*/ */
export async function scheduleWorkflow(id: string, userId: string, cronExpression: string, timezone: string): Promise<void> { export async function scheduleWorkflow(id: string, userId: string, cronExpression: string, timezone: string): Promise<void> {
try { try {
const runId = require('uuidv4').uuid(); const runId = uuid();
const queueName = `scheduled-workflow-${id}`; const queueName = `scheduled-workflow-${id}`;

View File

@@ -107,6 +107,11 @@ export class WorkflowInterpreter {
*/ */
public binaryData: { mimetype: string, data: string }[] = []; public binaryData: { mimetype: string, data: string }[] = [];
/**
* Track current scrapeList index
*/
private currentScrapeListIndex: number = 0;
/** /**
* An array of id's of the pairs from the workflow that are about to be paused. * An array of id's of the pairs from the workflow that are about to be paused.
* As "breakpoints". * As "breakpoints".
@@ -288,6 +293,7 @@ export class WorkflowInterpreter {
scrapeList: [], scrapeList: [],
}; };
this.binaryData = []; this.binaryData = [];
this.currentScrapeListIndex = 0;
} }
/** /**
@@ -322,6 +328,9 @@ export class WorkflowInterpreter {
}, },
setActionType: (type: string) => { setActionType: (type: string) => {
this.currentActionType = type; this.currentActionType = type;
},
incrementScrapeListIndex: () => {
this.currentScrapeListIndex++;
} }
}, },
serializableCallback: (data: any) => { serializableCallback: (data: any) => {
@@ -334,7 +343,7 @@ export class WorkflowInterpreter {
this.serializableDataByType.scrapeSchema.push([data]); this.serializableDataByType.scrapeSchema.push([data]);
} }
} else if (this.currentActionType === 'scrapeList') { } else if (this.currentActionType === 'scrapeList') {
this.serializableDataByType.scrapeList.push(data); this.serializableDataByType.scrapeList[this.currentScrapeListIndex] = data;
} }
this.socket.emit('serializableCallback', data); this.socket.emit('serializableCallback', data);

View File

@@ -205,20 +205,24 @@ export const interpretStoredRecording = async (id: string): Promise<boolean> =>
} }
} }
export const notifyAboutAbort = async (id: string): Promise<boolean> => { export const notifyAboutAbort = async (id: string): Promise<{ success: boolean; isQueued?: boolean }> => {
try { try {
const response = await axios.post(`${apiUrl}/storage/runs/abort/${id}`); const response = await axios.post(`${apiUrl}/storage/runs/abort/${id}`, { withCredentials: true });
if (response.status === 200) { if (response.status === 200) {
return response.data; return {
success: response.data.success,
isQueued: response.data.isQueued
};
} else { } else {
throw new Error(`Couldn't abort a running recording with id ${id}`); throw new Error(`Couldn't abort a running recording with id ${id}`);
} }
} catch (error: any) { } catch (error: any) {
console.log(error); console.log(error);
return false; return { success: false };
} }
} }
export const scheduleStoredRecording = async (id: string, settings: ScheduleSettings): Promise<ScheduleRunResponse> => { export const scheduleStoredRecording = async (id: string, settings: ScheduleSettings): Promise<ScheduleRunResponse> => {
try { try {
const response = await axios.put( const response = await axios.put(

View File

@@ -52,16 +52,51 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps)
const navigate = useNavigate(); const navigate = useNavigate();
const abortRunHandler = (runId: string, robotName: string, browserId: string) => { const abortRunHandler = (runId: string, robotName: string, browserId: string) => {
notify('info', t('main_page.notifications.abort_initiated', { name: robotName }));
aborted = true; aborted = true;
notifyAboutAbort(runId).then(async (response) => { notifyAboutAbort(runId).then(async (response) => {
if (response) { if (!response.success) {
notify('success', t('main_page.notifications.abort_success', { name: robotName }));
await stopRecording(ids.browserId);
} else {
notify('error', t('main_page.notifications.abort_failed', { name: robotName })); notify('error', t('main_page.notifications.abort_failed', { name: robotName }));
setRerenderRuns(true);
return;
} }
setRerenderRuns(true);
}) if (response.isQueued) {
setRerenderRuns(true);
notify('success', t('main_page.notifications.abort_success', { name: robotName }));
setQueuedRuns(prev => {
const newSet = new Set(prev);
newSet.delete(runId);
return newSet;
});
return;
}
const abortSocket = io(`${apiUrl}/${browserId}`, {
transports: ["websocket"],
rejectUnauthorized: false
});
abortSocket.on('run-aborted', (abortData) => {
if (abortData.runId === runId) {
notify('success', t('main_page.notifications.abort_success', { name: abortData.robotName || robotName }));
setRerenderRuns(true);
abortSocket.disconnect();
}
});
abortSocket.on('connect_error', (error) => {
console.log('Abort socket connection error:', error);
notify('error', t('main_page.notifications.abort_failed', { name: robotName }));
setRerenderRuns(true);
abortSocket.disconnect();
});
});
} }
const setRecordingInfo = (id: string, name: string) => { const setRecordingInfo = (id: string, name: string) => {