feat: register worker for queue

This commit is contained in:
Rohit
2025-03-30 00:22:40 +05:30
parent d13e9d56ce
commit b2cd7c7bc2

View File

@@ -192,6 +192,8 @@ export async function scheduleWorkflow(id: string, userId: string, cronExpressio
{ id, runId, userId }, { id, runId, userId },
{ tz: timezone } { tz: timezone }
); );
await registerWorkerForQueue(queueName);
logger.log('info', `Scheduled workflow job for robot ${id}`); logger.log('info', `Scheduled workflow job for robot ${id}`);
} catch (error: unknown) { } catch (error: unknown) {
@@ -476,28 +478,10 @@ async function processScheduledWorkflow(job: Job<ScheduledWorkflowData>) {
*/ */
async function registerScheduledWorkflowWorker() { async function registerScheduledWorkflowWorker() {
try { try {
// First, get a list of all existing robots const jobs = await pgBoss.getSchedules();
const robots = await Robot.findAll({ for (const job of jobs) {
attributes: ['recording_meta.id'], await pgBoss.createQueue(job.name);
raw: true await registerWorkerForQueue(job.name);
});
// Register a worker for each potential robot queue
for (const robot of robots) {
if (robot.recording_meta && robot.recording_meta.id) {
const queueName = `scheduled-workflow-${robot.recording_meta.id}`;
await registerWorkerForQueue(queueName);
}
}
// Also register workers for any existing PgBoss queues that follow our naming pattern
const queues = await pgBoss.getQueues();
for (const queue of queues) {
if (queue.name.startsWith('scheduled-workflow-') &&
!queue.name.endsWith('_error') &&
!queue.name.endsWith('_completed')) {
await registerWorkerForQueue(queue.name);
}
} }
logger.log('info', 'Scheduled workflow workers registered successfully'); logger.log('info', 'Scheduled workflow workers registered successfully');