Merge pull request #474 from getmaxun/urun-queue
feat: unique queue robot run execution for users
This commit is contained in:
@@ -117,6 +117,7 @@ async function checkAndProcessQueuedRun(userId: string, browserId: string): Prom
|
||||
const queuedRun = await Run.findOne({
|
||||
where: {
|
||||
browserId: browserId,
|
||||
runByUserId: userId,
|
||||
status: 'queued'
|
||||
},
|
||||
order: [['startedAt', 'ASC']]
|
||||
@@ -140,9 +141,12 @@ async function checkAndProcessQueuedRun(userId: string, browserId: string): Prom
|
||||
log: 'Run started - using browser from previous run'
|
||||
});
|
||||
|
||||
// Use user-specific queue
|
||||
const userQueueName = `execute-run-user-${userId}`;
|
||||
|
||||
// Schedule the run execution
|
||||
await pgBoss.createQueue('execute-run');
|
||||
const executeJobId = await pgBoss.send('execute-run', {
|
||||
await pgBoss.createQueue(userQueueName);
|
||||
const executeJobId = await pgBoss.send(userQueueName, {
|
||||
userId: userId,
|
||||
runId: queuedRun.runId,
|
||||
browserId: browserId
|
||||
@@ -358,7 +362,9 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
|
||||
|
||||
async function registerRunExecutionWorker() {
|
||||
try {
|
||||
// Worker for executing runs
|
||||
const registeredUserQueues = new Map();
|
||||
|
||||
// Worker for executing runs (Legacy)
|
||||
await pgBoss.work('execute-run', async (job: Job<ExecuteRunData> | Job<ExecuteRunData>[]) => {
|
||||
try {
|
||||
const singleJob = Array.isArray(job) ? job[0] : job;
|
||||
@@ -370,7 +376,36 @@ async function registerRunExecutionWorker() {
|
||||
}
|
||||
});
|
||||
|
||||
// setInterval(checkForStuckQueuedRuns, 30000);
|
||||
const checkForNewUserQueues = async () => {
|
||||
try {
|
||||
const activeQueues = await pgBoss.getQueues();
|
||||
|
||||
const userQueues = activeQueues.filter(q => q.name.startsWith('execute-run-user-'));
|
||||
|
||||
for (const queue of userQueues) {
|
||||
if (!registeredUserQueues.has(queue.name)) {
|
||||
await pgBoss.work(queue.name, async (job: Job<ExecuteRunData> | Job<ExecuteRunData>[]) => {
|
||||
try {
|
||||
const singleJob = Array.isArray(job) ? job[0] : job;
|
||||
return await processRunExecution(singleJob);
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
logger.log('error', `Run execution job failed in ${queue.name}: ${errorMessage}`);
|
||||
throw error;
|
||||
}
|
||||
});
|
||||
|
||||
registeredUserQueues.set(queue.name, true);
|
||||
logger.log('info', `Registered worker for queue: ${queue.name}`);
|
||||
}
|
||||
}
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
logger.log('error', `Failed to check for new user queues: ${errorMessage}`);
|
||||
}
|
||||
};
|
||||
|
||||
await checkForNewUserQueues();
|
||||
|
||||
logger.log('info', 'Run execution worker registered successfully');
|
||||
} catch (error: unknown) {
|
||||
|
||||
@@ -658,10 +658,12 @@ router.post('/runs/run/:id', requireSignIn, async (req: AuthenticatedRequest, re
|
||||
}
|
||||
|
||||
try {
|
||||
const userQueueName = `execute-run-user-${req.user.id}`;
|
||||
|
||||
// Queue the execution job
|
||||
await pgBoss.createQueue('execute-run');
|
||||
await pgBoss.createQueue(userQueueName);
|
||||
|
||||
const jobId = await pgBoss.send('execute-run', {
|
||||
const jobId = await pgBoss.send(userQueueName, {
|
||||
userId: req.user.id,
|
||||
runId: req.params.id,
|
||||
browserId: plainRun.browserId
|
||||
|
||||
Reference in New Issue
Block a user