feat: revamp process run execution
This commit is contained in:
@@ -82,14 +82,13 @@ function AddGeneratedFlags(workflow: WorkflowFile) {
|
|||||||
return copy;
|
return copy;
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
|
||||||
* Modified processRunExecution function - only add browser reset
|
|
||||||
*/
|
|
||||||
async function processRunExecution(job: Job<ExecuteRunData>) {
|
async function processRunExecution(job: Job<ExecuteRunData>) {
|
||||||
try {
|
const BROWSER_INIT_TIMEOUT = 30000;
|
||||||
const data = job.data;
|
|
||||||
logger.log('info', `Processing run execution job for runId: ${data.runId}, browserId: ${data.browserId}`);
|
|
||||||
|
|
||||||
|
const data = job.data;
|
||||||
|
logger.log('info', `Processing run execution job for runId: ${data.runId}`);
|
||||||
|
|
||||||
|
try {
|
||||||
// Find the run
|
// Find the run
|
||||||
const run = await Run.findOne({ where: { runId: data.runId } });
|
const run = await Run.findOne({ where: { runId: data.runId } });
|
||||||
if (!run) {
|
if (!run) {
|
||||||
@@ -103,6 +102,11 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const plainRun = run.toJSON();
|
const plainRun = run.toJSON();
|
||||||
|
const browserId = data.browserId || plainRun.browserId;
|
||||||
|
|
||||||
|
if (!browserId) {
|
||||||
|
throw new Error(`No browser ID available for run ${data.runId}`);
|
||||||
|
}
|
||||||
|
|
||||||
// Find the recording
|
// Find the recording
|
||||||
const recording = await Robot.findOne({ where: { 'recording_meta.id': plainRun.robotMetaId }, raw: true });
|
const recording = await Robot.findOne({ where: { 'recording_meta.id': plainRun.robotMetaId }, raw: true });
|
||||||
@@ -146,22 +150,45 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
|
|||||||
return { success: false };
|
return { success: false };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.log('info', `Looking for browser ${browserId} for run ${data.runId}`);
|
||||||
|
|
||||||
// Get the browser and execute the run
|
// Get the browser and execute the run
|
||||||
const browser = browserPool.getRemoteBrowser(plainRun.browserId);
|
let browser = browserPool.getRemoteBrowser(browserId);
|
||||||
let currentPage = browser?.getCurrentPage();
|
const browserWaitStart = Date.now();
|
||||||
|
|
||||||
if (!browser || !currentPage) {
|
while (!browser && (Date.now() - browserWaitStart) < BROWSER_INIT_TIMEOUT) {
|
||||||
logger.log('error', `Browser or page not available for run ${data.runId}`);
|
logger.log('debug', `Browser ${browserId} not ready yet, waiting...`);
|
||||||
|
await new Promise(resolve => setTimeout(resolve, 1000));
|
||||||
return { success: false };
|
browser = browserPool.getRemoteBrowser(browserId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!browser) {
|
||||||
|
throw new Error(`Browser ${browserId} not found in pool after timeout`);
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.log('info', `Browser ${browserId} found and ready for execution`);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const isRunAborted = async (): Promise<boolean> => {
|
const isRunAborted = async (): Promise<boolean> => {
|
||||||
const currentRun = await Run.findOne({ where: { runId: data.runId } });
|
const currentRun = await Run.findOne({ where: { runId: data.runId } });
|
||||||
return currentRun ? (currentRun.status === 'aborted' || currentRun.status === 'aborting') : false;
|
return currentRun ? (currentRun.status === 'aborted' || currentRun.status === 'aborting') : false;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let currentPage = browser.getCurrentPage();
|
||||||
|
|
||||||
|
const pageWaitStart = Date.now();
|
||||||
|
while (!currentPage && (Date.now() - pageWaitStart) < 30000) {
|
||||||
|
logger.log('debug', `Page not ready for browser ${browserId}, waiting...`);
|
||||||
|
await new Promise(resolve => setTimeout(resolve, 1000));
|
||||||
|
currentPage = browser.getCurrentPage();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!currentPage) {
|
||||||
|
throw new Error(`No current page available for browser ${browserId} after timeout`);
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.log('info', `Starting workflow execution for run ${data.runId}`);
|
||||||
|
|
||||||
// Execute the workflow
|
// Execute the workflow
|
||||||
const workflow = AddGeneratedFlags(recording.recording);
|
const workflow = AddGeneratedFlags(recording.recording);
|
||||||
const interpretationInfo = await browser.interpreter.InterpretRecording(
|
const interpretationInfo = await browser.interpreter.InterpretRecording(
|
||||||
@@ -313,7 +340,6 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
|
|||||||
finishedAt: new Date().toLocaleString()
|
finishedAt: new Date().toLocaleString()
|
||||||
});
|
});
|
||||||
|
|
||||||
// Only destroy the browser if no queued run was found
|
|
||||||
await destroyRemoteBrowser(plainRun.browserId, data.userId);
|
await destroyRemoteBrowser(plainRun.browserId, data.userId);
|
||||||
logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`);
|
logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`);
|
||||||
|
|
||||||
@@ -370,12 +396,7 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
|
|||||||
logger.log('info', `Run ${data.runId} was aborted, not updating status to failed`);
|
logger.log('info', `Run ${data.runId} was aborted, not updating status to failed`);
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
await destroyRemoteBrowser(plainRun.browserId, data.userId);
|
||||||
await destroyRemoteBrowser(plainRun.browserId, data.userId);
|
|
||||||
logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`);
|
|
||||||
} catch (cleanupError: any) {
|
|
||||||
logger.log('warn', `Failed to clean up browser for failed run ${data.runId}: ${cleanupError.message}`);
|
|
||||||
}
|
|
||||||
|
|
||||||
return { success: false };
|
return { success: false };
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user