Merge pull request #507 from getmaxun/schedule-migrate
feat: use pgBoss for scheduling robots
This commit is contained in:
@@ -5,7 +5,7 @@
|
||||
import { Socket } from "socket.io";
|
||||
import { uuid } from 'uuidv4';
|
||||
|
||||
import { createSocketConnection, createSocketConnectionForRun } from "../socket-connection/connection";
|
||||
import { createSocketConnection, createSocketConnectionForRun, registerBrowserUserContext } from "../socket-connection/connection";
|
||||
import { io, browserPool } from "../server";
|
||||
import { RemoteBrowser } from "./classes/RemoteBrowser";
|
||||
import { RemoteBrowserOptions } from "../types";
|
||||
@@ -48,19 +48,27 @@ export const initializeRemoteBrowserForRecording = (userId: string): string => {
|
||||
* Starts and initializes a {@link RemoteBrowser} instance for interpretation.
|
||||
* Creates a new {@link Socket} connection over a dedicated namespace.
|
||||
* Returns the new remote browser's generated id.
|
||||
* @param options {@link RemoteBrowserOptions} to be used when launching the browser
|
||||
* @returns string
|
||||
* @param userId User ID for browser ownership
|
||||
* @returns string Browser ID
|
||||
* @category BrowserManagement-Controller
|
||||
*/
|
||||
export const createRemoteBrowserForRun = (userId: string): string => {
|
||||
const id = uuid();
|
||||
|
||||
registerBrowserUserContext(id, userId);
|
||||
logger.log('debug', `Created new browser for run: ${id} for user: ${userId}`);
|
||||
|
||||
createSocketConnectionForRun(
|
||||
io.of(id),
|
||||
io.of(`/${id}`),
|
||||
async (socket: Socket) => {
|
||||
const browserSession = new RemoteBrowser(socket, userId);
|
||||
await browserSession.initialize(userId);
|
||||
browserPool.addRemoteBrowser(id, browserSession, userId, false, "run");
|
||||
socket.emit('ready-for-run');
|
||||
try {
|
||||
const browserSession = new RemoteBrowser(socket, userId);
|
||||
await browserSession.initialize(userId);
|
||||
browserPool.addRemoteBrowser(id, browserSession, userId, false, "run");
|
||||
socket.emit('ready-for-run');
|
||||
} catch (error: any) {
|
||||
logger.error(`Error initializing browser: ${error.message}`);
|
||||
}
|
||||
});
|
||||
return id;
|
||||
};
|
||||
|
||||
@@ -359,7 +359,6 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
async function registerRunExecutionWorker() {
|
||||
try {
|
||||
const registeredUserQueues = new Map();
|
||||
@@ -506,6 +505,10 @@ async function startWorkers() {
|
||||
// Start all workers
|
||||
startWorkers();
|
||||
|
||||
pgBoss.on('error', (error) => {
|
||||
logger.log('error', `PgBoss error: ${error.message}`);
|
||||
});
|
||||
|
||||
// Handle graceful shutdown
|
||||
process.on('SIGTERM', async () => {
|
||||
logger.log('info', 'SIGTERM received, shutting down PgBoss...');
|
||||
|
||||
@@ -22,6 +22,7 @@ import { encrypt, decrypt } from '../utils/auth';
|
||||
import { WorkflowFile } from 'maxun-core';
|
||||
import { Page } from 'playwright';
|
||||
import { airtableUpdateTasks, processAirtableUpdates } from '../workflow-management/integrations/airtable';
|
||||
import { cancelScheduledWorkflow, scheduleWorkflow } from '../schedule-worker';
|
||||
import { pgBoss } from '../pgboss-worker';
|
||||
chromium.use(stealthPlugin());
|
||||
|
||||
@@ -761,7 +762,7 @@ router.put('/schedule/:id/', requireSignIn, async (req: AuthenticatedRequest, re
|
||||
|
||||
switch (runEveryUnit) {
|
||||
case 'MINUTES':
|
||||
cronExpression = `${startMinutes} */${runEvery} * * *`;
|
||||
cronExpression = `*/${runEvery} * * * *`;
|
||||
break;
|
||||
case 'HOURS':
|
||||
cronExpression = `${startMinutes} */${runEvery} * * *`;
|
||||
@@ -774,7 +775,7 @@ router.put('/schedule/:id/', requireSignIn, async (req: AuthenticatedRequest, re
|
||||
break;
|
||||
case 'MONTHS':
|
||||
// todo: handle leap year
|
||||
cronExpression = `0 ${atTimeStart} ${dayOfMonth} * *`;
|
||||
cronExpression = `${startMinutes} ${startHours} ${dayOfMonth} */${runEvery} *`;
|
||||
if (startFrom !== 'SUNDAY') {
|
||||
cronExpression += ` ${dayIndex}`;
|
||||
}
|
||||
@@ -792,17 +793,13 @@ router.put('/schedule/:id/', requireSignIn, async (req: AuthenticatedRequest, re
|
||||
return res.status(401).json({ error: 'Unauthorized' });
|
||||
}
|
||||
|
||||
// Create the job in the queue with the cron expression
|
||||
const job = await workflowQueue.add(
|
||||
'run workflow',
|
||||
{ id, runId: uuid(), userId: req.user.id },
|
||||
{
|
||||
repeat: {
|
||||
pattern: cronExpression,
|
||||
tz: timezone,
|
||||
},
|
||||
}
|
||||
);
|
||||
try {
|
||||
await cancelScheduledWorkflow(id);
|
||||
} catch (cancelError) {
|
||||
logger.log('warn', `Failed to cancel existing schedule for robot ${id}: ${cancelError}`);
|
||||
}
|
||||
|
||||
const jobId = await scheduleWorkflow(id, req.user.id, cronExpression, timezone);
|
||||
|
||||
const nextRunAt = computeNextRun(cronExpression, timezone);
|
||||
|
||||
@@ -877,12 +874,12 @@ router.delete('/schedule/:id', requireSignIn, async (req: AuthenticatedRequest,
|
||||
return res.status(404).json({ error: 'Robot not found' });
|
||||
}
|
||||
|
||||
// Remove existing job from queue if it exists
|
||||
const existingJobs = await workflowQueue.getJobs(['delayed', 'waiting']);
|
||||
for (const job of existingJobs) {
|
||||
if (job.data.id === id) {
|
||||
await job.remove();
|
||||
}
|
||||
// Cancel the scheduled job in PgBoss
|
||||
try {
|
||||
await cancelScheduledWorkflow(id);
|
||||
} catch (error) {
|
||||
logger.log('error', `Error cancelling scheduled job for robot ${id}: ${error}`);
|
||||
// Continue with robot update even if cancellation fails
|
||||
}
|
||||
|
||||
// Delete the schedule from the robot
|
||||
|
||||
209
server/src/schedule-worker.ts
Normal file
209
server/src/schedule-worker.ts
Normal file
@@ -0,0 +1,209 @@
|
||||
/**
|
||||
* Worker process focused solely on scheduling logic
|
||||
*/
|
||||
import PgBoss, { Job } from 'pg-boss';
|
||||
import logger from './logger';
|
||||
import Robot from './models/Robot';
|
||||
import { handleRunRecording } from './workflow-management/scheduler';
|
||||
import { computeNextRun } from './utils/schedule';
|
||||
import { capture } from './utils/analytics';
|
||||
|
||||
const pgBossConnectionString = `postgres://${process.env.DB_USER}:${process.env.DB_PASSWORD}@${process.env.DB_HOST}:${process.env.DB_PORT}/${process.env.DB_NAME}`;
|
||||
|
||||
const pgBoss = new PgBoss({connectionString: pgBossConnectionString });
|
||||
|
||||
const registeredQueues = new Set<string>();
|
||||
|
||||
interface ScheduledWorkflowData {
|
||||
id: string;
|
||||
runId: string;
|
||||
userId: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility function to schedule a cron job using PgBoss
|
||||
* @param id The robot ID
|
||||
* @param userId The user ID
|
||||
* @param cronExpression The cron expression for scheduling
|
||||
* @param timezone The timezone for the cron expression
|
||||
*/
|
||||
export async function scheduleWorkflow(id: string, userId: string, cronExpression: string, timezone: string): Promise<void> {
|
||||
try {
|
||||
const runId = require('uuidv4').uuid();
|
||||
|
||||
const queueName = `scheduled-workflow-${id}`;
|
||||
|
||||
logger.log('info', `Scheduling workflow ${id} with cron expression ${cronExpression} in timezone ${timezone}`);
|
||||
|
||||
await pgBoss.createQueue(queueName);
|
||||
|
||||
await pgBoss.schedule(queueName, cronExpression,
|
||||
{ id, runId, userId },
|
||||
{ tz: timezone }
|
||||
);
|
||||
|
||||
await registerWorkerForQueue(queueName);
|
||||
|
||||
logger.log('info', `Scheduled workflow job for robot ${id}`);
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
logger.log('error', `Failed to schedule workflow: ${errorMessage}`);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility function to cancel a scheduled job
|
||||
* @param robotId The robot ID
|
||||
* @returns true if successful
|
||||
*/
|
||||
export async function cancelScheduledWorkflow(robotId: string) {
|
||||
try {
|
||||
const jobs = await pgBoss.getSchedules();
|
||||
|
||||
const matchingJobs = jobs.filter((job: any) => {
|
||||
try {
|
||||
const data = job.data;
|
||||
return data && data.id === robotId;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
|
||||
for (const job of matchingJobs) {
|
||||
logger.log('info', `Cancelling scheduled job ${job.name} for robot ${robotId}`);
|
||||
await pgBoss.unschedule(job.name);
|
||||
}
|
||||
|
||||
return true;
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
logger.log('error', `Failed to cancel scheduled workflow: ${errorMessage}`);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a scheduled workflow job
|
||||
*/
|
||||
async function processScheduledWorkflow(job: Job<ScheduledWorkflowData>) {
|
||||
const { id, runId, userId } = job.data;
|
||||
logger.log('info', `Processing scheduled workflow job for robotId: ${id}, runId: ${runId}, userId: ${userId}`);
|
||||
|
||||
try {
|
||||
// Execute the workflow using the existing handleRunRecording function
|
||||
const result = await handleRunRecording(id, userId);
|
||||
|
||||
// Update the robot's schedule with last run and next run times
|
||||
const robot = await Robot.findOne({ where: { 'recording_meta.id': id } });
|
||||
if (robot && robot.schedule && robot.schedule.cronExpression && robot.schedule.timezone) {
|
||||
// Update lastRunAt to the current time
|
||||
const lastRunAt = new Date();
|
||||
|
||||
// Compute the next run date
|
||||
const nextRunAt = computeNextRun(robot.schedule.cronExpression, robot.schedule.timezone) || undefined;
|
||||
|
||||
await robot.update({
|
||||
schedule: {
|
||||
...robot.schedule,
|
||||
lastRunAt,
|
||||
nextRunAt,
|
||||
},
|
||||
});
|
||||
|
||||
logger.log('info', `Updated robot ${id} schedule - next run at: ${nextRunAt}`);
|
||||
} else {
|
||||
logger.log('error', `Robot ${id} schedule, cronExpression, or timezone is missing.`);
|
||||
}
|
||||
|
||||
return { success: true };
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
logger.log('error', `Scheduled workflow job failed: ${errorMessage}`);
|
||||
return { success: false };
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a worker to handle scheduled workflow jobs
|
||||
*/
|
||||
async function registerScheduledWorkflowWorker() {
|
||||
try {
|
||||
const jobs = await pgBoss.getSchedules();
|
||||
for (const job of jobs) {
|
||||
await pgBoss.createQueue(job.name);
|
||||
await registerWorkerForQueue(job.name);
|
||||
}
|
||||
|
||||
logger.log('info', 'Scheduled workflow workers registered successfully');
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
logger.log('error', `Failed to register scheduled workflow workers: ${errorMessage}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a worker for a specific queue
|
||||
*/
|
||||
async function registerWorkerForQueue(queueName: string) {
|
||||
try {
|
||||
if (registeredQueues.has(queueName)) {
|
||||
return;
|
||||
}
|
||||
|
||||
await pgBoss.work(queueName, async (job: Job<ScheduledWorkflowData> | Job<ScheduledWorkflowData>[]) => {
|
||||
try {
|
||||
const singleJob = Array.isArray(job) ? job[0] : job;
|
||||
return await processScheduledWorkflow(singleJob);
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
logger.log('error', `Scheduled workflow job failed in queue ${queueName}: ${errorMessage}`);
|
||||
throw error;
|
||||
}
|
||||
});
|
||||
|
||||
registeredQueues.add(queueName);
|
||||
logger.log('info', `Registered worker for queue: ${queueName}`);
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
logger.log('error', `Failed to register worker for queue ${queueName}: ${errorMessage}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize PgBoss and register scheduling workers
|
||||
*/
|
||||
async function startScheduleWorker() {
|
||||
try {
|
||||
logger.log('info', 'Starting PgBoss scheduling worker...');
|
||||
await pgBoss.start();
|
||||
logger.log('info', 'PgBoss scheduling worker started successfully');
|
||||
|
||||
// Register the scheduled workflow worker
|
||||
await registerScheduledWorkflowWorker();
|
||||
|
||||
logger.log('info', 'Scheduling worker registered successfully');
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
logger.log('error', `Failed to start PgBoss scheduling worker: ${errorMessage}`);
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
startScheduleWorker();
|
||||
|
||||
pgBoss.on('error', (error) => {
|
||||
logger.log('error', `PgBoss scheduler error: ${error.message}`);
|
||||
});
|
||||
|
||||
process.on('SIGTERM', async () => {
|
||||
logger.log('info', 'SIGTERM received, shutting down PgBoss scheduler...');
|
||||
await pgBoss.stop();
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
process.on('SIGINT', async () => {
|
||||
logger.log('info', 'SIGINT received, shutting down PgBoss scheduler...');
|
||||
await pgBoss.stop();
|
||||
process.exit(0);
|
||||
});
|
||||
@@ -97,7 +97,7 @@ readdirSync(path.join(__dirname, 'api')).forEach((r) => {
|
||||
});
|
||||
|
||||
const isProduction = process.env.NODE_ENV === 'production';
|
||||
const workerPath = path.resolve(__dirname, isProduction ? './worker.js' : './worker.ts');
|
||||
const workerPath = path.resolve(__dirname, isProduction ? './schedule-worker.js' : './schedule-worker.ts');
|
||||
const recordingWorkerPath = path.resolve(__dirname, isProduction ? './pgboss-worker.js' : './pgboss-worker.ts');
|
||||
|
||||
let workerProcess: any;
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { Namespace, Socket } from 'socket.io';
|
||||
import { IncomingMessage } from 'http';
|
||||
import { verify, JwtPayload } from 'jsonwebtoken';
|
||||
import { verify, JwtPayload, sign } from 'jsonwebtoken';
|
||||
import logger from "../logger";
|
||||
import registerInputHandlers from '../browser-management/inputHandlers';
|
||||
|
||||
@@ -12,48 +12,85 @@ interface AuthenticatedSocket extends Socket {
|
||||
request: AuthenticatedIncomingMessage;
|
||||
}
|
||||
|
||||
declare global {
|
||||
var userContextMap: Map<string, string>;
|
||||
}
|
||||
|
||||
if (!global.userContextMap) {
|
||||
global.userContextMap = new Map<string, string>();
|
||||
}
|
||||
|
||||
/**
|
||||
* Register browser-user association in the global context map
|
||||
*/
|
||||
export function registerBrowserUserContext(browserId: string, userId: string) {
|
||||
if (!global.userContextMap) {
|
||||
global.userContextMap = new Map<string, string>();
|
||||
}
|
||||
global.userContextMap.set(browserId, userId);
|
||||
logger.log('debug', `Registered browser-user association: ${browserId} -> ${userId}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Socket.io middleware for authentication
|
||||
* This is a socket.io specific auth handler that doesn't rely on Express middleware
|
||||
*/
|
||||
const socketAuthMiddleware = (socket: Socket, next: (err?: Error) => void) => {
|
||||
const cookies = socket.handshake.headers.cookie;
|
||||
if (!cookies) {
|
||||
return next(new Error('Authentication required'));
|
||||
// Extract browserId from namespace
|
||||
const namespace = socket.nsp.name;
|
||||
const browserId = namespace.slice(1);
|
||||
|
||||
// Check if this browser is in our context map
|
||||
if (global.userContextMap && global.userContextMap.has(browserId)) {
|
||||
const userId = global.userContextMap.get(browserId);
|
||||
logger.log('debug', `Found browser in context map: ${browserId} -> ${userId}`);
|
||||
|
||||
const authSocket = socket as AuthenticatedSocket;
|
||||
authSocket.request.user = { id: userId };
|
||||
return next();
|
||||
}
|
||||
|
||||
const cookies = socket.handshake.headers.cookie;
|
||||
if (!cookies) {
|
||||
logger.log('debug', `No cookies found in socket handshake for ${browserId}`);
|
||||
return next(new Error('Authentication required'));
|
||||
}
|
||||
|
||||
const tokenMatch = cookies.split(';').find(c => c.trim().startsWith('token='));
|
||||
if (!tokenMatch) {
|
||||
logger.log('debug', `No token cookie found in socket handshake for ${browserId}`);
|
||||
return next(new Error('Authentication required'));
|
||||
}
|
||||
|
||||
const token = tokenMatch.split('=')[1];
|
||||
if (!token) {
|
||||
logger.log('debug', `Empty token value in cookie for ${browserId}`);
|
||||
return next(new Error('Authentication required'));
|
||||
}
|
||||
|
||||
const secret = process.env.JWT_SECRET;
|
||||
if (!secret) {
|
||||
logger.error('JWT_SECRET environment variable is not defined');
|
||||
return next(new Error('Server configuration error'));
|
||||
}
|
||||
|
||||
verify(token, secret, (err: any, user: any) => {
|
||||
if (err) {
|
||||
logger.log('warn', `JWT verification error: ${err.message}`);
|
||||
return next(new Error('Authentication failed'));
|
||||
}
|
||||
|
||||
const tokenMatch = cookies.split(';').find(c => c.trim().startsWith('token='));
|
||||
if (!tokenMatch) {
|
||||
return next(new Error('Authentication required'));
|
||||
// Normalize payload key
|
||||
if (user.userId && !user.id) {
|
||||
user.id = user.userId;
|
||||
delete user.userId;
|
||||
}
|
||||
|
||||
const token = tokenMatch.split('=')[1];
|
||||
if (!token) {
|
||||
return next(new Error('Authentication required'));
|
||||
}
|
||||
|
||||
const secret = process.env.JWT_SECRET;
|
||||
if (!secret) {
|
||||
return next(new Error('Server configuration error'));
|
||||
}
|
||||
|
||||
verify(token, secret, (err: any, user: any) => {
|
||||
if (err) {
|
||||
logger.log('warn', 'JWT verification error:', err);
|
||||
return next(new Error('Authentication failed'));
|
||||
}
|
||||
|
||||
// Normalize payload key
|
||||
if (user.userId && !user.id) {
|
||||
user.id = user.userId;
|
||||
delete user.userId; // temporary: del the old key for clarity
|
||||
}
|
||||
|
||||
// Attach user to socket request
|
||||
const authSocket = socket as AuthenticatedSocket;
|
||||
authSocket.request.user = user;
|
||||
next();
|
||||
});
|
||||
// Attach user to socket request
|
||||
const authSocket = socket as AuthenticatedSocket;
|
||||
authSocket.request.user = user;
|
||||
next();
|
||||
});
|
||||
};
|
||||
|
||||
/**
|
||||
|
||||
@@ -114,7 +114,7 @@ async function executeRun(id: string, userId: string) {
|
||||
|
||||
plainRun.status = 'running';
|
||||
|
||||
const browser = browserPool.getRemoteBrowser(userId);
|
||||
const browser = browserPool.getRemoteBrowser(plainRun.browserId);
|
||||
if (!browser) {
|
||||
throw new Error('Could not access browser');
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user