feat: user specific queue run execution

This commit is contained in:
Rohit
2025-03-12 23:09:48 +05:30
parent 5657abb21e
commit d6b75757d1

View File

@@ -117,6 +117,7 @@ async function checkAndProcessQueuedRun(userId: string, browserId: string): Prom
const queuedRun = await Run.findOne({
where: {
browserId: browserId,
runByUserId: userId,
status: 'queued'
},
order: [['startedAt', 'ASC']]
@@ -140,9 +141,12 @@ async function checkAndProcessQueuedRun(userId: string, browserId: string): Prom
log: 'Run started - using browser from previous run'
});
// Use user-specific queue
const userQueueName = `execute-run-user-${userId}`;
// Schedule the run execution
await pgBoss.createQueue('execute-run');
const executeJobId = await pgBoss.send('execute-run', {
await pgBoss.createQueue(userQueueName);
const executeJobId = await pgBoss.send(userQueueName, {
userId: userId,
runId: queuedRun.runId,
browserId: browserId
@@ -358,7 +362,9 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
async function registerRunExecutionWorker() {
try {
// Worker for executing runs
const registeredUserQueues = new Map();
// Worker for executing runs (Legacy)
await pgBoss.work('execute-run', async (job: Job<ExecuteRunData> | Job<ExecuteRunData>[]) => {
try {
const singleJob = Array.isArray(job) ? job[0] : job;
@@ -370,7 +376,36 @@ async function registerRunExecutionWorker() {
}
});
// setInterval(checkForStuckQueuedRuns, 30000);
const checkForNewUserQueues = async () => {
try {
const activeQueues = await pgBoss.getQueues();
const userQueues = activeQueues.filter(q => q.name.startsWith('execute-run-user-'));
for (const queue of userQueues) {
if (!registeredUserQueues.has(queue.name)) {
await pgBoss.work(queue.name, async (job: Job<ExecuteRunData> | Job<ExecuteRunData>[]) => {
try {
const singleJob = Array.isArray(job) ? job[0] : job;
return await processRunExecution(singleJob);
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.log('error', `Run execution job failed in ${queue.name}: ${errorMessage}`);
throw error;
}
});
registeredUserQueues.set(queue.name, true);
logger.log('info', `Registered worker for queue: ${queue.name}`);
}
}
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.log('error', `Failed to check for new user queues: ${errorMessage}`);
}
};
await checkForNewUserQueues();
logger.log('info', 'Run execution worker registered successfully');
} catch (error: unknown) {