feat: add scheduler pgboss worker functions
This commit is contained in:
@@ -21,9 +21,13 @@ import { googleSheetUpdateTasks, processGoogleSheetUpdates } from './workflow-ma
|
|||||||
import { airtableUpdateTasks, processAirtableUpdates } from './workflow-management/integrations/airtable';
|
import { airtableUpdateTasks, processAirtableUpdates } from './workflow-management/integrations/airtable';
|
||||||
import { RemoteBrowser } from './browser-management/classes/RemoteBrowser';
|
import { RemoteBrowser } from './browser-management/classes/RemoteBrowser';
|
||||||
import { io as serverIo } from "./server";
|
import { io as serverIo } from "./server";
|
||||||
|
import { computeNextRun } from './utils/schedule';
|
||||||
|
import { handleRunRecording } from './workflow-management/scheduler';
|
||||||
|
|
||||||
const pgBossConnectionString = `postgres://${process.env.DB_USER}:${process.env.DB_PASSWORD}@${process.env.DB_HOST}:${process.env.DB_PORT}/${process.env.DB_NAME}`;
|
const pgBossConnectionString = `postgres://${process.env.DB_USER}:${process.env.DB_PASSWORD}@${process.env.DB_HOST}:${process.env.DB_PORT}/${process.env.DB_NAME}`;
|
||||||
|
|
||||||
|
const registeredQueues = new Set<string>();
|
||||||
|
|
||||||
interface InitializeBrowserData {
|
interface InitializeBrowserData {
|
||||||
userId: string;
|
userId: string;
|
||||||
}
|
}
|
||||||
@@ -41,6 +45,12 @@ interface DestroyBrowserData {
|
|||||||
userId: string;
|
userId: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
interface ScheduledWorkflowData {
|
||||||
|
id: string;
|
||||||
|
runId: string;
|
||||||
|
userId: string;
|
||||||
|
}
|
||||||
|
|
||||||
interface ExecuteRunData {
|
interface ExecuteRunData {
|
||||||
userId: string;
|
userId: string;
|
||||||
runId: string;
|
runId: string;
|
||||||
@@ -161,6 +171,69 @@ async function checkAndProcessQueuedRun(userId: string, browserId: string): Prom
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility function to schedule a cron job using PgBoss
|
||||||
|
* @param id The robot ID
|
||||||
|
* @param userId The user ID
|
||||||
|
* @param cronExpression The cron expression for scheduling
|
||||||
|
* @param timezone The timezone for the cron expression
|
||||||
|
*/
|
||||||
|
export async function scheduleWorkflow(id: string, userId: string, cronExpression: string, timezone: string): Promise<void> {
|
||||||
|
try {
|
||||||
|
const runId = require('uuidv4').uuid();
|
||||||
|
|
||||||
|
const queueName = `scheduled-workflow-${id}`;
|
||||||
|
|
||||||
|
logger.log('info', `Scheduling workflow ${id} with cron expression ${cronExpression} in timezone ${timezone}`);
|
||||||
|
|
||||||
|
await pgBoss.createQueue(queueName);
|
||||||
|
|
||||||
|
await pgBoss.schedule(queueName, cronExpression,
|
||||||
|
{ id, runId, userId },
|
||||||
|
{ tz: timezone }
|
||||||
|
);
|
||||||
|
|
||||||
|
logger.log('info', `Scheduled workflow job for robot ${id}`);
|
||||||
|
} catch (error: unknown) {
|
||||||
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||||
|
logger.log('error', `Failed to schedule workflow: ${errorMessage}`);
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility function to cancel a scheduled job
|
||||||
|
* @param robotId The robot ID
|
||||||
|
* @returns true if successful
|
||||||
|
*/
|
||||||
|
export async function cancelScheduledWorkflow(robotId: string) {
|
||||||
|
try {
|
||||||
|
const jobs = await pgBoss.getSchedules();
|
||||||
|
|
||||||
|
console.log("Scheduled JOBS", jobs);
|
||||||
|
|
||||||
|
const matchingJobs = jobs.filter((job: any) => {
|
||||||
|
try {
|
||||||
|
const data = JSON.parse(job.data);
|
||||||
|
return data && data.id === robotId;
|
||||||
|
} catch {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
for (const job of matchingJobs) {
|
||||||
|
logger.log('info', `Cancelling scheduled job ${job.name} for robot ${robotId}`);
|
||||||
|
await pgBoss.unschedule(job.name);
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
} catch (error: unknown) {
|
||||||
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||||
|
logger.log('error', `Failed to cancel scheduled workflow: ${errorMessage}`);
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Modified processRunExecution function - only add browser reset
|
* Modified processRunExecution function - only add browser reset
|
||||||
*/
|
*/
|
||||||
@@ -359,6 +432,107 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process a scheduled workflow job
|
||||||
|
*/
|
||||||
|
async function processScheduledWorkflow(job: Job<ScheduledWorkflowData>) {
|
||||||
|
const { id, runId, userId } = job.data;
|
||||||
|
logger.log('info', `Processing scheduled workflow job for robotId: ${id}, runId: ${runId}, userId: ${userId}`);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Execute the workflow using the existing handleRunRecording function
|
||||||
|
const result = await handleRunRecording(id, userId);
|
||||||
|
|
||||||
|
// Update the robot's schedule with last run and next run times
|
||||||
|
const robot = await Robot.findOne({ where: { 'recording_meta.id': id } });
|
||||||
|
if (robot && robot.schedule && robot.schedule.cronExpression && robot.schedule.timezone) {
|
||||||
|
// Update lastRunAt to the current time
|
||||||
|
const lastRunAt = new Date();
|
||||||
|
|
||||||
|
// Compute the next run date
|
||||||
|
const nextRunAt = computeNextRun(robot.schedule.cronExpression, robot.schedule.timezone) || undefined;
|
||||||
|
|
||||||
|
await robot.update({
|
||||||
|
schedule: {
|
||||||
|
...robot.schedule,
|
||||||
|
lastRunAt,
|
||||||
|
nextRunAt,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
logger.log('info', `Updated robot ${id} schedule - next run at: ${nextRunAt}`);
|
||||||
|
} else {
|
||||||
|
logger.log('error', `Robot ${id} schedule, cronExpression, or timezone is missing.`);
|
||||||
|
}
|
||||||
|
|
||||||
|
return { success: true };
|
||||||
|
} catch (error: unknown) {
|
||||||
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||||
|
logger.log('error', `Scheduled workflow job failed: ${errorMessage}`);
|
||||||
|
return { success: false };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register a worker to handle scheduled workflow jobs
|
||||||
|
*/
|
||||||
|
async function registerScheduledWorkflowWorker() {
|
||||||
|
try {
|
||||||
|
// First, get a list of all existing robots
|
||||||
|
const robots = await Robot.findAll({
|
||||||
|
attributes: ['recording_meta.id'],
|
||||||
|
raw: true
|
||||||
|
});
|
||||||
|
|
||||||
|
// Register a worker for each potential robot queue
|
||||||
|
for (const robot of robots) {
|
||||||
|
if (robot.recording_meta && robot.recording_meta.id) {
|
||||||
|
const queueName = `scheduled-workflow-${robot.recording_meta.id}`;
|
||||||
|
await registerWorkerForQueue(queueName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Also register workers for any existing PgBoss queues that follow our naming pattern
|
||||||
|
const queues = await pgBoss.getQueues();
|
||||||
|
for (const queue of queues) {
|
||||||
|
if (queue.name.startsWith('scheduled-workflow-') &&
|
||||||
|
!queue.name.endsWith('_error') &&
|
||||||
|
!queue.name.endsWith('_completed')) {
|
||||||
|
await registerWorkerForQueue(queue.name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.log('info', 'Scheduled workflow workers registered successfully');
|
||||||
|
} catch (error: unknown) {
|
||||||
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||||
|
logger.log('error', `Failed to register scheduled workflow workers: ${errorMessage}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function registerWorkerForQueue(queueName: string) {
|
||||||
|
try {
|
||||||
|
if (registeredQueues.has(queueName)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
await pgBoss.work(queueName, async (job: Job<ScheduledWorkflowData> | Job<ScheduledWorkflowData>[]) => {
|
||||||
|
try {
|
||||||
|
const singleJob = Array.isArray(job) ? job[0] : job;
|
||||||
|
return await processScheduledWorkflow(singleJob);
|
||||||
|
} catch (error: unknown) {
|
||||||
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||||
|
logger.log('error', `Scheduled workflow job failed in queue ${queueName}: ${errorMessage}`);
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
registeredQueues.add(queueName);
|
||||||
|
logger.log('info', `Registered worker for queue: ${queueName}`);
|
||||||
|
} catch (error: unknown) {
|
||||||
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||||
|
logger.log('error', `Failed to register worker for queue ${queueName}: ${errorMessage}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async function registerRunExecutionWorker() {
|
async function registerRunExecutionWorker() {
|
||||||
try {
|
try {
|
||||||
@@ -495,6 +669,9 @@ async function startWorkers() {
|
|||||||
// Register the run execution worker
|
// Register the run execution worker
|
||||||
await registerRunExecutionWorker();
|
await registerRunExecutionWorker();
|
||||||
|
|
||||||
|
// Register the scheduled workflow worker
|
||||||
|
await registerScheduledWorkflowWorker();
|
||||||
|
|
||||||
logger.log('info', 'All recording workers registered successfully');
|
logger.log('info', 'All recording workers registered successfully');
|
||||||
} catch (error: unknown) {
|
} catch (error: unknown) {
|
||||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||||
@@ -506,6 +683,10 @@ async function startWorkers() {
|
|||||||
// Start all workers
|
// Start all workers
|
||||||
startWorkers();
|
startWorkers();
|
||||||
|
|
||||||
|
pgBoss.on('error', (error) => {
|
||||||
|
logger.log('error', `PgBoss error: ${error.message}`);
|
||||||
|
});
|
||||||
|
|
||||||
// Handle graceful shutdown
|
// Handle graceful shutdown
|
||||||
process.on('SIGTERM', async () => {
|
process.on('SIGTERM', async () => {
|
||||||
logger.log('info', 'SIGTERM received, shutting down PgBoss...');
|
logger.log('info', 'SIGTERM received, shutting down PgBoss...');
|
||||||
|
|||||||
Reference in New Issue
Block a user