From 3977b6feb4b8e056b15234df231fdbbb6f7a11c5 Mon Sep 17 00:00:00 2001 From: Rohit Date: Fri, 28 Mar 2025 17:24:58 +0530 Subject: [PATCH 01/12] feat: schedule routes using pgboss queue --- server/src/routes/storage.ts | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/server/src/routes/storage.ts b/server/src/routes/storage.ts index af7850f0..603cde14 100644 --- a/server/src/routes/storage.ts +++ b/server/src/routes/storage.ts @@ -22,7 +22,7 @@ import { encrypt, decrypt } from '../utils/auth'; import { WorkflowFile } from 'maxun-core'; import { Page } from 'playwright'; import { airtableUpdateTasks, processAirtableUpdates } from '../workflow-management/integrations/airtable'; -import { pgBoss } from '../pgboss-worker'; +import { cancelScheduledWorkflow, pgBoss, scheduleWorkflow } from '../pgboss-worker'; chromium.use(stealthPlugin()); export const router = Router(); @@ -792,17 +792,13 @@ router.put('/schedule/:id/', requireSignIn, async (req: AuthenticatedRequest, re return res.status(401).json({ error: 'Unauthorized' }); } - // Create the job in the queue with the cron expression - const job = await workflowQueue.add( - 'run workflow', - { id, runId: uuid(), userId: req.user.id }, - { - repeat: { - pattern: cronExpression, - tz: timezone, - }, - } - ); + try { + await cancelScheduledWorkflow(id); + } catch (cancelError) { + logger.log('warn', `Failed to cancel existing schedule for robot ${id}: ${cancelError}`); + } + + const jobId = await scheduleWorkflow(id, req.user.id, cronExpression, timezone); const nextRunAt = computeNextRun(cronExpression, timezone); @@ -877,12 +873,12 @@ router.delete('/schedule/:id', requireSignIn, async (req: AuthenticatedRequest, return res.status(404).json({ error: 'Robot not found' }); } - // Remove existing job from queue if it exists - const existingJobs = await workflowQueue.getJobs(['delayed', 'waiting']); - for (const job of existingJobs) { - if (job.data.id === id) { - await job.remove(); - } + // Cancel the scheduled job in PgBoss + try { + await cancelScheduledWorkflow(id); + } catch (error) { + logger.log('error', `Error cancelling scheduled job for robot ${id}: ${error}`); + // Continue with robot update even if cancellation fails } // Delete the schedule from the robot From 28b3f650d6812c8eb906d31fdf19e917a2855791 Mon Sep 17 00:00:00 2001 From: Rohit Date: Sat, 29 Mar 2025 18:30:11 +0530 Subject: [PATCH 02/12] feat: add scheduler pgboss worker functions --- server/src/pgboss-worker.ts | 181 ++++++++++++++++++++++++++++++++++++ 1 file changed, 181 insertions(+) diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index b775c99c..542684b9 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -21,9 +21,13 @@ import { googleSheetUpdateTasks, processGoogleSheetUpdates } from './workflow-ma import { airtableUpdateTasks, processAirtableUpdates } from './workflow-management/integrations/airtable'; import { RemoteBrowser } from './browser-management/classes/RemoteBrowser'; import { io as serverIo } from "./server"; +import { computeNextRun } from './utils/schedule'; +import { handleRunRecording } from './workflow-management/scheduler'; const pgBossConnectionString = `postgres://${process.env.DB_USER}:${process.env.DB_PASSWORD}@${process.env.DB_HOST}:${process.env.DB_PORT}/${process.env.DB_NAME}`; +const registeredQueues = new Set(); + interface InitializeBrowserData { userId: string; } @@ -41,6 +45,12 @@ interface DestroyBrowserData { userId: string; } +interface ScheduledWorkflowData { + id: string; + runId: string; + userId: string; +} + interface ExecuteRunData { userId: string; runId: string; @@ -161,6 +171,69 @@ async function checkAndProcessQueuedRun(userId: string, browserId: string): Prom } } +/** + * Utility function to schedule a cron job using PgBoss + * @param id The robot ID + * @param userId The user ID + * @param cronExpression The cron expression for scheduling + * @param timezone The timezone for the cron expression + */ +export async function scheduleWorkflow(id: string, userId: string, cronExpression: string, timezone: string): Promise { + try { + const runId = require('uuidv4').uuid(); + + const queueName = `scheduled-workflow-${id}`; + + logger.log('info', `Scheduling workflow ${id} with cron expression ${cronExpression} in timezone ${timezone}`); + + await pgBoss.createQueue(queueName); + + await pgBoss.schedule(queueName, cronExpression, + { id, runId, userId }, + { tz: timezone } + ); + + logger.log('info', `Scheduled workflow job for robot ${id}`); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to schedule workflow: ${errorMessage}`); + throw error; + } +} + +/** + * Utility function to cancel a scheduled job + * @param robotId The robot ID + * @returns true if successful + */ +export async function cancelScheduledWorkflow(robotId: string) { + try { + const jobs = await pgBoss.getSchedules(); + + console.log("Scheduled JOBS", jobs); + + const matchingJobs = jobs.filter((job: any) => { + try { + const data = JSON.parse(job.data); + return data && data.id === robotId; + } catch { + return false; + } + }); + + for (const job of matchingJobs) { + logger.log('info', `Cancelling scheduled job ${job.name} for robot ${robotId}`); + await pgBoss.unschedule(job.name); + } + + return true; + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to cancel scheduled workflow: ${errorMessage}`); + throw error; + } +} + /** * Modified processRunExecution function - only add browser reset */ @@ -359,6 +432,107 @@ async function processRunExecution(job: Job) { } } +/** + * Process a scheduled workflow job + */ +async function processScheduledWorkflow(job: Job) { + const { id, runId, userId } = job.data; + logger.log('info', `Processing scheduled workflow job for robotId: ${id}, runId: ${runId}, userId: ${userId}`); + + try { + // Execute the workflow using the existing handleRunRecording function + const result = await handleRunRecording(id, userId); + + // Update the robot's schedule with last run and next run times + const robot = await Robot.findOne({ where: { 'recording_meta.id': id } }); + if (robot && robot.schedule && robot.schedule.cronExpression && robot.schedule.timezone) { + // Update lastRunAt to the current time + const lastRunAt = new Date(); + + // Compute the next run date + const nextRunAt = computeNextRun(robot.schedule.cronExpression, robot.schedule.timezone) || undefined; + + await robot.update({ + schedule: { + ...robot.schedule, + lastRunAt, + nextRunAt, + }, + }); + + logger.log('info', `Updated robot ${id} schedule - next run at: ${nextRunAt}`); + } else { + logger.log('error', `Robot ${id} schedule, cronExpression, or timezone is missing.`); + } + + return { success: true }; + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Scheduled workflow job failed: ${errorMessage}`); + return { success: false }; + } +} + +/** + * Register a worker to handle scheduled workflow jobs + */ +async function registerScheduledWorkflowWorker() { + try { + // First, get a list of all existing robots + const robots = await Robot.findAll({ + attributes: ['recording_meta.id'], + raw: true + }); + + // Register a worker for each potential robot queue + for (const robot of robots) { + if (robot.recording_meta && robot.recording_meta.id) { + const queueName = `scheduled-workflow-${robot.recording_meta.id}`; + await registerWorkerForQueue(queueName); + } + } + + // Also register workers for any existing PgBoss queues that follow our naming pattern + const queues = await pgBoss.getQueues(); + for (const queue of queues) { + if (queue.name.startsWith('scheduled-workflow-') && + !queue.name.endsWith('_error') && + !queue.name.endsWith('_completed')) { + await registerWorkerForQueue(queue.name); + } + } + + logger.log('info', 'Scheduled workflow workers registered successfully'); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to register scheduled workflow workers: ${errorMessage}`); + } +} + +async function registerWorkerForQueue(queueName: string) { + try { + if (registeredQueues.has(queueName)) { + return; + } + + await pgBoss.work(queueName, async (job: Job | Job[]) => { + try { + const singleJob = Array.isArray(job) ? job[0] : job; + return await processScheduledWorkflow(singleJob); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Scheduled workflow job failed in queue ${queueName}: ${errorMessage}`); + throw error; + } + }); + + registeredQueues.add(queueName); + logger.log('info', `Registered worker for queue: ${queueName}`); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to register worker for queue ${queueName}: ${errorMessage}`); + } +} async function registerRunExecutionWorker() { try { @@ -495,6 +669,9 @@ async function startWorkers() { // Register the run execution worker await registerRunExecutionWorker(); + // Register the scheduled workflow worker + await registerScheduledWorkflowWorker(); + logger.log('info', 'All recording workers registered successfully'); } catch (error: unknown) { const errorMessage = error instanceof Error ? error.message : String(error); @@ -506,6 +683,10 @@ async function startWorkers() { // Start all workers startWorkers(); +pgBoss.on('error', (error) => { + logger.log('error', `PgBoss error: ${error.message}`); +}); + // Handle graceful shutdown process.on('SIGTERM', async () => { logger.log('info', 'SIGTERM received, shutting down PgBoss...'); From 54c0a8768154dfc053108c6bd1e02a6d33a3311c Mon Sep 17 00:00:00 2001 From: Rohit Date: Sat, 29 Mar 2025 18:58:44 +0530 Subject: [PATCH 03/12] feat: rm job data parsing --- server/src/pgboss-worker.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index 542684b9..8373b680 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -209,12 +209,10 @@ export async function scheduleWorkflow(id: string, userId: string, cronExpressio export async function cancelScheduledWorkflow(robotId: string) { try { const jobs = await pgBoss.getSchedules(); - - console.log("Scheduled JOBS", jobs); const matchingJobs = jobs.filter((job: any) => { try { - const data = JSON.parse(job.data); + const data = job.data; return data && data.id === robotId; } catch { return false; From 7a75b9d2673599c60cf900ac9d344bfdedf9e5dc Mon Sep 17 00:00:00 2001 From: Rohit Date: Sun, 30 Mar 2025 00:20:12 +0530 Subject: [PATCH 04/12] feat: get user id from namespace auth --- server/src/socket-connection/connection.ts | 105 ++++++++++++++------- 1 file changed, 71 insertions(+), 34 deletions(-) diff --git a/server/src/socket-connection/connection.ts b/server/src/socket-connection/connection.ts index 9fad861b..66294ecf 100644 --- a/server/src/socket-connection/connection.ts +++ b/server/src/socket-connection/connection.ts @@ -1,6 +1,6 @@ import { Namespace, Socket } from 'socket.io'; import { IncomingMessage } from 'http'; -import { verify, JwtPayload } from 'jsonwebtoken'; +import { verify, JwtPayload, sign } from 'jsonwebtoken'; import logger from "../logger"; import registerInputHandlers from '../browser-management/inputHandlers'; @@ -12,48 +12,85 @@ interface AuthenticatedSocket extends Socket { request: AuthenticatedIncomingMessage; } +declare global { + var userContextMap: Map; +} + +if (!global.userContextMap) { + global.userContextMap = new Map(); +} + +/** + * Register browser-user association in the global context map + */ +export function registerBrowserUserContext(browserId: string, userId: string) { + if (!global.userContextMap) { + global.userContextMap = new Map(); + } + global.userContextMap.set(browserId, userId); + logger.log('debug', `Registered browser-user association: ${browserId} -> ${userId}`); +} + /** * Socket.io middleware for authentication * This is a socket.io specific auth handler that doesn't rely on Express middleware */ const socketAuthMiddleware = (socket: Socket, next: (err?: Error) => void) => { - const cookies = socket.handshake.headers.cookie; - if (!cookies) { - return next(new Error('Authentication required')); + // Extract browserId from namespace + const namespace = socket.nsp.name; + const browserId = namespace.slice(1); + + // Check if this browser is in our context map + if (global.userContextMap && global.userContextMap.has(browserId)) { + const userId = global.userContextMap.get(browserId); + logger.log('debug', `Found browser in context map: ${browserId} -> ${userId}`); + + const authSocket = socket as AuthenticatedSocket; + authSocket.request.user = { id: userId }; + return next(); + } + + const cookies = socket.handshake.headers.cookie; + if (!cookies) { + logger.log('debug', `No cookies found in socket handshake for ${browserId}`); + return next(new Error('Authentication required')); + } + + const tokenMatch = cookies.split(';').find(c => c.trim().startsWith('token=')); + if (!tokenMatch) { + logger.log('debug', `No token cookie found in socket handshake for ${browserId}`); + return next(new Error('Authentication required')); + } + + const token = tokenMatch.split('=')[1]; + if (!token) { + logger.log('debug', `Empty token value in cookie for ${browserId}`); + return next(new Error('Authentication required')); + } + + const secret = process.env.JWT_SECRET; + if (!secret) { + logger.error('JWT_SECRET environment variable is not defined'); + return next(new Error('Server configuration error')); + } + + verify(token, secret, (err: any, user: any) => { + if (err) { + logger.log('warn', `JWT verification error: ${err.message}`); + return next(new Error('Authentication failed')); } - const tokenMatch = cookies.split(';').find(c => c.trim().startsWith('token=')); - if (!tokenMatch) { - return next(new Error('Authentication required')); + // Normalize payload key + if (user.userId && !user.id) { + user.id = user.userId; + delete user.userId; } - const token = tokenMatch.split('=')[1]; - if (!token) { - return next(new Error('Authentication required')); - } - - const secret = process.env.JWT_SECRET; - if (!secret) { - return next(new Error('Server configuration error')); - } - - verify(token, secret, (err: any, user: any) => { - if (err) { - logger.log('warn', 'JWT verification error:', err); - return next(new Error('Authentication failed')); - } - - // Normalize payload key - if (user.userId && !user.id) { - user.id = user.userId; - delete user.userId; // temporary: del the old key for clarity - } - - // Attach user to socket request - const authSocket = socket as AuthenticatedSocket; - authSocket.request.user = user; - next(); - }); + // Attach user to socket request + const authSocket = socket as AuthenticatedSocket; + authSocket.request.user = user; + next(); + }); }; /** From d13e9d56cef50449810b647d7d277648d6f21078 Mon Sep 17 00:00:00 2001 From: Rohit Date: Sun, 30 Mar 2025 00:21:24 +0530 Subject: [PATCH 05/12] feat: register browser user context --- server/src/browser-management/controller.ts | 24 ++++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/server/src/browser-management/controller.ts b/server/src/browser-management/controller.ts index 2cd9498d..ef1e0011 100644 --- a/server/src/browser-management/controller.ts +++ b/server/src/browser-management/controller.ts @@ -5,7 +5,7 @@ import { Socket } from "socket.io"; import { uuid } from 'uuidv4'; -import { createSocketConnection, createSocketConnectionForRun } from "../socket-connection/connection"; +import { createSocketConnection, createSocketConnectionForRun, registerBrowserUserContext } from "../socket-connection/connection"; import { io, browserPool } from "../server"; import { RemoteBrowser } from "./classes/RemoteBrowser"; import { RemoteBrowserOptions } from "../types"; @@ -48,19 +48,27 @@ export const initializeRemoteBrowserForRecording = (userId: string): string => { * Starts and initializes a {@link RemoteBrowser} instance for interpretation. * Creates a new {@link Socket} connection over a dedicated namespace. * Returns the new remote browser's generated id. - * @param options {@link RemoteBrowserOptions} to be used when launching the browser - * @returns string + * @param userId User ID for browser ownership + * @returns string Browser ID * @category BrowserManagement-Controller */ export const createRemoteBrowserForRun = (userId: string): string => { const id = uuid(); + + registerBrowserUserContext(id, userId); + logger.log('debug', `Created new browser for run: ${id} for user: ${userId}`); + createSocketConnectionForRun( - io.of(id), + io.of(`/${id}`), async (socket: Socket) => { - const browserSession = new RemoteBrowser(socket, userId); - await browserSession.initialize(userId); - browserPool.addRemoteBrowser(id, browserSession, userId, false, "run"); - socket.emit('ready-for-run'); + try { + const browserSession = new RemoteBrowser(socket, userId); + await browserSession.initialize(userId); + browserPool.addRemoteBrowser(id, browserSession, userId, false, "run"); + socket.emit('ready-for-run'); + } catch (error: any) { + logger.error(`Error initializing browser: ${error.message}`); + } }); return id; }; From b2cd7c7bc23ce388e2bb2d8db6bd6dd6038a5951 Mon Sep 17 00:00:00 2001 From: Rohit Date: Sun, 30 Mar 2025 00:22:40 +0530 Subject: [PATCH 06/12] feat: register worker for queue --- server/src/pgboss-worker.ts | 28 ++++++---------------------- 1 file changed, 6 insertions(+), 22 deletions(-) diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index 8373b680..190afdad 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -192,6 +192,8 @@ export async function scheduleWorkflow(id: string, userId: string, cronExpressio { id, runId, userId }, { tz: timezone } ); + + await registerWorkerForQueue(queueName); logger.log('info', `Scheduled workflow job for robot ${id}`); } catch (error: unknown) { @@ -476,28 +478,10 @@ async function processScheduledWorkflow(job: Job) { */ async function registerScheduledWorkflowWorker() { try { - // First, get a list of all existing robots - const robots = await Robot.findAll({ - attributes: ['recording_meta.id'], - raw: true - }); - - // Register a worker for each potential robot queue - for (const robot of robots) { - if (robot.recording_meta && robot.recording_meta.id) { - const queueName = `scheduled-workflow-${robot.recording_meta.id}`; - await registerWorkerForQueue(queueName); - } - } - - // Also register workers for any existing PgBoss queues that follow our naming pattern - const queues = await pgBoss.getQueues(); - for (const queue of queues) { - if (queue.name.startsWith('scheduled-workflow-') && - !queue.name.endsWith('_error') && - !queue.name.endsWith('_completed')) { - await registerWorkerForQueue(queue.name); - } + const jobs = await pgBoss.getSchedules(); + for (const job of jobs) { + await pgBoss.createQueue(job.name); + await registerWorkerForQueue(job.name); } logger.log('info', 'Scheduled workflow workers registered successfully'); From 10970b563c1495655302de7c30dd67b244b5ce13 Mon Sep 17 00:00:00 2001 From: Rohit Date: Sun, 30 Mar 2025 00:23:16 +0530 Subject: [PATCH 07/12] feat: get browser from id --- server/src/workflow-management/scheduler/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/workflow-management/scheduler/index.ts b/server/src/workflow-management/scheduler/index.ts index b67e1ca0..8267fbb8 100644 --- a/server/src/workflow-management/scheduler/index.ts +++ b/server/src/workflow-management/scheduler/index.ts @@ -114,7 +114,7 @@ async function executeRun(id: string, userId: string) { plainRun.status = 'running'; - const browser = browserPool.getRemoteBrowser(userId); + const browser = browserPool.getRemoteBrowser(plainRun.browserId); if (!browser) { throw new Error('Could not access browser'); } From 47f9fd7eb0c6365074fee59fc7cb3a08c4d1e7fa Mon Sep 17 00:00:00 2001 From: Rohit Date: Sun, 30 Mar 2025 00:39:10 +0530 Subject: [PATCH 08/12] feat: refactor cron expressions for pgboss --- server/src/routes/storage.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/routes/storage.ts b/server/src/routes/storage.ts index 603cde14..e4ed5444 100644 --- a/server/src/routes/storage.ts +++ b/server/src/routes/storage.ts @@ -761,7 +761,7 @@ router.put('/schedule/:id/', requireSignIn, async (req: AuthenticatedRequest, re switch (runEveryUnit) { case 'MINUTES': - cronExpression = `${startMinutes} */${runEvery} * * *`; + cronExpression = `*/${runEvery} * * * *`; break; case 'HOURS': cronExpression = `${startMinutes} */${runEvery} * * *`; @@ -774,7 +774,7 @@ router.put('/schedule/:id/', requireSignIn, async (req: AuthenticatedRequest, re break; case 'MONTHS': // todo: handle leap year - cronExpression = `0 ${atTimeStart} ${dayOfMonth} * *`; + cronExpression = `${startMinutes} ${startHours} ${dayOfMonth} */${runEvery} *`; if (startFrom !== 'SUNDAY') { cronExpression += ` ${dayIndex}`; } From 660da53fba6220521057c91ab991b7d687cc5478 Mon Sep 17 00:00:00 2001 From: Rohit Date: Sun, 30 Mar 2025 03:30:34 +0530 Subject: [PATCH 09/12] feat: rm scheduling logic --- server/src/pgboss-worker.ts | 160 ------------------------------------ 1 file changed, 160 deletions(-) diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index 190afdad..269f6773 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -21,13 +21,9 @@ import { googleSheetUpdateTasks, processGoogleSheetUpdates } from './workflow-ma import { airtableUpdateTasks, processAirtableUpdates } from './workflow-management/integrations/airtable'; import { RemoteBrowser } from './browser-management/classes/RemoteBrowser'; import { io as serverIo } from "./server"; -import { computeNextRun } from './utils/schedule'; -import { handleRunRecording } from './workflow-management/scheduler'; const pgBossConnectionString = `postgres://${process.env.DB_USER}:${process.env.DB_PASSWORD}@${process.env.DB_HOST}:${process.env.DB_PORT}/${process.env.DB_NAME}`; -const registeredQueues = new Set(); - interface InitializeBrowserData { userId: string; } @@ -45,12 +41,6 @@ interface DestroyBrowserData { userId: string; } -interface ScheduledWorkflowData { - id: string; - runId: string; - userId: string; -} - interface ExecuteRunData { userId: string; runId: string; @@ -171,69 +161,6 @@ async function checkAndProcessQueuedRun(userId: string, browserId: string): Prom } } -/** - * Utility function to schedule a cron job using PgBoss - * @param id The robot ID - * @param userId The user ID - * @param cronExpression The cron expression for scheduling - * @param timezone The timezone for the cron expression - */ -export async function scheduleWorkflow(id: string, userId: string, cronExpression: string, timezone: string): Promise { - try { - const runId = require('uuidv4').uuid(); - - const queueName = `scheduled-workflow-${id}`; - - logger.log('info', `Scheduling workflow ${id} with cron expression ${cronExpression} in timezone ${timezone}`); - - await pgBoss.createQueue(queueName); - - await pgBoss.schedule(queueName, cronExpression, - { id, runId, userId }, - { tz: timezone } - ); - - await registerWorkerForQueue(queueName); - - logger.log('info', `Scheduled workflow job for robot ${id}`); - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Failed to schedule workflow: ${errorMessage}`); - throw error; - } -} - -/** - * Utility function to cancel a scheduled job - * @param robotId The robot ID - * @returns true if successful - */ -export async function cancelScheduledWorkflow(robotId: string) { - try { - const jobs = await pgBoss.getSchedules(); - - const matchingJobs = jobs.filter((job: any) => { - try { - const data = job.data; - return data && data.id === robotId; - } catch { - return false; - } - }); - - for (const job of matchingJobs) { - logger.log('info', `Cancelling scheduled job ${job.name} for robot ${robotId}`); - await pgBoss.unschedule(job.name); - } - - return true; - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Failed to cancel scheduled workflow: ${errorMessage}`); - throw error; - } -} - /** * Modified processRunExecution function - only add browser reset */ @@ -432,90 +359,6 @@ async function processRunExecution(job: Job) { } } -/** - * Process a scheduled workflow job - */ -async function processScheduledWorkflow(job: Job) { - const { id, runId, userId } = job.data; - logger.log('info', `Processing scheduled workflow job for robotId: ${id}, runId: ${runId}, userId: ${userId}`); - - try { - // Execute the workflow using the existing handleRunRecording function - const result = await handleRunRecording(id, userId); - - // Update the robot's schedule with last run and next run times - const robot = await Robot.findOne({ where: { 'recording_meta.id': id } }); - if (robot && robot.schedule && robot.schedule.cronExpression && robot.schedule.timezone) { - // Update lastRunAt to the current time - const lastRunAt = new Date(); - - // Compute the next run date - const nextRunAt = computeNextRun(robot.schedule.cronExpression, robot.schedule.timezone) || undefined; - - await robot.update({ - schedule: { - ...robot.schedule, - lastRunAt, - nextRunAt, - }, - }); - - logger.log('info', `Updated robot ${id} schedule - next run at: ${nextRunAt}`); - } else { - logger.log('error', `Robot ${id} schedule, cronExpression, or timezone is missing.`); - } - - return { success: true }; - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Scheduled workflow job failed: ${errorMessage}`); - return { success: false }; - } -} - -/** - * Register a worker to handle scheduled workflow jobs - */ -async function registerScheduledWorkflowWorker() { - try { - const jobs = await pgBoss.getSchedules(); - for (const job of jobs) { - await pgBoss.createQueue(job.name); - await registerWorkerForQueue(job.name); - } - - logger.log('info', 'Scheduled workflow workers registered successfully'); - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Failed to register scheduled workflow workers: ${errorMessage}`); - } -} - -async function registerWorkerForQueue(queueName: string) { - try { - if (registeredQueues.has(queueName)) { - return; - } - - await pgBoss.work(queueName, async (job: Job | Job[]) => { - try { - const singleJob = Array.isArray(job) ? job[0] : job; - return await processScheduledWorkflow(singleJob); - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Scheduled workflow job failed in queue ${queueName}: ${errorMessage}`); - throw error; - } - }); - - registeredQueues.add(queueName); - logger.log('info', `Registered worker for queue: ${queueName}`); - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Failed to register worker for queue ${queueName}: ${errorMessage}`); - } -} - async function registerRunExecutionWorker() { try { const registeredUserQueues = new Map(); @@ -651,9 +494,6 @@ async function startWorkers() { // Register the run execution worker await registerRunExecutionWorker(); - // Register the scheduled workflow worker - await registerScheduledWorkflowWorker(); - logger.log('info', 'All recording workers registered successfully'); } catch (error: unknown) { const errorMessage = error instanceof Error ? error.message : String(error); From 3c380252d9c264e48c041e6a7f10056b2eae93b3 Mon Sep 17 00:00:00 2001 From: Rohit Date: Sun, 30 Mar 2025 03:31:18 +0530 Subject: [PATCH 10/12] feat: add schedule worker --- server/src/schedule-worker.ts | 209 ++++++++++++++++++++++++++++++++++ 1 file changed, 209 insertions(+) create mode 100644 server/src/schedule-worker.ts diff --git a/server/src/schedule-worker.ts b/server/src/schedule-worker.ts new file mode 100644 index 00000000..bb376c18 --- /dev/null +++ b/server/src/schedule-worker.ts @@ -0,0 +1,209 @@ +/** + * Worker process focused solely on scheduling logic + */ +import PgBoss, { Job } from 'pg-boss'; +import logger from './logger'; +import Robot from './models/Robot'; +import { handleRunRecording } from './workflow-management/scheduler'; +import { computeNextRun } from './utils/schedule'; +import { capture } from './utils/analytics'; + +const pgBossConnectionString = `postgres://${process.env.DB_USER}:${process.env.DB_PASSWORD}@${process.env.DB_HOST}:${process.env.DB_PORT}/${process.env.DB_NAME}`; + +const pgBoss = new PgBoss({connectionString: pgBossConnectionString }); + +const registeredQueues = new Set(); + +interface ScheduledWorkflowData { + id: string; + runId: string; + userId: string; +} + +/** + * Utility function to schedule a cron job using PgBoss + * @param id The robot ID + * @param userId The user ID + * @param cronExpression The cron expression for scheduling + * @param timezone The timezone for the cron expression + */ +export async function scheduleWorkflow(id: string, userId: string, cronExpression: string, timezone: string): Promise { + try { + const runId = require('uuidv4').uuid(); + + const queueName = `scheduled-workflow-${id}`; + + logger.log('info', `Scheduling workflow ${id} with cron expression ${cronExpression} in timezone ${timezone}`); + + await pgBoss.createQueue(queueName); + + await pgBoss.schedule(queueName, cronExpression, + { id, runId, userId }, + { tz: timezone } + ); + + await registerWorkerForQueue(queueName); + + logger.log('info', `Scheduled workflow job for robot ${id}`); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to schedule workflow: ${errorMessage}`); + throw error; + } +} + +/** + * Utility function to cancel a scheduled job + * @param robotId The robot ID + * @returns true if successful + */ +export async function cancelScheduledWorkflow(robotId: string) { + try { + const jobs = await pgBoss.getSchedules(); + + const matchingJobs = jobs.filter((job: any) => { + try { + const data = job.data; + return data && data.id === robotId; + } catch { + return false; + } + }); + + for (const job of matchingJobs) { + logger.log('info', `Cancelling scheduled job ${job.name} for robot ${robotId}`); + await pgBoss.unschedule(job.name); + } + + return true; + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to cancel scheduled workflow: ${errorMessage}`); + throw error; + } +} + +/** + * Process a scheduled workflow job + */ +async function processScheduledWorkflow(job: Job) { + const { id, runId, userId } = job.data; + logger.log('info', `Processing scheduled workflow job for robotId: ${id}, runId: ${runId}, userId: ${userId}`); + + try { + // Execute the workflow using the existing handleRunRecording function + const result = await handleRunRecording(id, userId); + + // Update the robot's schedule with last run and next run times + const robot = await Robot.findOne({ where: { 'recording_meta.id': id } }); + if (robot && robot.schedule && robot.schedule.cronExpression && robot.schedule.timezone) { + // Update lastRunAt to the current time + const lastRunAt = new Date(); + + // Compute the next run date + const nextRunAt = computeNextRun(robot.schedule.cronExpression, robot.schedule.timezone) || undefined; + + await robot.update({ + schedule: { + ...robot.schedule, + lastRunAt, + nextRunAt, + }, + }); + + logger.log('info', `Updated robot ${id} schedule - next run at: ${nextRunAt}`); + } else { + logger.log('error', `Robot ${id} schedule, cronExpression, or timezone is missing.`); + } + + return { success: true }; + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Scheduled workflow job failed: ${errorMessage}`); + return { success: false }; + } +} + +/** + * Register a worker to handle scheduled workflow jobs + */ +async function registerScheduledWorkflowWorker() { + try { + const jobs = await pgBoss.getSchedules(); + for (const job of jobs) { + await pgBoss.createQueue(job.name); + await registerWorkerForQueue(job.name); + } + + logger.log('info', 'Scheduled workflow workers registered successfully'); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to register scheduled workflow workers: ${errorMessage}`); + } +} + +/** + * Register a worker for a specific queue + */ +async function registerWorkerForQueue(queueName: string) { + try { + if (registeredQueues.has(queueName)) { + return; + } + + await pgBoss.work(queueName, async (job: Job | Job[]) => { + try { + const singleJob = Array.isArray(job) ? job[0] : job; + return await processScheduledWorkflow(singleJob); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Scheduled workflow job failed in queue ${queueName}: ${errorMessage}`); + throw error; + } + }); + + registeredQueues.add(queueName); + logger.log('info', `Registered worker for queue: ${queueName}`); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to register worker for queue ${queueName}: ${errorMessage}`); + } +} + +/** + * Initialize PgBoss and register scheduling workers + */ +async function startScheduleWorker() { + try { + logger.log('info', 'Starting PgBoss scheduling worker...'); + await pgBoss.start(); + logger.log('info', 'PgBoss scheduling worker started successfully'); + + // Register the scheduled workflow worker + await registerScheduledWorkflowWorker(); + + logger.log('info', 'Scheduling worker registered successfully'); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to start PgBoss scheduling worker: ${errorMessage}`); + process.exit(1); + } +} + +startScheduleWorker(); + +pgBoss.on('error', (error) => { + logger.log('error', `PgBoss scheduler error: ${error.message}`); +}); + +process.on('SIGTERM', async () => { + logger.log('info', 'SIGTERM received, shutting down PgBoss scheduler...'); + await pgBoss.stop(); + process.exit(0); +}); + +process.on('SIGINT', async () => { + logger.log('info', 'SIGINT received, shutting down PgBoss scheduler...'); + await pgBoss.stop(); + process.exit(0); +}); From b4e5f2c149e5949e2495922e0f549fc131efa7ea Mon Sep 17 00:00:00 2001 From: Rohit Date: Sun, 30 Mar 2025 03:31:49 +0530 Subject: [PATCH 11/12] feat: resolve imports --- server/src/routes/storage.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/routes/storage.ts b/server/src/routes/storage.ts index e4ed5444..7643a59c 100644 --- a/server/src/routes/storage.ts +++ b/server/src/routes/storage.ts @@ -22,7 +22,8 @@ import { encrypt, decrypt } from '../utils/auth'; import { WorkflowFile } from 'maxun-core'; import { Page } from 'playwright'; import { airtableUpdateTasks, processAirtableUpdates } from '../workflow-management/integrations/airtable'; -import { cancelScheduledWorkflow, pgBoss, scheduleWorkflow } from '../pgboss-worker'; +import { cancelScheduledWorkflow, scheduleWorkflow } from '../schedule-worker'; +import { pgBoss } from '../pgboss-worker'; chromium.use(stealthPlugin()); export const router = Router(); From e6ef51392ad81ba21f3d5a043152a8a2c6f83f0c Mon Sep 17 00:00:00 2001 From: Rohit Date: Sun, 30 Mar 2025 03:32:14 +0530 Subject: [PATCH 12/12] feat: add schedule worker path --- server/src/server.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/server.ts b/server/src/server.ts index 10e33525..cc3dc199 100644 --- a/server/src/server.ts +++ b/server/src/server.ts @@ -97,7 +97,7 @@ readdirSync(path.join(__dirname, 'api')).forEach((r) => { }); const isProduction = process.env.NODE_ENV === 'production'; -const workerPath = path.resolve(__dirname, isProduction ? './worker.js' : './worker.ts'); +const workerPath = path.resolve(__dirname, isProduction ? './schedule-worker.js' : './schedule-worker.ts'); const recordingWorkerPath = path.resolve(__dirname, isProduction ? './pgboss-worker.js' : './pgboss-worker.ts'); let workerProcess: any;