diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index f5d719b4..bb33ffe5 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -58,7 +58,8 @@ interface AbortRunData { const pgBoss = new PgBoss({ connectionString: pgBossConnectionString, - expireInHours: 23 + expireInHours: 23, + max: 3, }); /** diff --git a/server/src/schedule-worker.ts b/server/src/schedule-worker.ts index c75770e4..da3f9dd4 100644 --- a/server/src/schedule-worker.ts +++ b/server/src/schedule-worker.ts @@ -14,7 +14,11 @@ if (!process.env.DB_USER || !process.env.DB_PASSWORD || !process.env.DB_HOST || const pgBossConnectionString = `postgresql://${process.env.DB_USER}:${encodeURIComponent(process.env.DB_PASSWORD)}@${process.env.DB_HOST}:${process.env.DB_PORT}/${process.env.DB_NAME}`; -const pgBoss = new PgBoss({connectionString: pgBossConnectionString }); +const pgBoss = new PgBoss({ + connectionString: pgBossConnectionString, + max: 5, + expireInHours: 23, + }); const registeredQueues = new Set(); @@ -23,70 +27,6 @@ interface ScheduledWorkflowData { runId: string; userId: string; } - -/** - * Utility function to schedule a cron job using PgBoss - * @param id The robot ID - * @param userId The user ID - * @param cronExpression The cron expression for scheduling - * @param timezone The timezone for the cron expression - */ -export async function scheduleWorkflow(id: string, userId: string, cronExpression: string, timezone: string): Promise { - try { - const runId = uuid(); - - const queueName = `scheduled-workflow-${id}`; - - logger.log('info', `Scheduling workflow ${id} with cron expression ${cronExpression} in timezone ${timezone}`); - - await pgBoss.createQueue(queueName); - - await pgBoss.schedule(queueName, cronExpression, - { id, runId, userId }, - { tz: timezone } - ); - - await registerWorkerForQueue(queueName); - - logger.log('info', `Scheduled workflow job for robot ${id}`); - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Failed to schedule workflow: ${errorMessage}`); - throw error; - } -} - -/** - * Utility function to cancel a scheduled job - * @param robotId The robot ID - * @returns true if successful - */ -export async function cancelScheduledWorkflow(robotId: string) { - try { - const jobs = await pgBoss.getSchedules(); - - const matchingJobs = jobs.filter((job: any) => { - try { - const data = job.data; - return data && data.id === robotId; - } catch { - return false; - } - }); - - for (const job of matchingJobs) { - logger.log('info', `Cancelling scheduled job ${job.name} for robot ${robotId}`); - await pgBoss.unschedule(job.name); - } - - return true; - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Failed to cancel scheduled workflow: ${errorMessage}`); - throw error; - } -} - /** * Process a scheduled workflow job */