feat: schedule routes using pgboss queue

This commit is contained in:
Rohit
2025-03-28 17:24:58 +05:30
parent d891aa14ac
commit 3977b6feb4

View File

@@ -22,7 +22,7 @@ import { encrypt, decrypt } from '../utils/auth';
import { WorkflowFile } from 'maxun-core';
import { Page } from 'playwright';
import { airtableUpdateTasks, processAirtableUpdates } from '../workflow-management/integrations/airtable';
import { pgBoss } from '../pgboss-worker';
import { cancelScheduledWorkflow, pgBoss, scheduleWorkflow } from '../pgboss-worker';
chromium.use(stealthPlugin());
export const router = Router();
@@ -792,17 +792,13 @@ router.put('/schedule/:id/', requireSignIn, async (req: AuthenticatedRequest, re
return res.status(401).json({ error: 'Unauthorized' });
}
// Create the job in the queue with the cron expression
const job = await workflowQueue.add(
'run workflow',
{ id, runId: uuid(), userId: req.user.id },
{
repeat: {
pattern: cronExpression,
tz: timezone,
},
}
);
try {
await cancelScheduledWorkflow(id);
} catch (cancelError) {
logger.log('warn', `Failed to cancel existing schedule for robot ${id}: ${cancelError}`);
}
const jobId = await scheduleWorkflow(id, req.user.id, cronExpression, timezone);
const nextRunAt = computeNextRun(cronExpression, timezone);
@@ -877,12 +873,12 @@ router.delete('/schedule/:id', requireSignIn, async (req: AuthenticatedRequest,
return res.status(404).json({ error: 'Robot not found' });
}
// Remove existing job from queue if it exists
const existingJobs = await workflowQueue.getJobs(['delayed', 'waiting']);
for (const job of existingJobs) {
if (job.data.id === id) {
await job.remove();
}
// Cancel the scheduled job in PgBoss
try {
await cancelScheduledWorkflow(id);
} catch (error) {
logger.log('error', `Error cancelling scheduled job for robot ${id}: ${error}`);
// Continue with robot update even if cancellation fails
}
// Delete the schedule from the robot