diff --git a/server/src/browser-management/controller.ts b/server/src/browser-management/controller.ts index 2cd9498d..ef1e0011 100644 --- a/server/src/browser-management/controller.ts +++ b/server/src/browser-management/controller.ts @@ -5,7 +5,7 @@ import { Socket } from "socket.io"; import { uuid } from 'uuidv4'; -import { createSocketConnection, createSocketConnectionForRun } from "../socket-connection/connection"; +import { createSocketConnection, createSocketConnectionForRun, registerBrowserUserContext } from "../socket-connection/connection"; import { io, browserPool } from "../server"; import { RemoteBrowser } from "./classes/RemoteBrowser"; import { RemoteBrowserOptions } from "../types"; @@ -48,19 +48,27 @@ export const initializeRemoteBrowserForRecording = (userId: string): string => { * Starts and initializes a {@link RemoteBrowser} instance for interpretation. * Creates a new {@link Socket} connection over a dedicated namespace. * Returns the new remote browser's generated id. - * @param options {@link RemoteBrowserOptions} to be used when launching the browser - * @returns string + * @param userId User ID for browser ownership + * @returns string Browser ID * @category BrowserManagement-Controller */ export const createRemoteBrowserForRun = (userId: string): string => { const id = uuid(); + + registerBrowserUserContext(id, userId); + logger.log('debug', `Created new browser for run: ${id} for user: ${userId}`); + createSocketConnectionForRun( - io.of(id), + io.of(`/${id}`), async (socket: Socket) => { - const browserSession = new RemoteBrowser(socket, userId); - await browserSession.initialize(userId); - browserPool.addRemoteBrowser(id, browserSession, userId, false, "run"); - socket.emit('ready-for-run'); + try { + const browserSession = new RemoteBrowser(socket, userId); + await browserSession.initialize(userId); + browserPool.addRemoteBrowser(id, browserSession, userId, false, "run"); + socket.emit('ready-for-run'); + } catch (error: any) { + logger.error(`Error initializing browser: ${error.message}`); + } }); return id; }; diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index b775c99c..269f6773 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -359,7 +359,6 @@ async function processRunExecution(job: Job) { } } - async function registerRunExecutionWorker() { try { const registeredUserQueues = new Map(); @@ -506,6 +505,10 @@ async function startWorkers() { // Start all workers startWorkers(); +pgBoss.on('error', (error) => { + logger.log('error', `PgBoss error: ${error.message}`); +}); + // Handle graceful shutdown process.on('SIGTERM', async () => { logger.log('info', 'SIGTERM received, shutting down PgBoss...'); diff --git a/server/src/routes/storage.ts b/server/src/routes/storage.ts index af7850f0..7643a59c 100644 --- a/server/src/routes/storage.ts +++ b/server/src/routes/storage.ts @@ -22,6 +22,7 @@ import { encrypt, decrypt } from '../utils/auth'; import { WorkflowFile } from 'maxun-core'; import { Page } from 'playwright'; import { airtableUpdateTasks, processAirtableUpdates } from '../workflow-management/integrations/airtable'; +import { cancelScheduledWorkflow, scheduleWorkflow } from '../schedule-worker'; import { pgBoss } from '../pgboss-worker'; chromium.use(stealthPlugin()); @@ -761,7 +762,7 @@ router.put('/schedule/:id/', requireSignIn, async (req: AuthenticatedRequest, re switch (runEveryUnit) { case 'MINUTES': - cronExpression = `${startMinutes} */${runEvery} * * *`; + cronExpression = `*/${runEvery} * * * *`; break; case 'HOURS': cronExpression = `${startMinutes} */${runEvery} * * *`; @@ -774,7 +775,7 @@ router.put('/schedule/:id/', requireSignIn, async (req: AuthenticatedRequest, re break; case 'MONTHS': // todo: handle leap year - cronExpression = `0 ${atTimeStart} ${dayOfMonth} * *`; + cronExpression = `${startMinutes} ${startHours} ${dayOfMonth} */${runEvery} *`; if (startFrom !== 'SUNDAY') { cronExpression += ` ${dayIndex}`; } @@ -792,17 +793,13 @@ router.put('/schedule/:id/', requireSignIn, async (req: AuthenticatedRequest, re return res.status(401).json({ error: 'Unauthorized' }); } - // Create the job in the queue with the cron expression - const job = await workflowQueue.add( - 'run workflow', - { id, runId: uuid(), userId: req.user.id }, - { - repeat: { - pattern: cronExpression, - tz: timezone, - }, - } - ); + try { + await cancelScheduledWorkflow(id); + } catch (cancelError) { + logger.log('warn', `Failed to cancel existing schedule for robot ${id}: ${cancelError}`); + } + + const jobId = await scheduleWorkflow(id, req.user.id, cronExpression, timezone); const nextRunAt = computeNextRun(cronExpression, timezone); @@ -877,12 +874,12 @@ router.delete('/schedule/:id', requireSignIn, async (req: AuthenticatedRequest, return res.status(404).json({ error: 'Robot not found' }); } - // Remove existing job from queue if it exists - const existingJobs = await workflowQueue.getJobs(['delayed', 'waiting']); - for (const job of existingJobs) { - if (job.data.id === id) { - await job.remove(); - } + // Cancel the scheduled job in PgBoss + try { + await cancelScheduledWorkflow(id); + } catch (error) { + logger.log('error', `Error cancelling scheduled job for robot ${id}: ${error}`); + // Continue with robot update even if cancellation fails } // Delete the schedule from the robot diff --git a/server/src/schedule-worker.ts b/server/src/schedule-worker.ts new file mode 100644 index 00000000..bb376c18 --- /dev/null +++ b/server/src/schedule-worker.ts @@ -0,0 +1,209 @@ +/** + * Worker process focused solely on scheduling logic + */ +import PgBoss, { Job } from 'pg-boss'; +import logger from './logger'; +import Robot from './models/Robot'; +import { handleRunRecording } from './workflow-management/scheduler'; +import { computeNextRun } from './utils/schedule'; +import { capture } from './utils/analytics'; + +const pgBossConnectionString = `postgres://${process.env.DB_USER}:${process.env.DB_PASSWORD}@${process.env.DB_HOST}:${process.env.DB_PORT}/${process.env.DB_NAME}`; + +const pgBoss = new PgBoss({connectionString: pgBossConnectionString }); + +const registeredQueues = new Set(); + +interface ScheduledWorkflowData { + id: string; + runId: string; + userId: string; +} + +/** + * Utility function to schedule a cron job using PgBoss + * @param id The robot ID + * @param userId The user ID + * @param cronExpression The cron expression for scheduling + * @param timezone The timezone for the cron expression + */ +export async function scheduleWorkflow(id: string, userId: string, cronExpression: string, timezone: string): Promise { + try { + const runId = require('uuidv4').uuid(); + + const queueName = `scheduled-workflow-${id}`; + + logger.log('info', `Scheduling workflow ${id} with cron expression ${cronExpression} in timezone ${timezone}`); + + await pgBoss.createQueue(queueName); + + await pgBoss.schedule(queueName, cronExpression, + { id, runId, userId }, + { tz: timezone } + ); + + await registerWorkerForQueue(queueName); + + logger.log('info', `Scheduled workflow job for robot ${id}`); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to schedule workflow: ${errorMessage}`); + throw error; + } +} + +/** + * Utility function to cancel a scheduled job + * @param robotId The robot ID + * @returns true if successful + */ +export async function cancelScheduledWorkflow(robotId: string) { + try { + const jobs = await pgBoss.getSchedules(); + + const matchingJobs = jobs.filter((job: any) => { + try { + const data = job.data; + return data && data.id === robotId; + } catch { + return false; + } + }); + + for (const job of matchingJobs) { + logger.log('info', `Cancelling scheduled job ${job.name} for robot ${robotId}`); + await pgBoss.unschedule(job.name); + } + + return true; + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to cancel scheduled workflow: ${errorMessage}`); + throw error; + } +} + +/** + * Process a scheduled workflow job + */ +async function processScheduledWorkflow(job: Job) { + const { id, runId, userId } = job.data; + logger.log('info', `Processing scheduled workflow job for robotId: ${id}, runId: ${runId}, userId: ${userId}`); + + try { + // Execute the workflow using the existing handleRunRecording function + const result = await handleRunRecording(id, userId); + + // Update the robot's schedule with last run and next run times + const robot = await Robot.findOne({ where: { 'recording_meta.id': id } }); + if (robot && robot.schedule && robot.schedule.cronExpression && robot.schedule.timezone) { + // Update lastRunAt to the current time + const lastRunAt = new Date(); + + // Compute the next run date + const nextRunAt = computeNextRun(robot.schedule.cronExpression, robot.schedule.timezone) || undefined; + + await robot.update({ + schedule: { + ...robot.schedule, + lastRunAt, + nextRunAt, + }, + }); + + logger.log('info', `Updated robot ${id} schedule - next run at: ${nextRunAt}`); + } else { + logger.log('error', `Robot ${id} schedule, cronExpression, or timezone is missing.`); + } + + return { success: true }; + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Scheduled workflow job failed: ${errorMessage}`); + return { success: false }; + } +} + +/** + * Register a worker to handle scheduled workflow jobs + */ +async function registerScheduledWorkflowWorker() { + try { + const jobs = await pgBoss.getSchedules(); + for (const job of jobs) { + await pgBoss.createQueue(job.name); + await registerWorkerForQueue(job.name); + } + + logger.log('info', 'Scheduled workflow workers registered successfully'); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to register scheduled workflow workers: ${errorMessage}`); + } +} + +/** + * Register a worker for a specific queue + */ +async function registerWorkerForQueue(queueName: string) { + try { + if (registeredQueues.has(queueName)) { + return; + } + + await pgBoss.work(queueName, async (job: Job | Job[]) => { + try { + const singleJob = Array.isArray(job) ? job[0] : job; + return await processScheduledWorkflow(singleJob); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Scheduled workflow job failed in queue ${queueName}: ${errorMessage}`); + throw error; + } + }); + + registeredQueues.add(queueName); + logger.log('info', `Registered worker for queue: ${queueName}`); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to register worker for queue ${queueName}: ${errorMessage}`); + } +} + +/** + * Initialize PgBoss and register scheduling workers + */ +async function startScheduleWorker() { + try { + logger.log('info', 'Starting PgBoss scheduling worker...'); + await pgBoss.start(); + logger.log('info', 'PgBoss scheduling worker started successfully'); + + // Register the scheduled workflow worker + await registerScheduledWorkflowWorker(); + + logger.log('info', 'Scheduling worker registered successfully'); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Failed to start PgBoss scheduling worker: ${errorMessage}`); + process.exit(1); + } +} + +startScheduleWorker(); + +pgBoss.on('error', (error) => { + logger.log('error', `PgBoss scheduler error: ${error.message}`); +}); + +process.on('SIGTERM', async () => { + logger.log('info', 'SIGTERM received, shutting down PgBoss scheduler...'); + await pgBoss.stop(); + process.exit(0); +}); + +process.on('SIGINT', async () => { + logger.log('info', 'SIGINT received, shutting down PgBoss scheduler...'); + await pgBoss.stop(); + process.exit(0); +}); diff --git a/server/src/server.ts b/server/src/server.ts index 10e33525..cc3dc199 100644 --- a/server/src/server.ts +++ b/server/src/server.ts @@ -97,7 +97,7 @@ readdirSync(path.join(__dirname, 'api')).forEach((r) => { }); const isProduction = process.env.NODE_ENV === 'production'; -const workerPath = path.resolve(__dirname, isProduction ? './worker.js' : './worker.ts'); +const workerPath = path.resolve(__dirname, isProduction ? './schedule-worker.js' : './schedule-worker.ts'); const recordingWorkerPath = path.resolve(__dirname, isProduction ? './pgboss-worker.js' : './pgboss-worker.ts'); let workerProcess: any; diff --git a/server/src/socket-connection/connection.ts b/server/src/socket-connection/connection.ts index 9fad861b..66294ecf 100644 --- a/server/src/socket-connection/connection.ts +++ b/server/src/socket-connection/connection.ts @@ -1,6 +1,6 @@ import { Namespace, Socket } from 'socket.io'; import { IncomingMessage } from 'http'; -import { verify, JwtPayload } from 'jsonwebtoken'; +import { verify, JwtPayload, sign } from 'jsonwebtoken'; import logger from "../logger"; import registerInputHandlers from '../browser-management/inputHandlers'; @@ -12,48 +12,85 @@ interface AuthenticatedSocket extends Socket { request: AuthenticatedIncomingMessage; } +declare global { + var userContextMap: Map; +} + +if (!global.userContextMap) { + global.userContextMap = new Map(); +} + +/** + * Register browser-user association in the global context map + */ +export function registerBrowserUserContext(browserId: string, userId: string) { + if (!global.userContextMap) { + global.userContextMap = new Map(); + } + global.userContextMap.set(browserId, userId); + logger.log('debug', `Registered browser-user association: ${browserId} -> ${userId}`); +} + /** * Socket.io middleware for authentication * This is a socket.io specific auth handler that doesn't rely on Express middleware */ const socketAuthMiddleware = (socket: Socket, next: (err?: Error) => void) => { - const cookies = socket.handshake.headers.cookie; - if (!cookies) { - return next(new Error('Authentication required')); + // Extract browserId from namespace + const namespace = socket.nsp.name; + const browserId = namespace.slice(1); + + // Check if this browser is in our context map + if (global.userContextMap && global.userContextMap.has(browserId)) { + const userId = global.userContextMap.get(browserId); + logger.log('debug', `Found browser in context map: ${browserId} -> ${userId}`); + + const authSocket = socket as AuthenticatedSocket; + authSocket.request.user = { id: userId }; + return next(); + } + + const cookies = socket.handshake.headers.cookie; + if (!cookies) { + logger.log('debug', `No cookies found in socket handshake for ${browserId}`); + return next(new Error('Authentication required')); + } + + const tokenMatch = cookies.split(';').find(c => c.trim().startsWith('token=')); + if (!tokenMatch) { + logger.log('debug', `No token cookie found in socket handshake for ${browserId}`); + return next(new Error('Authentication required')); + } + + const token = tokenMatch.split('=')[1]; + if (!token) { + logger.log('debug', `Empty token value in cookie for ${browserId}`); + return next(new Error('Authentication required')); + } + + const secret = process.env.JWT_SECRET; + if (!secret) { + logger.error('JWT_SECRET environment variable is not defined'); + return next(new Error('Server configuration error')); + } + + verify(token, secret, (err: any, user: any) => { + if (err) { + logger.log('warn', `JWT verification error: ${err.message}`); + return next(new Error('Authentication failed')); } - const tokenMatch = cookies.split(';').find(c => c.trim().startsWith('token=')); - if (!tokenMatch) { - return next(new Error('Authentication required')); + // Normalize payload key + if (user.userId && !user.id) { + user.id = user.userId; + delete user.userId; } - const token = tokenMatch.split('=')[1]; - if (!token) { - return next(new Error('Authentication required')); - } - - const secret = process.env.JWT_SECRET; - if (!secret) { - return next(new Error('Server configuration error')); - } - - verify(token, secret, (err: any, user: any) => { - if (err) { - logger.log('warn', 'JWT verification error:', err); - return next(new Error('Authentication failed')); - } - - // Normalize payload key - if (user.userId && !user.id) { - user.id = user.userId; - delete user.userId; // temporary: del the old key for clarity - } - - // Attach user to socket request - const authSocket = socket as AuthenticatedSocket; - authSocket.request.user = user; - next(); - }); + // Attach user to socket request + const authSocket = socket as AuthenticatedSocket; + authSocket.request.user = user; + next(); + }); }; /** diff --git a/server/src/workflow-management/scheduler/index.ts b/server/src/workflow-management/scheduler/index.ts index b67e1ca0..8267fbb8 100644 --- a/server/src/workflow-management/scheduler/index.ts +++ b/server/src/workflow-management/scheduler/index.ts @@ -114,7 +114,7 @@ async function executeRun(id: string, userId: string) { plainRun.status = 'running'; - const browser = browserPool.getRemoteBrowser(userId); + const browser = browserPool.getRemoteBrowser(plainRun.browserId); if (!browser) { throw new Error('Could not access browser'); }