feat: register workers for queue

This commit is contained in:
Rohit Rajan
2025-09-10 10:18:35 +05:30
parent 2e2115ccc9
commit 5830982f33

View File

@@ -608,9 +608,52 @@ async function abortRun(runId: string, userId: string): Promise<boolean> {
}
}
// Track registered queues globally for individual queue registration
const registeredUserQueues = new Map();
const registeredAbortQueues = new Map();
async function registerWorkerForQueue(queueName: string) {
if (!registeredUserQueues.has(queueName)) {
await pgBoss.work(queueName, async (job: Job<ExecuteRunData> | Job<ExecuteRunData>[]) => {
try {
const singleJob = Array.isArray(job) ? job[0] : job;
return await processRunExecution(singleJob);
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.log('error', `Run execution job failed in ${queueName}: ${errorMessage}`);
throw error;
}
});
registeredUserQueues.set(queueName, true);
logger.log('info', `Registered worker for queue: ${queueName}`);
}
}
async function registerAbortWorkerForQueue(queueName: string) {
if (!registeredAbortQueues.has(queueName)) {
await pgBoss.work(queueName, async (job: Job<AbortRunData> | Job<AbortRunData>[]) => {
try {
const data = extractJobData(job);
const { userId, runId } = data;
logger.log('info', `Processing abort request for run ${runId} by user ${userId}`);
const success = await abortRun(runId, userId);
return { success };
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.log('error', `Abort run job failed in ${queueName}: ${errorMessage}`);
throw error;
}
});
registeredAbortQueues.set(queueName, true);
logger.log('info', `Registered abort worker for queue: ${queueName}`);
}
}
async function registerRunExecutionWorker() {
try {
const registeredUserQueues = new Map();
// Worker for executing runs (Legacy)
await pgBoss.work('execute-run', async (job: Job<ExecuteRunData> | Job<ExecuteRunData>[]) => {
@@ -826,4 +869,4 @@ process.on('SIGINT', async () => {
});
// For use in other files
export { pgBoss, startWorkers };
export { pgBoss, registerWorkerForQueue, registerAbortWorkerForQueue, startWorkers };