diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index 8373b680..190afdad 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -192,6 +192,8 @@ export async function scheduleWorkflow(id: string, userId: string, cronExpressio { id, runId, userId }, { tz: timezone } ); + + await registerWorkerForQueue(queueName); logger.log('info', `Scheduled workflow job for robot ${id}`); } catch (error: unknown) { @@ -476,28 +478,10 @@ async function processScheduledWorkflow(job: Job) { */ async function registerScheduledWorkflowWorker() { try { - // First, get a list of all existing robots - const robots = await Robot.findAll({ - attributes: ['recording_meta.id'], - raw: true - }); - - // Register a worker for each potential robot queue - for (const robot of robots) { - if (robot.recording_meta && robot.recording_meta.id) { - const queueName = `scheduled-workflow-${robot.recording_meta.id}`; - await registerWorkerForQueue(queueName); - } - } - - // Also register workers for any existing PgBoss queues that follow our naming pattern - const queues = await pgBoss.getQueues(); - for (const queue of queues) { - if (queue.name.startsWith('scheduled-workflow-') && - !queue.name.endsWith('_error') && - !queue.name.endsWith('_completed')) { - await registerWorkerForQueue(queue.name); - } + const jobs = await pgBoss.getSchedules(); + for (const job of jobs) { + await pgBoss.createQueue(job.name); + await registerWorkerForQueue(job.name); } logger.log('info', 'Scheduled workflow workers registered successfully');