feat: add queued runs execution logic
This commit is contained in:
@@ -575,11 +575,9 @@ router.put('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) =>
|
||||
await pgBoss.createQueue(userQueueName);
|
||||
|
||||
const jobId = await pgBoss.send(userQueueName, {
|
||||
emailId: req.user.email,
|
||||
userId: req.user.id,
|
||||
runId: runId,
|
||||
browserId: browserId,
|
||||
interpreterSettings: req.body
|
||||
});
|
||||
|
||||
logger.log('info', `Queued run execution job with ID: ${jobId} for run: ${runId}`);
|
||||
@@ -635,12 +633,7 @@ router.put('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) =>
|
||||
}
|
||||
} catch (e) {
|
||||
const { message } = e as Error;
|
||||
logger.log('error', `Error while creating a run with robot id: ${req.params.id} - ${message}`);
|
||||
|
||||
if (message.includes('invalid input syntax for type uuid')) {
|
||||
return res.status(400).send({ error: 'Invalid UUID format detected' });
|
||||
}
|
||||
|
||||
logger.log('error', `Error while creating a run with robot id: ${req.params.id} - ${message}`);
|
||||
return res.status(500).send({ error: 'Internal server error' });
|
||||
}
|
||||
});
|
||||
@@ -952,3 +945,73 @@ router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest,
|
||||
return res.send(false);
|
||||
}
|
||||
});
|
||||
|
||||
async function processQueuedRuns() {
|
||||
try {
|
||||
const queuedRun = await Run.findOne({
|
||||
where: { status: 'queued' },
|
||||
order: [['startedAt', 'ASC']]
|
||||
});
|
||||
|
||||
if (!queuedRun) return;
|
||||
|
||||
const userId = queuedRun.runByUserId;
|
||||
|
||||
const canCreateBrowser = await browserPool.hasAvailableBrowserSlots(userId, "run");
|
||||
|
||||
if (canCreateBrowser) {
|
||||
logger.log('info', `Processing queued run ${queuedRun.runId} for user ${userId}`);
|
||||
|
||||
const recording = await Robot.findOne({
|
||||
where: {
|
||||
'recording_meta.id': queuedRun.robotMetaId
|
||||
},
|
||||
raw: true
|
||||
});
|
||||
|
||||
if (!recording) {
|
||||
await queuedRun.update({
|
||||
status: 'failed',
|
||||
finishedAt: new Date().toLocaleString(),
|
||||
log: 'Recording not found'
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const newBrowserId = await createRemoteBrowserForRun(userId);
|
||||
|
||||
logger.log('info', `Created and initialized browser ${newBrowserId} for queued run ${queuedRun.runId}`);
|
||||
|
||||
await queuedRun.update({
|
||||
status: 'running',
|
||||
browserId: newBrowserId,
|
||||
log: 'Browser created and ready for execution'
|
||||
});
|
||||
|
||||
const userQueueName = `execute-run-user-${userId}`;
|
||||
await pgBoss.createQueue(userQueueName);
|
||||
|
||||
const jobId = await pgBoss.send(userQueueName, {
|
||||
userId: userId,
|
||||
runId: queuedRun.runId,
|
||||
browserId: newBrowserId,
|
||||
});
|
||||
|
||||
logger.log('info', `Queued execution for run ${queuedRun.runId} with ready browser ${newBrowserId}, job ID: ${jobId}`);
|
||||
|
||||
} catch (browserError: any) {
|
||||
logger.log('error', `Failed to create browser for queued run: ${browserError.message}`);
|
||||
await queuedRun.update({
|
||||
status: 'failed',
|
||||
finishedAt: new Date().toLocaleString(),
|
||||
log: `Failed to create browser: ${browserError.message}`
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (error: any) {
|
||||
logger.log('error', `Error processing queued runs: ${error.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
export { processQueuedRuns };
|
||||
|
||||
Reference in New Issue
Block a user