feat: add abort checks and update status
This commit is contained in:
@@ -180,6 +180,11 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
|
||||
return { success: false };
|
||||
}
|
||||
|
||||
if (run.status === 'aborted' || run.status === 'aborting') {
|
||||
logger.log('info', `Run ${data.runId} has status ${run.status}, skipping execution`);
|
||||
return { success: true };
|
||||
}
|
||||
|
||||
const plainRun = run.toJSON();
|
||||
|
||||
// Find the recording
|
||||
@@ -187,12 +192,14 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
|
||||
if (!recording) {
|
||||
logger.log('error', `Recording for run ${data.runId} not found`);
|
||||
|
||||
// Update run status to failed
|
||||
await run.update({
|
||||
status: 'failed',
|
||||
finishedAt: new Date().toLocaleString(),
|
||||
log: 'Failed: Recording not found',
|
||||
});
|
||||
const currentRun = await Run.findOne({ where: { runId: data.runId } });
|
||||
if (currentRun && (currentRun.status !== 'aborted' && currentRun.status !== 'aborting')) {
|
||||
await run.update({
|
||||
status: 'failed',
|
||||
finishedAt: new Date().toLocaleString(),
|
||||
log: 'Failed: Recording not found',
|
||||
});
|
||||
}
|
||||
|
||||
// Check for queued runs even if this one failed
|
||||
await checkAndProcessQueuedRun(data.userId, data.browserId);
|
||||
@@ -207,8 +214,6 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
|
||||
if (!browser || !currentPage) {
|
||||
logger.log('error', `Browser or page not available for run ${data.runId}`);
|
||||
|
||||
await pgBoss.fail(job.id, "Failed to get browser or page for run");
|
||||
|
||||
// Even if this run failed, check for queued runs
|
||||
await checkAndProcessQueuedRun(data.userId, data.browserId);
|
||||
|
||||
@@ -219,6 +224,11 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
|
||||
// Reset the browser state before executing this run
|
||||
await resetBrowserState(browser);
|
||||
|
||||
const isRunAborted = async (): Promise<boolean> => {
|
||||
const currentRun = await Run.findOne({ where: { runId: data.runId } });
|
||||
return currentRun ? (currentRun.status === 'aborted' || currentRun.status === 'aborting') : false;
|
||||
};
|
||||
|
||||
// Execute the workflow
|
||||
const workflow = AddGeneratedFlags(recording.recording);
|
||||
const interpretationInfo = await browser.interpreter.InterpretRecording(
|
||||
@@ -228,10 +238,28 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
|
||||
plainRun.interpreterSettings
|
||||
);
|
||||
|
||||
if (await isRunAborted()) {
|
||||
logger.log('info', `Run ${data.runId} was aborted during execution, not updating status`);
|
||||
|
||||
const queuedRunProcessed = await checkAndProcessQueuedRun(data.userId, plainRun.browserId);
|
||||
|
||||
if (!queuedRunProcessed) {
|
||||
await destroyRemoteBrowser(plainRun.browserId, data.userId);
|
||||
logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`);
|
||||
}
|
||||
|
||||
return { success: true };
|
||||
}
|
||||
|
||||
// Process the results
|
||||
const binaryOutputService = new BinaryOutputService('maxun-run-screenshots');
|
||||
const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput);
|
||||
|
||||
if (await isRunAborted()) {
|
||||
logger.log('info', `Run ${data.runId} was aborted while processing results, not updating status`);
|
||||
return { success: true };
|
||||
}
|
||||
|
||||
// Update the run record with results
|
||||
await run.update({
|
||||
...run,
|
||||
@@ -322,11 +350,28 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
|
||||
} catch (executionError: any) {
|
||||
logger.log('error', `Run execution failed for run ${data.runId}: ${executionError.message}`);
|
||||
|
||||
await run.update({
|
||||
status: 'failed',
|
||||
finishedAt: new Date().toLocaleString(),
|
||||
log: `Failed: ${executionError.message}`,
|
||||
});
|
||||
const currentRun = await Run.findOne({ where: { runId: data.runId } });
|
||||
if (currentRun && (currentRun.status !== 'aborted' && currentRun.status !== 'aborting')) {
|
||||
await run.update({
|
||||
status: 'failed',
|
||||
finishedAt: new Date().toLocaleString(),
|
||||
log: `Failed: ${executionError.message}`,
|
||||
});
|
||||
|
||||
// Capture failure metrics
|
||||
capture(
|
||||
'maxun-oss-run-created-manual',
|
||||
{
|
||||
runId: data.runId,
|
||||
user_id: data.userId,
|
||||
created_at: new Date().toISOString(),
|
||||
status: 'failed',
|
||||
error_message: executionError.message,
|
||||
}
|
||||
);
|
||||
} else {
|
||||
logger.log('info', `Run ${data.runId} was aborted, not updating status to failed`);
|
||||
}
|
||||
|
||||
// Check for queued runs before destroying the browser
|
||||
const queuedRunProcessed = await checkAndProcessQueuedRun(data.userId, plainRun.browserId);
|
||||
@@ -340,18 +385,6 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
|
||||
logger.log('warn', `Failed to clean up browser for failed run ${data.runId}: ${cleanupError.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
// Capture failure metrics
|
||||
capture(
|
||||
'maxun-oss-run-created-manual',
|
||||
{
|
||||
runId: data.runId,
|
||||
user_id: data.userId,
|
||||
created_at: new Date().toISOString(),
|
||||
status: 'failed',
|
||||
error_message: executionError.message,
|
||||
}
|
||||
);
|
||||
|
||||
return { success: false };
|
||||
}
|
||||
@@ -377,9 +410,26 @@ async function abortRun(runId: string, userId: string): Promise<boolean> {
|
||||
return false;
|
||||
}
|
||||
|
||||
await run.update({
|
||||
status: 'aborting'
|
||||
});
|
||||
|
||||
const plainRun = run.toJSON();
|
||||
|
||||
const browser = browserPool.getRemoteBrowser(plainRun.browserId);
|
||||
const recording = await Robot.findOne({
|
||||
where: { 'recording_meta.id': plainRun.robotMetaId },
|
||||
raw: true
|
||||
});
|
||||
|
||||
const robotName = recording?.recording_meta?.name || 'Unknown Robot';
|
||||
|
||||
let browser;
|
||||
try {
|
||||
browser = browserPool.getRemoteBrowser(plainRun.browserId);
|
||||
} catch (browserError) {
|
||||
logger.log('warn', `Could not get browser for run ${runId}: ${browserError}`);
|
||||
browser = null;
|
||||
}
|
||||
|
||||
if (!browser) {
|
||||
await run.update({
|
||||
@@ -388,36 +438,80 @@ async function abortRun(runId: string, userId: string): Promise<boolean> {
|
||||
log: 'Aborted: Browser not found or already closed'
|
||||
});
|
||||
|
||||
try {
|
||||
serverIo.of(plainRun.browserId).emit('run-aborted', {
|
||||
runId,
|
||||
robotName: robotName,
|
||||
status: 'aborted',
|
||||
finishedAt: new Date().toLocaleString()
|
||||
});
|
||||
} catch (socketError) {
|
||||
logger.log('warn', `Failed to emit run-aborted event: ${socketError}`);
|
||||
}
|
||||
|
||||
logger.log('warn', `Browser not found for run ${runId}`);
|
||||
return true;
|
||||
}
|
||||
|
||||
const currentLog = browser.interpreter.debugMessages.join('\n');
|
||||
const serializableOutput: Record<string, any> = {};
|
||||
browser.interpreter.serializableData.forEach((item, index) => {
|
||||
serializableOutput[`item-${index}`] = item;
|
||||
});
|
||||
let currentLog = 'Run aborted by user';
|
||||
let serializableOutput: Record<string, any> = {};
|
||||
let binaryOutput: Record<string, any> = {};
|
||||
|
||||
const binaryOutput: Record<string, any> = {};
|
||||
browser.interpreter.binaryData.forEach((item, index) => {
|
||||
binaryOutput[`item-${index}`] = item;
|
||||
});
|
||||
try {
|
||||
if (browser.interpreter) {
|
||||
if (browser.interpreter.debugMessages) {
|
||||
currentLog = browser.interpreter.debugMessages.join('\n') || currentLog;
|
||||
}
|
||||
|
||||
if (browser.interpreter.serializableData) {
|
||||
browser.interpreter.serializableData.forEach((item, index) => {
|
||||
serializableOutput[`item-${index}`] = item;
|
||||
});
|
||||
}
|
||||
|
||||
if (browser.interpreter.binaryData) {
|
||||
browser.interpreter.binaryData.forEach((item, index) => {
|
||||
binaryOutput[`item-${index}`] = item;
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (interpreterError) {
|
||||
logger.log('warn', `Error collecting data from interpreter: ${interpreterError}`);
|
||||
}
|
||||
|
||||
await run.update({
|
||||
status: 'aborted',
|
||||
finishedAt: new Date().toLocaleString(),
|
||||
browserId: plainRun.browserId,
|
||||
log: currentLog || 'Run aborted by user',
|
||||
log: currentLog,
|
||||
serializableOutput,
|
||||
binaryOutput,
|
||||
});
|
||||
|
||||
const queuedRunProcessed = await checkAndProcessQueuedRun(userId, plainRun.browserId);
|
||||
try {
|
||||
serverIo.of(plainRun.browserId).emit('run-aborted', {
|
||||
runId,
|
||||
robotName: robotName,
|
||||
status: 'aborted',
|
||||
finishedAt: new Date().toLocaleString()
|
||||
});
|
||||
} catch (socketError) {
|
||||
logger.log('warn', `Failed to emit run-aborted event: ${socketError}`);
|
||||
}
|
||||
|
||||
let queuedRunProcessed = false;
|
||||
try {
|
||||
queuedRunProcessed = await checkAndProcessQueuedRun(userId, plainRun.browserId);
|
||||
} catch (queueError) {
|
||||
logger.log('warn', `Error checking queued runs: ${queueError}`);
|
||||
}
|
||||
|
||||
if (!queuedRunProcessed) {
|
||||
try {
|
||||
await new Promise(resolve => setTimeout(resolve, 500));
|
||||
|
||||
await destroyRemoteBrowser(plainRun.browserId, userId);
|
||||
logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`);
|
||||
logger.log('info', `Browser ${plainRun.browserId} destroyed successfully after abort`);
|
||||
} catch (cleanupError) {
|
||||
logger.log('warn', `Failed to clean up browser for aborted run ${runId}: ${cleanupError}`);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user