diff --git a/server/src/routes/storage.ts b/server/src/routes/storage.ts index 3b68ec22..bc33f3dc 100644 --- a/server/src/routes/storage.ts +++ b/server/src/routes/storage.ts @@ -17,7 +17,7 @@ import { capture } from "../utils/analytics"; import { encrypt, decrypt } from '../utils/auth'; import { WorkflowFile } from 'maxun-core'; import { cancelScheduledWorkflow, scheduleWorkflow } from '../schedule-worker'; -import { pgBoss } from '../pgboss-worker'; +import { pgBoss, registerWorkerForQueue, registerAbortWorkerForQueue } from '../pgboss-worker'; chromium.use(stealthPlugin()); export const router = Router(); @@ -573,6 +573,7 @@ router.put('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) => try { const userQueueName = `execute-run-user-${req.user.id}`; await pgBoss.createQueue(userQueueName); + await registerWorkerForQueue(userQueueName); const jobId = await pgBoss.send(userQueueName, { userId: req.user.id, @@ -690,6 +691,7 @@ router.post('/runs/run/:id', requireSignIn, async (req: AuthenticatedRequest, re // Queue the execution job await pgBoss.createQueue(userQueueName); + await registerWorkerForQueue(userQueueName); const jobId = await pgBoss.send(userQueueName, { userId: req.user.id, @@ -962,6 +964,7 @@ router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest, const userQueueName = `abort-run-user-${req.user.id}`; await pgBoss.createQueue(userQueueName); + await registerAbortWorkerForQueue(userQueueName); const jobId = await pgBoss.send(userQueueName, { userId: req.user.id, @@ -1029,6 +1032,7 @@ async function processQueuedRuns() { const userQueueName = `execute-run-user-${userId}`; await pgBoss.createQueue(userQueueName); + await registerWorkerForQueue(userQueueName); const jobId = await pgBoss.send(userQueueName, { userId: userId,