feat: rm logic to process queued runs
This commit is contained in:
@@ -82,52 +82,6 @@ function AddGeneratedFlags(workflow: WorkflowFile) {
|
|||||||
return copy;
|
return copy;
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
|
||||||
* Modified checkAndProcessQueuedRun function - only changes browser reset logic
|
|
||||||
*/
|
|
||||||
async function checkAndProcessQueuedRun(userId: string, browserId: string): Promise<boolean> {
|
|
||||||
try {
|
|
||||||
// Find the oldest queued run for this specific browser
|
|
||||||
const queuedRun = await Run.findOne({
|
|
||||||
where: {
|
|
||||||
browserId: browserId,
|
|
||||||
runByUserId: userId,
|
|
||||||
status: 'queued'
|
|
||||||
},
|
|
||||||
order: [['startedAt', 'ASC']]
|
|
||||||
});
|
|
||||||
|
|
||||||
if (!queuedRun) {
|
|
||||||
logger.log('info', `No queued runs found for browser ${browserId}`);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update the queued run to running status
|
|
||||||
await queuedRun.update({
|
|
||||||
status: 'running',
|
|
||||||
log: 'Run started - using browser from previous run'
|
|
||||||
});
|
|
||||||
|
|
||||||
// Use user-specific queue
|
|
||||||
const userQueueName = `execute-run-user-${userId}`;
|
|
||||||
|
|
||||||
// Schedule the run execution
|
|
||||||
await pgBoss.createQueue(userQueueName);
|
|
||||||
const executeJobId = await pgBoss.send(userQueueName, {
|
|
||||||
userId: userId,
|
|
||||||
runId: queuedRun.runId,
|
|
||||||
browserId: browserId
|
|
||||||
});
|
|
||||||
|
|
||||||
logger.log('info', `Scheduled queued run ${queuedRun.runId} to use browser ${browserId}, job ID: ${executeJobId}`);
|
|
||||||
return true;
|
|
||||||
} catch (error: unknown) {
|
|
||||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
|
||||||
logger.log('error', `Error checking for queued runs: ${errorMessage}`);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Modified processRunExecution function - only add browser reset
|
* Modified processRunExecution function - only add browser reset
|
||||||
*/
|
*/
|
||||||
@@ -189,9 +143,6 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check for queued runs even if this one failed
|
|
||||||
await checkAndProcessQueuedRun(data.userId, data.browserId);
|
|
||||||
|
|
||||||
return { success: false };
|
return { success: false };
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -202,9 +153,6 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
|
|||||||
if (!browser || !currentPage) {
|
if (!browser || !currentPage) {
|
||||||
logger.log('error', `Browser or page not available for run ${data.runId}`);
|
logger.log('error', `Browser or page not available for run ${data.runId}`);
|
||||||
|
|
||||||
// Even if this run failed, check for queued runs
|
|
||||||
await checkAndProcessQueuedRun(data.userId, data.browserId);
|
|
||||||
|
|
||||||
return { success: false };
|
return { success: false };
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -227,13 +175,6 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
|
|||||||
if (await isRunAborted()) {
|
if (await isRunAborted()) {
|
||||||
logger.log('info', `Run ${data.runId} was aborted during execution, not updating status`);
|
logger.log('info', `Run ${data.runId} was aborted during execution, not updating status`);
|
||||||
|
|
||||||
const queuedRunProcessed = await checkAndProcessQueuedRun(data.userId, plainRun.browserId);
|
|
||||||
|
|
||||||
if (!queuedRunProcessed) {
|
|
||||||
await destroyRemoteBrowser(plainRun.browserId, data.userId);
|
|
||||||
logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`);
|
|
||||||
}
|
|
||||||
|
|
||||||
return { success: true };
|
return { success: true };
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -371,15 +312,6 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
|
|||||||
finishedAt: new Date().toLocaleString()
|
finishedAt: new Date().toLocaleString()
|
||||||
});
|
});
|
||||||
|
|
||||||
// Check for and process queued runs before destroying the browser
|
|
||||||
const queuedRunProcessed = await checkAndProcessQueuedRun(data.userId, plainRun.browserId);
|
|
||||||
|
|
||||||
// Only destroy the browser if no queued run was found
|
|
||||||
if (!queuedRunProcessed) {
|
|
||||||
await destroyRemoteBrowser(plainRun.browserId, data.userId);
|
|
||||||
logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`);
|
|
||||||
}
|
|
||||||
|
|
||||||
return { success: true };
|
return { success: true };
|
||||||
} catch (executionError: any) {
|
} catch (executionError: any) {
|
||||||
logger.log('error', `Run execution failed for run ${data.runId}: ${executionError.message}`);
|
logger.log('error', `Run execution failed for run ${data.runId}: ${executionError.message}`);
|
||||||
@@ -433,19 +365,6 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
|
|||||||
logger.log('info', `Run ${data.runId} was aborted, not updating status to failed`);
|
logger.log('info', `Run ${data.runId} was aborted, not updating status to failed`);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check for queued runs before destroying the browser
|
|
||||||
const queuedRunProcessed = await checkAndProcessQueuedRun(data.userId, plainRun.browserId);
|
|
||||||
|
|
||||||
// Only destroy the browser if no queued run was found
|
|
||||||
if (!queuedRunProcessed) {
|
|
||||||
try {
|
|
||||||
await destroyRemoteBrowser(plainRun.browserId, data.userId);
|
|
||||||
logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`);
|
|
||||||
} catch (cleanupError: any) {
|
|
||||||
logger.log('warn', `Failed to clean up browser for failed run ${data.runId}: ${cleanupError.message}`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return { success: false };
|
return { success: false };
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -563,25 +482,7 @@ async function abortRun(runId: string, userId: string): Promise<boolean> {
|
|||||||
} catch (socketError) {
|
} catch (socketError) {
|
||||||
logger.log('warn', `Failed to emit run-aborted event: ${socketError}`);
|
logger.log('warn', `Failed to emit run-aborted event: ${socketError}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
let queuedRunProcessed = false;
|
|
||||||
try {
|
|
||||||
queuedRunProcessed = await checkAndProcessQueuedRun(userId, plainRun.browserId);
|
|
||||||
} catch (queueError) {
|
|
||||||
logger.log('warn', `Error checking queued runs: ${queueError}`);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!queuedRunProcessed) {
|
|
||||||
try {
|
|
||||||
await new Promise(resolve => setTimeout(resolve, 500));
|
|
||||||
|
|
||||||
await destroyRemoteBrowser(plainRun.browserId, userId);
|
|
||||||
logger.log('info', `Browser ${plainRun.browserId} destroyed successfully after abort`);
|
|
||||||
} catch (cleanupError) {
|
|
||||||
logger.log('warn', `Failed to clean up browser for aborted run ${runId}: ${cleanupError}`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||||
|
|||||||
Reference in New Issue
Block a user