diff --git a/server/src/routes/storage.ts b/server/src/routes/storage.ts index af7850f0..603cde14 100644 --- a/server/src/routes/storage.ts +++ b/server/src/routes/storage.ts @@ -22,7 +22,7 @@ import { encrypt, decrypt } from '../utils/auth'; import { WorkflowFile } from 'maxun-core'; import { Page } from 'playwright'; import { airtableUpdateTasks, processAirtableUpdates } from '../workflow-management/integrations/airtable'; -import { pgBoss } from '../pgboss-worker'; +import { cancelScheduledWorkflow, pgBoss, scheduleWorkflow } from '../pgboss-worker'; chromium.use(stealthPlugin()); export const router = Router(); @@ -792,17 +792,13 @@ router.put('/schedule/:id/', requireSignIn, async (req: AuthenticatedRequest, re return res.status(401).json({ error: 'Unauthorized' }); } - // Create the job in the queue with the cron expression - const job = await workflowQueue.add( - 'run workflow', - { id, runId: uuid(), userId: req.user.id }, - { - repeat: { - pattern: cronExpression, - tz: timezone, - }, - } - ); + try { + await cancelScheduledWorkflow(id); + } catch (cancelError) { + logger.log('warn', `Failed to cancel existing schedule for robot ${id}: ${cancelError}`); + } + + const jobId = await scheduleWorkflow(id, req.user.id, cronExpression, timezone); const nextRunAt = computeNextRun(cronExpression, timezone); @@ -877,12 +873,12 @@ router.delete('/schedule/:id', requireSignIn, async (req: AuthenticatedRequest, return res.status(404).json({ error: 'Robot not found' }); } - // Remove existing job from queue if it exists - const existingJobs = await workflowQueue.getJobs(['delayed', 'waiting']); - for (const job of existingJobs) { - if (job.data.id === id) { - await job.remove(); - } + // Cancel the scheduled job in PgBoss + try { + await cancelScheduledWorkflow(id); + } catch (error) { + logger.log('error', `Error cancelling scheduled job for robot ${id}: ${error}`); + // Continue with robot update even if cancellation fails } // Delete the schedule from the robot