feat: set max limit + cleanup schedule utils
This commit is contained in:
@@ -58,7 +58,8 @@ interface AbortRunData {
|
|||||||
|
|
||||||
const pgBoss = new PgBoss({
|
const pgBoss = new PgBoss({
|
||||||
connectionString: pgBossConnectionString,
|
connectionString: pgBossConnectionString,
|
||||||
expireInHours: 23
|
expireInHours: 23,
|
||||||
|
max: 3,
|
||||||
});
|
});
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -14,7 +14,11 @@ if (!process.env.DB_USER || !process.env.DB_PASSWORD || !process.env.DB_HOST ||
|
|||||||
|
|
||||||
const pgBossConnectionString = `postgresql://${process.env.DB_USER}:${encodeURIComponent(process.env.DB_PASSWORD)}@${process.env.DB_HOST}:${process.env.DB_PORT}/${process.env.DB_NAME}`;
|
const pgBossConnectionString = `postgresql://${process.env.DB_USER}:${encodeURIComponent(process.env.DB_PASSWORD)}@${process.env.DB_HOST}:${process.env.DB_PORT}/${process.env.DB_NAME}`;
|
||||||
|
|
||||||
const pgBoss = new PgBoss({connectionString: pgBossConnectionString });
|
const pgBoss = new PgBoss({
|
||||||
|
connectionString: pgBossConnectionString,
|
||||||
|
max: 5,
|
||||||
|
expireInHours: 23,
|
||||||
|
});
|
||||||
|
|
||||||
const registeredQueues = new Set<string>();
|
const registeredQueues = new Set<string>();
|
||||||
|
|
||||||
@@ -23,70 +27,6 @@ interface ScheduledWorkflowData {
|
|||||||
runId: string;
|
runId: string;
|
||||||
userId: string;
|
userId: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Utility function to schedule a cron job using PgBoss
|
|
||||||
* @param id The robot ID
|
|
||||||
* @param userId The user ID
|
|
||||||
* @param cronExpression The cron expression for scheduling
|
|
||||||
* @param timezone The timezone for the cron expression
|
|
||||||
*/
|
|
||||||
export async function scheduleWorkflow(id: string, userId: string, cronExpression: string, timezone: string): Promise<void> {
|
|
||||||
try {
|
|
||||||
const runId = uuid();
|
|
||||||
|
|
||||||
const queueName = `scheduled-workflow-${id}`;
|
|
||||||
|
|
||||||
logger.log('info', `Scheduling workflow ${id} with cron expression ${cronExpression} in timezone ${timezone}`);
|
|
||||||
|
|
||||||
await pgBoss.createQueue(queueName);
|
|
||||||
|
|
||||||
await pgBoss.schedule(queueName, cronExpression,
|
|
||||||
{ id, runId, userId },
|
|
||||||
{ tz: timezone }
|
|
||||||
);
|
|
||||||
|
|
||||||
await registerWorkerForQueue(queueName);
|
|
||||||
|
|
||||||
logger.log('info', `Scheduled workflow job for robot ${id}`);
|
|
||||||
} catch (error: unknown) {
|
|
||||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
|
||||||
logger.log('error', `Failed to schedule workflow: ${errorMessage}`);
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Utility function to cancel a scheduled job
|
|
||||||
* @param robotId The robot ID
|
|
||||||
* @returns true if successful
|
|
||||||
*/
|
|
||||||
export async function cancelScheduledWorkflow(robotId: string) {
|
|
||||||
try {
|
|
||||||
const jobs = await pgBoss.getSchedules();
|
|
||||||
|
|
||||||
const matchingJobs = jobs.filter((job: any) => {
|
|
||||||
try {
|
|
||||||
const data = job.data;
|
|
||||||
return data && data.id === robotId;
|
|
||||||
} catch {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
for (const job of matchingJobs) {
|
|
||||||
logger.log('info', `Cancelling scheduled job ${job.name} for robot ${robotId}`);
|
|
||||||
await pgBoss.unschedule(job.name);
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
} catch (error: unknown) {
|
|
||||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
|
||||||
logger.log('error', `Failed to cancel scheduled workflow: ${errorMessage}`);
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process a scheduled workflow job
|
* Process a scheduled workflow job
|
||||||
*/
|
*/
|
||||||
|
|||||||
Reference in New Issue
Block a user