From 28b3f650d6812c8eb906d31fdf19e917a2855791 Mon Sep 17 00:00:00 2001
From: Rohit
Date: Sat, 29 Mar 2025 18:30:11 +0530
Subject: [PATCH] feat: add scheduler pgboss worker functions

---
 server/src/pgboss-worker.ts | 200 ++++++++++++++++++++++++++++++++++++
 1 file changed, 200 insertions(+)

diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts
index b775c99c..542684b9 100644
--- a/server/src/pgboss-worker.ts
+++ b/server/src/pgboss-worker.ts
@@ -21,9 +21,13 @@ import { googleSheetUpdateTasks, processGoogleSheetUpdates } from './workflow-ma
 import { airtableUpdateTasks, processAirtableUpdates } from './workflow-management/integrations/airtable';
 import { RemoteBrowser } from './browser-management/classes/RemoteBrowser';
 import { io as serverIo } from "./server";
+import { computeNextRun } from './utils/schedule';
+import { handleRunRecording } from './workflow-management/scheduler';
 
 const pgBossConnectionString = `postgres://${process.env.DB_USER}:${process.env.DB_PASSWORD}@${process.env.DB_HOST}:${process.env.DB_PORT}/${process.env.DB_NAME}`;
 
+const registeredQueues = new Set<string>();
+
 interface InitializeBrowserData {
   userId: string;
 }
@@ -41,6 +45,12 @@ interface DestroyBrowserData {
   userId: string;
 }
 
+interface ScheduledWorkflowData {
+  id: string;
+  runId: string;
+  userId: string;
+}
+
 interface ExecuteRunData {
   userId: string;
   runId: string;
@@ -161,6 +171,77 @@ async function checkAndProcessQueuedRun(userId: string, browserId: string): Prom
   }
 }
 
+/**
+ * Utility function to schedule a cron job using PgBoss
+ * @param id The robot ID
+ * @param userId The user ID
+ * @param cronExpression The cron expression for scheduling
+ * @param timezone The timezone for the cron expression
+ * @throws Rethrows any error raised while creating the queue or schedule
+ */
+export async function scheduleWorkflow(id: string, userId: string, cronExpression: string, timezone: string): Promise<void> {
+  try {
+    // Generated once and stored in the schedule payload, so every firing of
+    // this schedule carries the same runId; the job handler only logs it.
+    const runId = require('uuidv4').uuid();
+
+    const queueName = `scheduled-workflow-${id}`;
+
+    logger.log('info', `Scheduling workflow ${id} with cron expression ${cronExpression} in timezone ${timezone}`);
+
+    await pgBoss.createQueue(queueName);
+
+    await pgBoss.schedule(queueName, cronExpression,
+      { id, runId, userId },
+      { tz: timezone }
+    );
+
+    // Workers are otherwise only attached at startup, so a robot scheduled
+    // afterwards would have no consumer for its queue until the next restart.
+    await registerWorkerForQueue(queueName);
+
+    logger.log('info', `Scheduled workflow job for robot ${id}`);
+  } catch (error: unknown) {
+    const errorMessage = error instanceof Error ? error.message : String(error);
+    logger.log('error', `Failed to schedule workflow: ${errorMessage}`);
+    throw error;
+  }
+}
+
+/**
+ * Utility function to cancel a scheduled job
+ * @param robotId The robot ID
+ * @returns true if successful
+ * @throws Rethrows any error raised while listing or removing schedules
+ */
+export async function cancelScheduledWorkflow(robotId: string) {
+  try {
+    const jobs = await pgBoss.getSchedules();
+
+    const matchingJobs = jobs.filter((job: any) => {
+      try {
+        // The payload may already be a parsed object; JSON.parse on an object
+        // throws, which would make every schedule look like a non-match.
+        const data = typeof job.data === 'string' ? JSON.parse(job.data) : job.data;
+        return data && data.id === robotId;
+      } catch {
+        return false;
+      }
+    });
+
+    for (const job of matchingJobs) {
+      logger.log('info', `Cancelling scheduled job ${job.name} for robot ${robotId}`);
+      await pgBoss.unschedule(job.name);
+    }
+
+    return true;
+  } catch (error: unknown) {
+    const errorMessage = error instanceof Error ? error.message : String(error);
+    logger.log('error', `Failed to cancel scheduled workflow: ${errorMessage}`);
+    throw error;
+  }
+}
+
 /**
  * Modified processRunExecution function - only add browser reset
  */
@@ -359,6 +440,118 @@ async function processRunExecution(job: Job) {
   }
 }
 
+/**
+ * Process a scheduled workflow job
+ * @param job The PgBoss job carrying the robot ID, run ID and user ID
+ * @returns `{ success: true }`, or `{ success: false }` if an error was thrown;
+ *          errors are logged here and never propagate to the caller
+ */
+async function processScheduledWorkflow(job: Job<ScheduledWorkflowData>) {
+  const { id, runId, userId } = job.data;
+  logger.log('info', `Processing scheduled workflow job for robotId: ${id}, runId: ${runId}, userId: ${userId}`);
+
+  try {
+    // Execute the workflow using the existing handleRunRecording function
+    await handleRunRecording(id, userId);
+
+    // Update the robot's schedule with last run and next run times
+    const robot = await Robot.findOne({ where: { 'recording_meta.id': id } });
+    if (robot && robot.schedule && robot.schedule.cronExpression && robot.schedule.timezone) {
+      // Update lastRunAt to the current time
+      const lastRunAt = new Date();
+
+      // Compute the next run date
+      const nextRunAt = computeNextRun(robot.schedule.cronExpression, robot.schedule.timezone) || undefined;
+
+      await robot.update({
+        schedule: {
+          ...robot.schedule,
+          lastRunAt,
+          nextRunAt,
+        },
+      });
+
+      logger.log('info', `Updated robot ${id} schedule - next run at: ${nextRunAt}`);
+    } else {
+      logger.log('error', `Robot ${id} schedule, cronExpression, or timezone is missing.`);
+    }
+
+    return { success: true };
+  } catch (error: unknown) {
+    const errorMessage = error instanceof Error ? error.message : String(error);
+    logger.log('error', `Scheduled workflow job failed: ${errorMessage}`);
+    return { success: false };
+  }
+}
+
+/**
+ * Register a worker to handle scheduled workflow jobs
+ *
+ * Covers one queue per known robot plus any existing `scheduled-workflow-*`
+ * queue. Failures are logged and not rethrown.
+ */
+async function registerScheduledWorkflowWorker() {
+  try {
+    // First, get a list of all existing robots
+    const robots = await Robot.findAll({
+      attributes: ['recording_meta.id'],
+      raw: true
+    });
+
+    // Register a worker for each potential robot queue
+    for (const robot of robots) {
+      if (robot.recording_meta && robot.recording_meta.id) {
+        const queueName = `scheduled-workflow-${robot.recording_meta.id}`;
+        await registerWorkerForQueue(queueName);
+      }
+    }
+
+    // Also register workers for any existing PgBoss queues that follow our naming pattern
+    const queues = await pgBoss.getQueues();
+    for (const queue of queues) {
+      if (queue.name.startsWith('scheduled-workflow-') &&
+          !queue.name.endsWith('_error') &&
+          !queue.name.endsWith('_completed')) {
+        await registerWorkerForQueue(queue.name);
+      }
+    }
+
+    logger.log('info', 'Scheduled workflow workers registered successfully');
+  } catch (error: unknown) {
+    const errorMessage = error instanceof Error ? error.message : String(error);
+    logger.log('error', `Failed to register scheduled workflow workers: ${errorMessage}`);
+  }
+}
+
+/**
+ * Attach a worker to a single scheduled-workflow queue, at most once per queue
+ * @param queueName The PgBoss queue to consume
+ */
+async function registerWorkerForQueue(queueName: string) {
+  try {
+    if (registeredQueues.has(queueName)) {
+      return;
+    }
+
+    await pgBoss.work(queueName, async (job: Job<ScheduledWorkflowData> | Job<ScheduledWorkflowData>[]) => {
+      try {
+        // When a batch arrives as an array only its first job is handled
+        const singleJob = Array.isArray(job) ? job[0] : job;
+        return await processScheduledWorkflow(singleJob);
+      } catch (error: unknown) {
+        const errorMessage = error instanceof Error ? error.message : String(error);
+        logger.log('error', `Scheduled workflow job failed in queue ${queueName}: ${errorMessage}`);
+        throw error;
+      }
+    });
+
+    registeredQueues.add(queueName);
+    logger.log('info', `Registered worker for queue: ${queueName}`);
+  } catch (error: unknown) {
+    const errorMessage = error instanceof Error ? error.message : String(error);
+    logger.log('error', `Failed to register worker for queue ${queueName}: ${errorMessage}`);
+  }
+}
 
 async function registerRunExecutionWorker() {
   try {
@@ -495,6 +688,9 @@ async function startWorkers() {
     // Register the run execution worker
     await registerRunExecutionWorker();
 
+    // Register the scheduled workflow worker
+    await registerScheduledWorkflowWorker();
+
     logger.log('info', 'All recording workers registered successfully');
   } catch (error: unknown) {
     const errorMessage = error instanceof Error ? error.message : String(error);
@@ -506,6 +702,10 @@ async function startWorkers() {
 // Start all workers
 startWorkers();
 
+pgBoss.on('error', (error) => {
+  logger.log('error', `PgBoss error: ${error.message}`);
+});
+
 // Handle graceful shutdown
 process.on('SIGTERM', async () => {
   logger.log('info', 'SIGTERM received, shutting down PgBoss...');