diff --git a/server/src/routes/storage.ts b/server/src/routes/storage.ts index f59c250d..235e5f74 100644 --- a/server/src/routes/storage.ts +++ b/server/src/routes/storage.ts @@ -15,8 +15,8 @@ import { computeNextRun } from '../utils/schedule'; import { capture } from "../utils/analytics"; import { encrypt, decrypt } from '../utils/auth'; import { WorkflowFile } from 'maxun-core'; -import { cancelScheduledWorkflow, scheduleWorkflow } from '../schedule-worker'; -import { pgBoss, registerWorkerForQueue, registerAbortWorkerForQueue } from '../pgboss-worker'; +import { cancelScheduledWorkflow, scheduleWorkflow } from '../storage/schedule'; +import { pgBossClient } from '../storage/pgboss'; chromium.use(stealthPlugin()); export const router = Router(); @@ -590,7 +590,7 @@ router.delete('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) * PUT endpoint for starting a remote browser instance and saving run metadata to the storage. * Making it ready for interpretation and returning a runId. * - * If the user has reached their browser limit, the run will be queued using PgBoss. + * If the user has reached their browser limit, the run will be queued using pgBossClient. */ router.put('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) => { try { @@ -664,10 +664,9 @@ router.put('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) => try { const userQueueName = `execute-run-user-${req.user.id}`; - await pgBoss.createQueue(userQueueName); - await registerWorkerForQueue(userQueueName); - - const jobId = await pgBoss.send(userQueueName, { + await pgBossClient.createQueue(userQueueName); + + const jobId = await pgBossClient.send(userQueueName, { userId: req.user.id, runId: runId, browserId: browserId, @@ -782,10 +781,9 @@ router.post('/runs/run/:id', requireSignIn, async (req: AuthenticatedRequest, re const userQueueName = `execute-run-user-${req.user.id}`; // Queue the execution job - await pgBoss.createQueue(userQueueName); - await registerWorkerForQueue(userQueueName); + await pgBossClient.createQueue(userQueueName); - const jobId = await pgBoss.send(userQueueName, { + const jobId = await pgBossClient.send(userQueueName, { userId: req.user.id, runId: req.params.id, browserId: plainRun.browserId @@ -974,7 +972,7 @@ router.delete('/schedule/:id', requireSignIn, async (req: AuthenticatedRequest, return res.status(404).json({ error: 'Robot not found' }); } - // Cancel the scheduled job in PgBoss + // Cancel the scheduled job in pgBossClient try { await cancelScheduledWorkflow(id); } catch (error) { @@ -1055,10 +1053,9 @@ router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest, } const userQueueName = `abort-run-user-${req.user.id}`; - await pgBoss.createQueue(userQueueName); - await registerAbortWorkerForQueue(userQueueName); - - const jobId = await pgBoss.send(userQueueName, { + await pgBossClient.createQueue(userQueueName); + + const jobId = await pgBossClient.send(userQueueName, { userId: req.user.id, runId: req.params.id }); @@ -1123,10 +1120,9 @@ async function processQueuedRuns() { }); const userQueueName = `execute-run-user-${userId}`; - await pgBoss.createQueue(userQueueName); - await registerWorkerForQueue(userQueueName); + await pgBossClient.createQueue(userQueueName); - const jobId = await pgBoss.send(userQueueName, { + const jobId = await pgBossClient.send(userQueueName, { userId: userId, runId: queuedRun.runId, browserId: newBrowserId,