feat: compute next run at
This commit is contained in:
@@ -14,6 +14,7 @@ import Run from '../models/Run';
|
|||||||
import { BinaryOutputService } from '../storage/mino';
|
import { BinaryOutputService } from '../storage/mino';
|
||||||
import { workflowQueue } from '../worker';
|
import { workflowQueue } from '../worker';
|
||||||
import { AuthenticatedRequest } from './record';
|
import { AuthenticatedRequest } from './record';
|
||||||
|
import computeNextRun from '../utils/schedule';
|
||||||
|
|
||||||
export const router = Router();
|
export const router = Router();
|
||||||
|
|
||||||
@@ -248,59 +249,29 @@ router.post('/runs/run/:id', requireSignIn, async (req, res) => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
router.put('/schedule/:id/', requireSignIn, async (req: AuthenticatedRequest, res) => {
|
router.put('/schedule/:id/', requireSignIn, async (req: AuthenticatedRequest, res) => {
|
||||||
console.log(req.body);
|
|
||||||
try {
|
try {
|
||||||
const { id } = req.params;
|
const { id } = req.params;
|
||||||
const {
|
const { runEvery, runEveryUnit, startFrom, atTimeStart, atTimeEnd, timezone } = req.body;
|
||||||
// enabled = true,
|
|
||||||
runEvery,
|
|
||||||
runEveryUnit,
|
|
||||||
startFrom,
|
|
||||||
atTimeStart,
|
|
||||||
atTimeEnd,
|
|
||||||
timezone
|
|
||||||
} = req.body;
|
|
||||||
|
|
||||||
const robot = await Robot.findOne({ where: { 'recording_meta.id': id } });
|
const robot = await Robot.findOne({ where: { 'recording_meta.id': id } });
|
||||||
if (!robot) {
|
if (!robot) {
|
||||||
return res.status(404).json({ error: 'Robot not found' });
|
return res.status(404).json({ error: 'Robot not found' });
|
||||||
}
|
}
|
||||||
|
|
||||||
// If disabled, remove scheduling
|
// Validate required parameters
|
||||||
// if (!enabled) {
|
if (!runEvery || !runEveryUnit || !startFrom || !atTimeStart || !atTimeEnd || !timezone) {
|
||||||
// // Remove existing job from queue if it exists
|
|
||||||
// const existingJobs = await workflowQueue.getJobs(['delayed', 'waiting']);
|
|
||||||
// for (const job of existingJobs) {
|
|
||||||
// if (job.data.id === id) {
|
|
||||||
// await job.remove();
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // Update robot to disable scheduling
|
|
||||||
// await robot.update({
|
|
||||||
// schedule: null
|
|
||||||
// });
|
|
||||||
|
|
||||||
// return res.status(200).json({
|
|
||||||
// message: 'Schedule disabled successfully'
|
|
||||||
// });
|
|
||||||
// }
|
|
||||||
|
|
||||||
if (!id || !runEvery || !runEveryUnit || !startFrom || !timezone || (runEveryUnit === 'HOURS' || runEveryUnit === 'MINUTES') && (!atTimeStart || !atTimeEnd)) {
|
|
||||||
return res.status(400).json({ error: 'Missing required parameters' });
|
return res.status(400).json({ error: 'Missing required parameters' });
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!['HOURS', 'DAYS', 'WEEKS', 'MONTHS', 'MINUTES'].includes(runEveryUnit)) {
|
// Validate time zone
|
||||||
return res.status(400).json({ error: 'Invalid runEvery unit' });
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!moment.tz.zone(timezone)) {
|
if (!moment.tz.zone(timezone)) {
|
||||||
return res.status(400).json({ error: 'Invalid timezone' });
|
return res.status(400).json({ error: 'Invalid timezone' });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Validate and parse start and end times
|
||||||
const [startHours, startMinutes] = atTimeStart.split(':').map(Number);
|
const [startHours, startMinutes] = atTimeStart.split(':').map(Number);
|
||||||
const [endHours, endMinutes] = atTimeEnd.split(':').map(Number);
|
const [endHours, endMinutes] = atTimeEnd.split(':').map(Number);
|
||||||
|
|
||||||
if (isNaN(startHours) || isNaN(startMinutes) || isNaN(endHours) || isNaN(endMinutes) ||
|
if (isNaN(startHours) || isNaN(startMinutes) || isNaN(endHours) || isNaN(endMinutes) ||
|
||||||
startHours < 0 || startHours > 23 || startMinutes < 0 || startMinutes > 59 ||
|
startHours < 0 || startHours > 23 || startMinutes < 0 || startMinutes > 59 ||
|
||||||
endHours < 0 || endHours > 23 || endMinutes < 0 || endMinutes > 59) {
|
endHours < 0 || endHours > 23 || endMinutes < 0 || endMinutes > 59) {
|
||||||
@@ -312,62 +283,56 @@ router.put('/schedule/:id/', requireSignIn, async (req: AuthenticatedRequest, re
|
|||||||
return res.status(400).json({ error: 'Invalid start day' });
|
return res.status(400).json({ error: 'Invalid start day' });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Build cron expression based on run frequency and starting day
|
||||||
let cronExpression;
|
let cronExpression;
|
||||||
|
const dayIndex = days.indexOf(startFrom);
|
||||||
|
|
||||||
switch (runEveryUnit) {
|
switch (runEveryUnit) {
|
||||||
case 'MINUTES':
|
case 'MINUTES':
|
||||||
case 'HOURS':
|
|
||||||
cronExpression = `${startMinutes}-${endMinutes} */${runEvery} * * *`;
|
cronExpression = `${startMinutes}-${endMinutes} */${runEvery} * * *`;
|
||||||
break;
|
break;
|
||||||
|
case 'HOURS':
|
||||||
|
cronExpression = `${startMinutes} ${startHours}-${endHours} */${runEvery} * *`;
|
||||||
|
break;
|
||||||
case 'DAYS':
|
case 'DAYS':
|
||||||
cronExpression = `${startMinutes} ${startHours} */${runEvery} * *`;
|
cronExpression = `${startMinutes} ${startHours} */${runEvery} * *`;
|
||||||
break;
|
break;
|
||||||
case 'WEEKS':
|
case 'WEEKS':
|
||||||
const dayIndex = days.indexOf(startFrom);
|
cronExpression = `${startMinutes} ${startHours} * * ${dayIndex}`;
|
||||||
cronExpression = `${startMinutes} ${startHours} * * ${dayIndex}/${7 * runEvery}`;
|
|
||||||
break;
|
break;
|
||||||
case 'MONTHS':
|
case 'MONTHS':
|
||||||
cronExpression = `${startMinutes} ${startHours} 1-7 */${runEvery} *`;
|
cronExpression = `${startMinutes} ${startHours} 1-7 * *`;
|
||||||
if (startFrom !== 'SUNDAY') {
|
if (startFrom !== 'SUNDAY') {
|
||||||
const dayIndex = days.indexOf(startFrom);
|
|
||||||
cronExpression += ` ${dayIndex}`;
|
cronExpression += ` ${dayIndex}`;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
default:
|
||||||
|
return res.status(400).json({ error: 'Invalid runEveryUnit' });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Validate cron expression
|
||||||
if (!cronExpression || !cron.validate(cronExpression)) {
|
if (!cronExpression || !cron.validate(cronExpression)) {
|
||||||
return res.status(400).json({ error: 'Invalid cron expression generated' });
|
return res.status(400).json({ error: 'Invalid cron expression generated' });
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!req.user) {
|
if (!req.user) {
|
||||||
return res.status(401).send({ error: 'Unauthorized' });
|
return res.status(401).json({ error: 'Unauthorized' });
|
||||||
}
|
}
|
||||||
|
|
||||||
const runId = uuid();
|
// Create the job in the queue with the cron expression
|
||||||
const userId = req.user.id;
|
|
||||||
|
|
||||||
// Remove existing jobs for this robot just in case some were left
|
|
||||||
// const existingJobs = await workflowQueue.getJobs(['delayed', 'waiting']);
|
|
||||||
// for (const job of existingJobs) {
|
|
||||||
// if (job.data.id === id) {
|
|
||||||
// await job.remove();
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// Add new job
|
|
||||||
const job = await workflowQueue.add(
|
const job = await workflowQueue.add(
|
||||||
'run workflow',
|
'run workflow',
|
||||||
{ id, runId, userId },
|
{ id, runId: uuid(), userId: req.user.id },
|
||||||
{
|
{
|
||||||
repeat: {
|
repeat: {
|
||||||
pattern: cronExpression,
|
pattern: cronExpression,
|
||||||
tz: timezone
|
tz: timezone,
|
||||||
}
|
},
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
const nextRun = job.timestamp;
|
const nextRunAt = computeNextRun(cronExpression, timezone);
|
||||||
|
|
||||||
// Update robot with schedule details
|
|
||||||
await robot.update({
|
await robot.update({
|
||||||
schedule: {
|
schedule: {
|
||||||
runEvery,
|
runEvery,
|
||||||
@@ -378,8 +343,8 @@ router.put('/schedule/:id/', requireSignIn, async (req: AuthenticatedRequest, re
|
|||||||
timezone,
|
timezone,
|
||||||
cronExpression,
|
cronExpression,
|
||||||
lastRunAt: undefined,
|
lastRunAt: undefined,
|
||||||
nextRunAt: new Date(nextRun)
|
nextRunAt,
|
||||||
}
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
// Fetch updated schedule details after setting it
|
// Fetch updated schedule details after setting it
|
||||||
@@ -387,16 +352,15 @@ router.put('/schedule/:id/', requireSignIn, async (req: AuthenticatedRequest, re
|
|||||||
|
|
||||||
res.status(200).json({
|
res.status(200).json({
|
||||||
message: 'success',
|
message: 'success',
|
||||||
runId,
|
robot: updatedRobot,
|
||||||
robot: updatedRobot
|
|
||||||
});
|
});
|
||||||
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error scheduling workflow:', error);
|
console.error('Error scheduling workflow:', error);
|
||||||
res.status(500).json({ error: 'Failed to schedule workflow' });
|
res.status(500).json({ error: 'Failed to schedule workflow' });
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
// Endpoint to get schedule details
|
// Endpoint to get schedule details
|
||||||
router.get('/schedule/:id', requireSignIn, async (req, res) => {
|
router.get('/schedule/:id', requireSignIn, async (req, res) => {
|
||||||
try {
|
try {
|
||||||
|
|||||||
Reference in New Issue
Block a user