diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index 190afdad..269f6773 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -21,13 +21,9 @@ import { googleSheetUpdateTasks, processGoogleSheetUpdates } from './workflow-ma import { airtableUpdateTasks, processAirtableUpdates } from './workflow-management/integrations/airtable'; import { RemoteBrowser } from './browser-management/classes/RemoteBrowser'; import { io as serverIo } from "./server"; -import { computeNextRun } from './utils/schedule'; -import { handleRunRecording } from './workflow-management/scheduler'; const pgBossConnectionString = `postgres://${process.env.DB_USER}:${process.env.DB_PASSWORD}@${process.env.DB_HOST}:${process.env.DB_PORT}/${process.env.DB_NAME}`; -const registeredQueues = new Set(); - interface InitializeBrowserData { userId: string; } @@ -45,12 +41,6 @@ interface DestroyBrowserData { userId: string; } -interface ScheduledWorkflowData { - id: string; - runId: string; - userId: string; -} - interface ExecuteRunData { userId: string; runId: string; @@ -171,69 +161,6 @@ async function checkAndProcessQueuedRun(userId: string, browserId: string): Prom } } -/** - * Utility function to schedule a cron job using PgBoss - * @param id The robot ID - * @param userId The user ID - * @param cronExpression The cron expression for scheduling - * @param timezone The timezone for the cron expression - */ -export async function scheduleWorkflow(id: string, userId: string, cronExpression: string, timezone: string): Promise { - try { - const runId = require('uuidv4').uuid(); - - const queueName = `scheduled-workflow-${id}`; - - logger.log('info', `Scheduling workflow ${id} with cron expression ${cronExpression} in timezone ${timezone}`); - - await pgBoss.createQueue(queueName); - - await pgBoss.schedule(queueName, cronExpression, - { id, runId, userId }, - { tz: timezone } - ); - - await registerWorkerForQueue(queueName); - - logger.log('info', `Scheduled workflow job for robot ${id}`); - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Failed to schedule workflow: ${errorMessage}`); - throw error; - } -} - -/** - * Utility function to cancel a scheduled job - * @param robotId The robot ID - * @returns true if successful - */ -export async function cancelScheduledWorkflow(robotId: string) { - try { - const jobs = await pgBoss.getSchedules(); - - const matchingJobs = jobs.filter((job: any) => { - try { - const data = job.data; - return data && data.id === robotId; - } catch { - return false; - } - }); - - for (const job of matchingJobs) { - logger.log('info', `Cancelling scheduled job ${job.name} for robot ${robotId}`); - await pgBoss.unschedule(job.name); - } - - return true; - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Failed to cancel scheduled workflow: ${errorMessage}`); - throw error; - } -} - /** * Modified processRunExecution function - only add browser reset */ @@ -432,90 +359,6 @@ async function processRunExecution(job: Job) { } } -/** - * Process a scheduled workflow job - */ -async function processScheduledWorkflow(job: Job) { - const { id, runId, userId } = job.data; - logger.log('info', `Processing scheduled workflow job for robotId: ${id}, runId: ${runId}, userId: ${userId}`); - - try { - // Execute the workflow using the existing handleRunRecording function - const result = await handleRunRecording(id, userId); - - // Update the robot's schedule with last run and next run times - const robot = await Robot.findOne({ where: { 'recording_meta.id': id } }); - if (robot && robot.schedule && robot.schedule.cronExpression && robot.schedule.timezone) { - // Update lastRunAt to the current time - const lastRunAt = new Date(); - - // Compute the next run date - const nextRunAt = computeNextRun(robot.schedule.cronExpression, robot.schedule.timezone) || undefined; - - await robot.update({ - schedule: { - ...robot.schedule, - lastRunAt, - nextRunAt, - }, - }); - - logger.log('info', `Updated robot ${id} schedule - next run at: ${nextRunAt}`); - } else { - logger.log('error', `Robot ${id} schedule, cronExpression, or timezone is missing.`); - } - - return { success: true }; - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Scheduled workflow job failed: ${errorMessage}`); - return { success: false }; - } -} - -/** - * Register a worker to handle scheduled workflow jobs - */ -async function registerScheduledWorkflowWorker() { - try { - const jobs = await pgBoss.getSchedules(); - for (const job of jobs) { - await pgBoss.createQueue(job.name); - await registerWorkerForQueue(job.name); - } - - logger.log('info', 'Scheduled workflow workers registered successfully'); - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Failed to register scheduled workflow workers: ${errorMessage}`); - } -} - -async function registerWorkerForQueue(queueName: string) { - try { - if (registeredQueues.has(queueName)) { - return; - } - - await pgBoss.work(queueName, async (job: Job | Job[]) => { - try { - const singleJob = Array.isArray(job) ? job[0] : job; - return await processScheduledWorkflow(singleJob); - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Scheduled workflow job failed in queue ${queueName}: ${errorMessage}`); - throw error; - } - }); - - registeredQueues.add(queueName); - logger.log('info', `Registered worker for queue: ${queueName}`); - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Failed to register worker for queue ${queueName}: ${errorMessage}`); - } -} - async function registerRunExecutionWorker() { try { const registeredUserQueues = new Map(); @@ -651,9 +494,6 @@ async function startWorkers() { // Register the run execution worker await registerRunExecutionWorker(); - // Register the scheduled workflow worker - await registerScheduledWorkflowWorker(); - logger.log('info', 'All recording workers registered successfully'); } catch (error: unknown) { const errorMessage = error instanceof Error ? error.message : String(error);