feat: cron expressions
This commit is contained in:
@@ -11,6 +11,8 @@ import { browserPool } from "../server";
|
||||
import fs from "fs";
|
||||
import { uuid } from "uuidv4";
|
||||
import { workflowQueue } from '../workflow-management/scheduler';
|
||||
import moment from 'moment-timezone';
|
||||
import cron from 'node-cron';
|
||||
|
||||
export const router = Router();
|
||||
|
||||
@@ -194,27 +196,98 @@ router.post('/runs/run/:fileName/:runId', async (req, res) => {
|
||||
|
||||
router.put('/schedule/:fileName/', async (req, res) => {
|
||||
try {
|
||||
const runId = uuid();
|
||||
|
||||
const { fileName } = req.params;
|
||||
const { scheduleTime } = req.body;
|
||||
|
||||
if (!fileName || !runId || !scheduleTime) {
|
||||
const {
|
||||
frequency,
|
||||
frequencyUnit,
|
||||
startDay,
|
||||
time,
|
||||
timezone
|
||||
} = req.body;
|
||||
|
||||
if (!fileName || !frequency || !frequencyUnit || !startDay || !time || !timezone) {
|
||||
return res.status(400).json({ error: 'Missing required parameters' });
|
||||
}
|
||||
|
||||
const delay = new Date(scheduleTime).getTime() - Date.now();
|
||||
|
||||
// Add job to the queue with delay
|
||||
await workflowQueue.add('run workflow', { fileName, runId }, { delay: Math.max(0, delay) });
|
||||
|
||||
res.status(200).json({ message: 'Workflow scheduled successfully' });
|
||||
// Validate inputs
|
||||
if (!['HOURS', 'DAYS', 'WEEKS', 'MONTHS'].includes(frequencyUnit)) {
|
||||
return res.status(400).json({ error: 'Invalid frequency unit' });
|
||||
}
|
||||
|
||||
if (!moment.tz.zone(timezone)) {
|
||||
return res.status(400).json({ error: 'Invalid timezone' });
|
||||
}
|
||||
|
||||
const [hours, minutes] = time.split(':').map(Number);
|
||||
if (isNaN(hours) || isNaN(minutes) || hours < 0 || hours > 23 || minutes < 0 || minutes > 59) {
|
||||
return res.status(400).json({ error: 'Invalid time format' });
|
||||
}
|
||||
|
||||
const days = ['SUNDAY', 'MONDAY', 'TUESDAY', 'WEDNESDAY', 'THURSDAY', 'FRIDAY', 'SATURDAY'];
|
||||
if (!days.includes(startDay)) {
|
||||
return res.status(400).json({ error: 'Invalid start day' });
|
||||
}
|
||||
|
||||
// Generate cron expression
|
||||
let cronExpression;
|
||||
switch (frequencyUnit) {
|
||||
case 'HOURS':
|
||||
cronExpression = `${minutes} */${frequency} * * *`;
|
||||
break;
|
||||
case 'DAYS':
|
||||
cronExpression = `${minutes} ${hours} */${frequency} * *`;
|
||||
break;
|
||||
case 'WEEKS':
|
||||
const dayIndex = days.indexOf(startDay);
|
||||
cronExpression = `${minutes} ${hours} * * ${dayIndex}/${7 * frequency}`;
|
||||
break;
|
||||
case 'MONTHS':
|
||||
cronExpression = `${minutes} ${hours} 1-7 */${frequency} *`;
|
||||
if (startDay !== 'SUNDAY') {
|
||||
const dayIndex = days.indexOf(startDay);
|
||||
cronExpression += ` ${dayIndex}`;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
if (!cronExpression || !cron.validate(cronExpression)) {
|
||||
return res.status(400).json({ error: 'Invalid cron expression generated' });
|
||||
}
|
||||
|
||||
const runId = uuid();
|
||||
|
||||
// Schedule the recurring job
|
||||
await workflowQueue.add(
|
||||
'run workflow',
|
||||
{ fileName, runId },
|
||||
{
|
||||
repeat: {
|
||||
pattern: cronExpression,
|
||||
tz: timezone
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
res.status(200).json({
|
||||
message: 'Workflow scheduled successfully',
|
||||
runId,
|
||||
cronExpression,
|
||||
// nextRunTime: getNextRunTime(cronExpression, timezone)
|
||||
});
|
||||
|
||||
} catch (error) {
|
||||
console.error('Error scheduling workflow:', error);
|
||||
res.status(500).json({ error: 'Failed to schedule workflow' });
|
||||
}
|
||||
});
|
||||
|
||||
// function getNextRunTime(cronExpression, timezone) {
|
||||
// const schedule = cron.schedule(cronExpression, () => {}, { timezone });
|
||||
// const nextDate = schedule.nextDate();
|
||||
// schedule.stop();
|
||||
// return nextDate.toDate();
|
||||
// }
|
||||
|
||||
/**
|
||||
* POST endpoint for aborting a current interpretation of the run.
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user