// File metadata (from extraction): 1238 lines, 37 KiB, TypeScript.
import { Router } from 'express';
|
|
import logger from "../logger";
|
|
import { createRemoteBrowserForRun, destroyRemoteBrowser, getActiveBrowserIdByState } from "../browser-management/controller";
|
|
import { browserPool } from "../server";
|
|
import { v4 as uuid } from "uuid";
|
|
import moment from 'moment-timezone';
|
|
import cron from 'node-cron';
|
|
import { requireSignIn } from '../middlewares/auth';
|
|
import Robot from '../models/Robot';
|
|
import Run from '../models/Run';
|
|
import { AuthenticatedRequest } from './record';
|
|
import { computeNextRun } from '../utils/schedule';
|
|
import { capture } from "../utils/analytics";
|
|
import { encrypt, decrypt } from '../utils/auth';
|
|
import { WorkflowFile } from 'maxun-core';
|
|
import { cancelScheduledWorkflow, scheduleWorkflow } from '../storage/schedule';
|
|
import { pgBossClient } from '../storage/pgboss';
|
|
|
|
export const router = Router();
|
|
|
|
export const processWorkflowActions = async (workflow: any[], checkLimit: boolean = false): Promise<any[]> => {
|
|
const processedWorkflow = JSON.parse(JSON.stringify(workflow));
|
|
|
|
processedWorkflow.forEach((pair: any) => {
|
|
pair.what.forEach((action: any) => {
|
|
// Handle limit validation for scrapeList action
|
|
if (action.action === 'scrapeList' && checkLimit && Array.isArray(action.args) && action.args.length > 0) {
|
|
const scrapeConfig = action.args[0];
|
|
if (scrapeConfig && typeof scrapeConfig === 'object' && 'limit' in scrapeConfig) {
|
|
if (typeof scrapeConfig.limit === 'number' && scrapeConfig.limit > 5) {
|
|
scrapeConfig.limit = 5;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Handle decryption for type and press actions
|
|
if ((action.action === 'type' || action.action === 'press') && Array.isArray(action.args) && action.args.length > 1) {
|
|
try {
|
|
const encryptedValue = action.args[1];
|
|
if (typeof encryptedValue === 'string') {
|
|
const decryptedValue = decrypt(encryptedValue);
|
|
action.args[1] = decryptedValue;
|
|
} else {
|
|
logger.log('error', 'Encrypted value is not a string');
|
|
action.args[1] = '';
|
|
}
|
|
} catch (error: unknown) {
|
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
|
logger.log('error', `Failed to decrypt input value: ${errorMessage}`);
|
|
action.args[1] = '';
|
|
}
|
|
}
|
|
});
|
|
});
|
|
|
|
return processedWorkflow;
|
|
}
|
|
|
|
/**
|
|
* Logs information about recordings API.
|
|
*/
|
|
router.all('/', requireSignIn, (req, res, next) => {
|
|
logger.log('debug', `The recordings API was invoked: ${req.url}`)
|
|
next() // pass control to the next handler
|
|
})
|
|
|
|
/**
|
|
* GET endpoint for getting an array of all stored recordings.
|
|
*/
|
|
router.get('/recordings', requireSignIn, async (req, res) => {
|
|
try {
|
|
const data = await Robot.findAll();
|
|
return res.send(data);
|
|
} catch (e) {
|
|
logger.log('info', 'Error while reading robots');
|
|
return res.send(null);
|
|
}
|
|
});
|
|
|
|
/**
|
|
* GET endpoint for getting a recording.
|
|
*/
|
|
router.get('/recordings/:id', requireSignIn, async (req, res) => {
|
|
try {
|
|
const data = await Robot.findOne({
|
|
where: { 'recording_meta.id': req.params.id },
|
|
raw: true
|
|
}
|
|
);
|
|
|
|
if (data?.recording?.workflow) {
|
|
data.recording.workflow = await processWorkflowActions(
|
|
data.recording.workflow,
|
|
);
|
|
}
|
|
|
|
return res.send(data);
|
|
} catch (e) {
|
|
logger.log('info', 'Error while reading robots');
|
|
return res.send(null);
|
|
}
|
|
})
|
|
|
|
router.get(('/recordings/:id/runs'), requireSignIn, async (req, res) => {
|
|
try {
|
|
const runs = await Run.findAll({
|
|
where: {
|
|
robotMetaId: req.params.id
|
|
},
|
|
raw: true
|
|
});
|
|
const formattedRuns = runs.map(formatRunResponse);
|
|
const response = {
|
|
statusCode: 200,
|
|
messageCode: "success",
|
|
runs: {
|
|
totalCount: formattedRuns.length,
|
|
items: formattedRuns,
|
|
},
|
|
};
|
|
|
|
res.status(200).json(response);
|
|
} catch (error) {
|
|
console.error("Error fetching runs:", error);
|
|
res.status(500).json({
|
|
statusCode: 500,
|
|
messageCode: "error",
|
|
message: "Failed to retrieve runs",
|
|
});
|
|
}
|
|
})
|
|
|
|
function formatRunResponse(run: any) {
|
|
const formattedRun = {
|
|
id: run.id,
|
|
status: run.status,
|
|
name: run.name,
|
|
robotId: run.robotMetaId, // Renaming robotMetaId to robotId
|
|
startedAt: run.startedAt,
|
|
finishedAt: run.finishedAt,
|
|
runId: run.runId,
|
|
runByUserId: run.runByUserId,
|
|
runByScheduleId: run.runByScheduleId,
|
|
runByAPI: run.runByAPI,
|
|
data: {},
|
|
screenshot: null,
|
|
};
|
|
|
|
if (run.serializableOutput && run.serializableOutput['item-0']) {
|
|
formattedRun.data = run.serializableOutput['item-0'];
|
|
} else if (run.binaryOutput && run.binaryOutput['item-0']) {
|
|
formattedRun.screenshot = run.binaryOutput['item-0'];
|
|
}
|
|
|
|
return formattedRun;
|
|
}
|
|
|
|
// Describes one captured form credential.
interface CredentialInfo {
    // Plaintext value; encrypted (via encrypt()) before it is written into a
    // workflow action by handleWorkflowActions.
    value: string;
    // Input kind of the target field; passed through as the third argument of
    // the generated `type` action.
    type: string;
}

// Maps an element selector to the credential that should be typed into it.
interface Credentials {
    [key: string]: CredentialInfo;
}
|
|
|
|
function handleWorkflowActions(workflow: any[], credentials: Credentials) {
|
|
return workflow.map(step => {
|
|
if (!step.what) return step;
|
|
|
|
const newWhat: any[] = [];
|
|
const processedSelectors = new Set<string>();
|
|
|
|
for (let i = 0; i < step.what.length; i++) {
|
|
const action = step.what[i];
|
|
|
|
if (!action?.action || !action?.args?.[0]) {
|
|
newWhat.push(action);
|
|
continue;
|
|
}
|
|
|
|
const selector = action.args[0];
|
|
const credential = credentials[selector];
|
|
|
|
if (!credential) {
|
|
newWhat.push(action);
|
|
continue;
|
|
}
|
|
|
|
if (action.action === 'click') {
|
|
newWhat.push(action);
|
|
|
|
if (!processedSelectors.has(selector) &&
|
|
i + 1 < step.what.length &&
|
|
(step.what[i + 1].action === 'type' || step.what[i + 1].action === 'press')) {
|
|
|
|
newWhat.push({
|
|
action: 'type',
|
|
args: [selector, encrypt(credential.value), credential.type]
|
|
});
|
|
|
|
newWhat.push({
|
|
action: 'waitForLoadState',
|
|
args: ['networkidle']
|
|
});
|
|
|
|
processedSelectors.add(selector);
|
|
|
|
while (i + 1 < step.what.length &&
|
|
(step.what[i + 1].action === 'type' ||
|
|
step.what[i + 1].action === 'press' ||
|
|
step.what[i + 1].action === 'waitForLoadState')) {
|
|
i++;
|
|
}
|
|
}
|
|
} else if ((action.action === 'type' || action.action === 'press') &&
|
|
!processedSelectors.has(selector)) {
|
|
newWhat.push({
|
|
action: 'type',
|
|
args: [selector, encrypt(credential.value), credential.type]
|
|
});
|
|
|
|
newWhat.push({
|
|
action: 'waitForLoadState',
|
|
args: ['networkidle']
|
|
});
|
|
|
|
processedSelectors.add(selector);
|
|
|
|
// Skip subsequent type/press/waitForLoadState actions for this selector
|
|
while (i + 1 < step.what.length &&
|
|
(step.what[i + 1].action === 'type' ||
|
|
step.what[i + 1].action === 'press' ||
|
|
step.what[i + 1].action === 'waitForLoadState')) {
|
|
i++;
|
|
}
|
|
}
|
|
}
|
|
|
|
return {
|
|
...step,
|
|
what: newWhat
|
|
};
|
|
});
|
|
}
|
|
|
|
/**
|
|
* PUT endpoint to update the name and limit of a robot.
|
|
*/
|
|
router.put('/recordings/:id', requireSignIn, async (req: AuthenticatedRequest, res) => {
|
|
try {
|
|
const { id } = req.params;
|
|
const { name, limits, credentials, targetUrl, workflow: incomingWorkflow } = req.body;
|
|
|
|
// Validate input
|
|
if (!name && !limits && !credentials && !targetUrl) {
|
|
return res.status(400).json({ error: 'Either "name", "limits", "credentials" or "target_url" must be provided.' });
|
|
}
|
|
|
|
// Fetch the robot by ID
|
|
const robot = await Robot.findOne({ where: { 'recording_meta.id': id } });
|
|
|
|
if (!robot) {
|
|
return res.status(404).json({ error: 'Robot not found.' });
|
|
}
|
|
|
|
// Update fields if provided
|
|
if (name) {
|
|
robot.set('recording_meta', { ...robot.recording_meta, name });
|
|
}
|
|
|
|
if (targetUrl) {
|
|
robot.set('recording_meta', { ...robot.recording_meta, url: targetUrl });
|
|
|
|
const updatedWorkflow = [...robot.recording.workflow];
|
|
let foundGoto = false;
|
|
|
|
for (let i = updatedWorkflow.length - 1; i >= 0; i--) {
|
|
const step = updatedWorkflow[i];
|
|
for (let j = 0; j < step.what.length; j++) {
|
|
const action = step.what[j];
|
|
if (action.action === "goto" && action.args?.length) {
|
|
|
|
action.args[0] = targetUrl;
|
|
if (step.where?.url && step.where.url !== "about:blank") {
|
|
step.where.url = targetUrl;
|
|
}
|
|
|
|
robot.set('recording', { ...robot.recording, workflow: updatedWorkflow });
|
|
robot.changed('recording', true);
|
|
foundGoto = true;
|
|
i = -1;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
await robot.save();
|
|
|
|
// Start with existing workflow or allow client to supply a full workflow replacement
|
|
let workflow = incomingWorkflow && Array.isArray(incomingWorkflow)
|
|
? JSON.parse(JSON.stringify(incomingWorkflow))
|
|
: [...robot.recording.workflow]; // Create a copy of the workflow
|
|
|
|
if (credentials) {
|
|
workflow = handleWorkflowActions(workflow, credentials);
|
|
}
|
|
|
|
if (limits && Array.isArray(limits) && limits.length > 0) {
|
|
for (const limitInfo of limits) {
|
|
const { pairIndex, actionIndex, argIndex, limit } = limitInfo;
|
|
|
|
const pair = workflow[pairIndex];
|
|
if (!pair || !pair.what) continue;
|
|
|
|
const action = pair.what[actionIndex];
|
|
if (!action || !action.args) continue;
|
|
|
|
const arg = action.args[argIndex];
|
|
if (!arg || typeof arg !== 'object') continue;
|
|
|
|
(arg as { limit: number }).limit = limit;
|
|
}
|
|
}
|
|
|
|
const updates: any = {
|
|
recording: {
|
|
...robot.recording,
|
|
workflow
|
|
}
|
|
};
|
|
|
|
if (name || targetUrl) {
|
|
updates.recording_meta = {
|
|
...robot.recording_meta,
|
|
...(name && { name }),
|
|
...(targetUrl && { url: targetUrl })
|
|
};
|
|
}
|
|
|
|
await Robot.update(updates, {
|
|
where: { 'recording_meta.id': id }
|
|
});
|
|
|
|
const updatedRobot = await Robot.findOne({ where: { 'recording_meta.id': id } });
|
|
|
|
logger.log('info', `Robot with ID ${id} was updated successfully.`);
|
|
|
|
return res.status(200).json({ message: 'Robot updated successfully', robot });
|
|
} catch (error) {
|
|
// Safely handle the error type
|
|
if (error instanceof Error) {
|
|
logger.log('error', `Error updating robot with ID ${req.params.id}: ${error.message}`);
|
|
return res.status(500).json({ error: error.message });
|
|
} else {
|
|
logger.log('error', `Unknown error updating robot with ID ${req.params.id}`);
|
|
return res.status(500).json({ error: 'An unknown error occurred.' });
|
|
}
|
|
}
|
|
});
|
|
|
|
|
|
/**
|
|
* POST endpoint to duplicate a robot and update its target URL.
|
|
*/
|
|
router.post('/recordings/:id/duplicate', requireSignIn, async (req: AuthenticatedRequest, res) => {
|
|
try {
|
|
const { id } = req.params;
|
|
const { targetUrl } = req.body;
|
|
|
|
if (!targetUrl) {
|
|
return res.status(400).json({ error: 'The "targetUrl" field is required.' });
|
|
}
|
|
|
|
const originalRobot = await Robot.findOne({ where: { 'recording_meta.id': id } });
|
|
|
|
if (!originalRobot) {
|
|
return res.status(404).json({ error: 'Original robot not found.' });
|
|
}
|
|
|
|
const lastWord = targetUrl.split('/').filter(Boolean).pop() || 'Unnamed';
|
|
|
|
const workflow = originalRobot.recording.workflow.map((step) => {
|
|
if (step.where?.url && step.where.url !== "about:blank") {
|
|
step.where.url = targetUrl;
|
|
}
|
|
|
|
step.what.forEach((action) => {
|
|
if (action.action === "goto" && action.args?.length) {
|
|
action.args[0] = targetUrl;
|
|
}
|
|
});
|
|
|
|
return step;
|
|
});
|
|
|
|
const currentTimestamp = new Date().toLocaleString();
|
|
|
|
const newRobot = await Robot.create({
|
|
id: uuid(),
|
|
userId: originalRobot.userId,
|
|
recording_meta: {
|
|
...originalRobot.recording_meta,
|
|
id: uuid(),
|
|
name: `${originalRobot.recording_meta.name} (${lastWord})`,
|
|
createdAt: currentTimestamp,
|
|
updatedAt: currentTimestamp,
|
|
},
|
|
recording: { ...originalRobot.recording, workflow },
|
|
google_sheet_email: null,
|
|
google_sheet_name: null,
|
|
google_sheet_id: null,
|
|
google_access_token: null,
|
|
google_refresh_token: null,
|
|
schedule: null,
|
|
});
|
|
|
|
logger.log('info', `Robot with ID ${id} duplicated successfully as ${newRobot.id}.`);
|
|
|
|
return res.status(201).json({
|
|
message: 'Robot duplicated and target URL updated successfully.',
|
|
robot: newRobot,
|
|
});
|
|
} catch (error) {
|
|
if (error instanceof Error) {
|
|
logger.log('error', `Error duplicating robot with ID ${req.params.id}: ${error.message}`);
|
|
return res.status(500).json({ error: error.message });
|
|
} else {
|
|
logger.log('error', `Unknown error duplicating robot with ID ${req.params.id}`);
|
|
return res.status(500).json({ error: 'An unknown error occurred.' });
|
|
}
|
|
}
|
|
});
|
|
|
|
/**
|
|
* POST endpoint for creating a markdown robot
|
|
*/
|
|
router.post('/recordings/scrape', requireSignIn, async (req: AuthenticatedRequest, res) => {
|
|
try {
|
|
const { url, name, formats } = req.body;
|
|
|
|
if (!url) {
|
|
return res.status(400).json({ error: 'The "url" field is required.' });
|
|
}
|
|
|
|
if (!req.user) {
|
|
return res.status(401).send({ error: 'Unauthorized' });
|
|
}
|
|
|
|
// Validate URL format
|
|
try {
|
|
new URL(url);
|
|
} catch (err) {
|
|
return res.status(400).json({ error: 'Invalid URL format' });
|
|
}
|
|
|
|
// Validate format
|
|
const validFormats = ['markdown', 'html'];
|
|
|
|
if (!Array.isArray(formats) || formats.length === 0) {
|
|
return res.status(400).json({ error: 'At least one output format must be selected.' });
|
|
}
|
|
|
|
const invalid = formats.filter(f => !validFormats.includes(f));
|
|
if (invalid.length > 0) {
|
|
return res.status(400).json({ error: `Invalid formats: ${invalid.join(', ')}` });
|
|
}
|
|
|
|
const robotName = name || `Markdown Robot - ${new URL(url).hostname}`;
|
|
const currentTimestamp = new Date().toLocaleString();
|
|
const robotId = uuid();
|
|
|
|
const newRobot = await Robot.create({
|
|
id: uuid(),
|
|
userId: req.user.id,
|
|
recording_meta: {
|
|
name: robotName,
|
|
id: robotId,
|
|
createdAt: currentTimestamp,
|
|
updatedAt: currentTimestamp,
|
|
pairs: 0,
|
|
params: [],
|
|
type: 'scrape',
|
|
url: url,
|
|
formats: formats,
|
|
},
|
|
recording: { workflow: [] },
|
|
google_sheet_email: null,
|
|
google_sheet_name: null,
|
|
google_sheet_id: null,
|
|
google_access_token: null,
|
|
google_refresh_token: null,
|
|
schedule: null,
|
|
});
|
|
|
|
logger.log('info', `Markdown robot created with id: ${newRobot.id}`);
|
|
capture(
|
|
'maxun-oss-robot-created',
|
|
{
|
|
robot_meta: newRobot.recording_meta,
|
|
recording: newRobot.recording,
|
|
}
|
|
)
|
|
|
|
return res.status(201).json({
|
|
message: 'Markdown robot created successfully.',
|
|
robot: newRobot,
|
|
});
|
|
} catch (error) {
|
|
if (error instanceof Error) {
|
|
logger.log('error', `Error creating markdown robot: ${error.message}`);
|
|
return res.status(500).json({ error: error.message });
|
|
} else {
|
|
logger.log('error', 'Unknown error creating markdown robot');
|
|
return res.status(500).json({ error: 'An unknown error occurred.' });
|
|
}
|
|
}
|
|
});
|
|
|
|
/**
|
|
* DELETE endpoint for deleting a recording from the storage.
|
|
*/
|
|
router.delete('/recordings/:id', requireSignIn, async (req: AuthenticatedRequest, res) => {
|
|
if (!req.user) {
|
|
return res.status(401).send({ error: 'Unauthorized' });
|
|
}
|
|
try {
|
|
await Robot.destroy({
|
|
where: { 'recording_meta.id': req.params.id }
|
|
});
|
|
capture(
|
|
'maxun-oss-robot-deleted',
|
|
{
|
|
robotId: req.params.id,
|
|
user_id: req.user?.id,
|
|
deleted_at: new Date().toISOString(),
|
|
}
|
|
)
|
|
return res.send(true);
|
|
} catch (e) {
|
|
const { message } = e as Error;
|
|
logger.log('info', `Error while deleting a recording with name: ${req.params.fileName}.json`);
|
|
return res.send(false);
|
|
}
|
|
});
|
|
|
|
/**
|
|
* GET endpoint for getting an array of runs from the storage.
|
|
*/
|
|
router.get('/runs', requireSignIn, async (req, res) => {
|
|
try {
|
|
const data = await Run.findAll();
|
|
return res.send(data);
|
|
} catch (e) {
|
|
logger.log('info', 'Error while reading runs');
|
|
return res.send(null);
|
|
}
|
|
});
|
|
|
|
/**
|
|
* DELETE endpoint for deleting a run from the storage.
|
|
*/
|
|
router.delete('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) => {
|
|
if (!req.user) {
|
|
return res.status(401).send({ error: 'Unauthorized' });
|
|
}
|
|
try {
|
|
await Run.destroy({ where: { runId: req.params.id } });
|
|
capture(
|
|
'maxun-oss-run-deleted',
|
|
{
|
|
runId: req.params.id,
|
|
user_id: req.user?.id,
|
|
deleted_at: new Date().toISOString(),
|
|
}
|
|
)
|
|
return res.send(true);
|
|
} catch (e) {
|
|
const { message } = e as Error;
|
|
logger.log('info', `Error while deleting a run with name: ${req.params.fileName}.json`);
|
|
return res.send(false);
|
|
}
|
|
});
|
|
|
|
/**
 * PUT endpoint for starting a remote browser instance and saving run metadata to the storage.
 * Making it ready for interpretation and returning a runId.
 *
 * If the user has reached their browser limit, the run will be queued using pgBossClient.
 */
router.put('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) => {
    try {
        // Look up the robot whose recording this run will execute.
        const recording = await Robot.findOne({
            where: {
                'recording_meta.id': req.params.id
            },
            raw: true
        });

        if (!recording || !recording.recording_meta || !recording.recording_meta.id) {
            return res.status(404).send({ error: 'Recording not found' });
        }

        if (!req.user) {
            return res.status(401).send({ error: 'Unauthorized' });
        }

        // Generate runId first
        const runId = uuid();

        // Decide between immediate execution and queueing based on the user's
        // available browser slots.
        const canCreateBrowser = await browserPool.hasAvailableBrowserSlots(req.user.id, "run");

        if (canCreateBrowser) {
            let browserId: string;

            // Step 1: create the browser. A failure here needs no cleanup.
            try {
                browserId = await createRemoteBrowserForRun(req.user.id);

                if (!browserId || browserId.trim() === '') {
                    throw new Error('Failed to generate valid browser ID');
                }

                logger.log('info', `Created browser ${browserId} for run ${runId}`);

            } catch (browserError: any) {
                logger.log('error', `Failed to create browser: ${browserError.message}`);
                return res.status(500).send({ error: 'Failed to create browser instance' });
            }

            // Step 2: persist the run row. On failure, tear down the browser
            // created in step 1 (best effort).
            try {
                await Run.create({
                    status: 'running',
                    name: recording.recording_meta.name,
                    robotId: recording.id,
                    robotMetaId: recording.recording_meta.id,
                    startedAt: new Date().toLocaleString(),
                    finishedAt: '',
                    browserId: browserId,
                    interpreterSettings: req.body,
                    log: '',
                    runId,
                    runByUserId: req.user.id,
                    serializableOutput: {},
                    binaryOutput: {},
                });

                logger.log('info', `Created run ${runId} with browser ${browserId}`);

            } catch (dbError: any) {
                logger.log('error', `Database error creating run: ${dbError.message}`);

                try {
                    await destroyRemoteBrowser(browserId, req.user.id);
                } catch (cleanupError: any) {
                    logger.log('warn', `Failed to cleanup browser after run creation failure: ${cleanupError.message}`);
                }

                return res.status(500).send({ error: 'Failed to create run record' });
            }

            // Step 3: enqueue the execution job on the per-user queue. On
            // failure, mark the run failed and tear down the browser.
            try {
                const userQueueName = `execute-run-user-${req.user.id}`;
                await pgBossClient.createQueue(userQueueName);

                const jobId = await pgBossClient.send(userQueueName, {
                    userId: req.user.id,
                    runId: runId,
                    browserId: browserId,
                });

                logger.log('info', `Queued run execution job with ID: ${jobId} for run: ${runId}`);
            } catch (queueError: any) {
                logger.log('error', `Failed to queue run execution: ${queueError.message}`);

                try {
                    await Run.update({
                        status: 'failed',
                        finishedAt: new Date().toLocaleString(),
                        log: 'Failed to queue execution job'
                    }, { where: { runId: runId } });

                    await destroyRemoteBrowser(browserId, req.user.id);
                } catch (cleanupError: any) {
                    logger.log('warn', `Failed to cleanup after queue error: ${cleanupError.message}`);
                }

                return res.status(503).send({ error: 'Unable to queue run, please try again later' });
            }

            return res.send({
                browserId: browserId,
                runId: runId,
                robotMetaId: recording.recording_meta.id,
                queued: false
            });
        } else {
            // No slot available: record the run as 'queued' with a placeholder
            // browserId; processQueuedRuns (below) picks it up once a slot
            // frees and replaces the browserId with a real one.
            const browserId = uuid();

            await Run.create({
                status: 'queued',
                name: recording.recording_meta.name,
                robotId: recording.id,
                robotMetaId: recording.recording_meta.id,
                startedAt: new Date().toLocaleString(),
                finishedAt: '',
                browserId,
                interpreterSettings: req.body,
                log: 'Run queued - waiting for available browser slot',
                runId,
                runByUserId: req.user.id,
                serializableOutput: {},
                binaryOutput: {},
            });

            return res.send({
                browserId: browserId,
                runId: runId,
                robotMetaId: recording.recording_meta.id,
                queued: true
            });
        }
    } catch (e) {
        const { message } = e as Error;
        logger.log('error', `Error while creating a run with robot id: ${req.params.id} - ${message}`);
        return res.status(500).send({ error: 'Internal server error' });
    }
});
|
|
|
|
/**
|
|
* GET endpoint for getting a run from the storage.
|
|
*/
|
|
router.get('/runs/run/:id', requireSignIn, async (req, res) => {
|
|
try {
|
|
const run = await Run.findOne({ where: { runId: req.params.runId }, raw: true });
|
|
if (!run) {
|
|
return res.status(404).send(null);
|
|
}
|
|
return res.send(run);
|
|
} catch (e) {
|
|
const { message } = e as Error;
|
|
logger.log('error', `Error ${message} while reading a run with id: ${req.params.id}.json`);
|
|
return res.send(null);
|
|
}
|
|
});
|
|
|
|
function AddGeneratedFlags(workflow: WorkflowFile) {
|
|
const copy = JSON.parse(JSON.stringify(workflow));
|
|
for (let i = 0; i < workflow.workflow.length; i++) {
|
|
copy.workflow[i].what.unshift({
|
|
action: 'flag',
|
|
args: ['generated'],
|
|
});
|
|
}
|
|
return copy;
|
|
};
|
|
|
|
/**
|
|
* PUT endpoint for finishing a run and saving it to the storage.
|
|
*/
|
|
router.post('/runs/run/:id', requireSignIn, async (req: AuthenticatedRequest, res) => {
|
|
try {
|
|
if (!req.user) { return res.status(401).send({ error: 'Unauthorized' }); }
|
|
|
|
const run = await Run.findOne({ where: { runId: req.params.id } });
|
|
if (!run) {
|
|
return res.status(404).send(false);
|
|
}
|
|
|
|
const plainRun = run.toJSON();
|
|
|
|
const recording = await Robot.findOne({ where: { 'recording_meta.id': plainRun.robotMetaId }, raw: true });
|
|
if (!recording) {
|
|
return res.status(404).send(false);
|
|
}
|
|
|
|
try {
|
|
const userQueueName = `execute-run-user-${req.user.id}`;
|
|
|
|
// Queue the execution job
|
|
await pgBossClient.createQueue(userQueueName);
|
|
|
|
const jobId = await pgBossClient.send(userQueueName, {
|
|
userId: req.user.id,
|
|
runId: req.params.id,
|
|
browserId: plainRun.browserId
|
|
});
|
|
|
|
logger.log('info', `Queued run execution job with ID: ${jobId} for run: ${req.params.id}`);
|
|
} catch (queueError: any) {
|
|
logger.log('error', `Failed to queue run execution`);
|
|
|
|
}
|
|
} catch (e) {
|
|
const { message } = e as Error;
|
|
// If error occurs, set run status to failed
|
|
const run = await Run.findOne({ where: { runId: req.params.id } });
|
|
if (run) {
|
|
await run.update({
|
|
status: 'failed',
|
|
finishedAt: new Date().toLocaleString(),
|
|
});
|
|
}
|
|
logger.log('info', `Error while running a robot with id: ${req.params.id} - ${message}`);
|
|
capture(
|
|
'maxun-oss-run-created-manual',
|
|
{
|
|
runId: req.params.id,
|
|
user_id: req.user?.id,
|
|
created_at: new Date().toISOString(),
|
|
status: 'failed',
|
|
error_message: message,
|
|
}
|
|
);
|
|
return res.send(false);
|
|
}
|
|
});
|
|
|
|
/**
 * PUT endpoint for (re)scheduling a robot: validates the request, builds a
 * cron expression from the requested frequency, replaces any existing
 * pg-boss schedule, and stores the schedule details on the robot.
 */
router.put('/schedule/:id/', requireSignIn, async (req: AuthenticatedRequest, res) => {
    try {
        const { id } = req.params;
        const { runEvery, runEveryUnit, startFrom, dayOfMonth, atTimeStart, atTimeEnd, timezone } = req.body;

        const robot = await Robot.findOne({ where: { 'recording_meta.id': id } });
        if (!robot) {
            return res.status(404).json({ error: 'Robot not found' });
        }

        // Validate required parameters
        if (!runEvery || !runEveryUnit || !startFrom || !atTimeStart || !atTimeEnd || !timezone) {
            return res.status(400).json({ error: 'Missing required parameters' });
        }

        // Validate time zone
        if (!moment.tz.zone(timezone)) {
            return res.status(400).json({ error: 'Invalid timezone' });
        }

        // Validate and parse start and end times (expected "HH:MM" strings)
        const [startHours, startMinutes] = atTimeStart.split(':').map(Number);
        const [endHours, endMinutes] = atTimeEnd.split(':').map(Number);

        if (isNaN(startHours) || isNaN(startMinutes) || isNaN(endHours) || isNaN(endMinutes) ||
            startHours < 0 || startHours > 23 || startMinutes < 0 || startMinutes > 59 ||
            endHours < 0 || endHours > 23 || endMinutes < 0 || endMinutes > 59) {
            return res.status(400).json({ error: 'Invalid time format' });
        }

        const days = ['SUNDAY', 'MONDAY', 'TUESDAY', 'WEDNESDAY', 'THURSDAY', 'FRIDAY', 'SATURDAY'];
        if (!days.includes(startFrom)) {
            return res.status(400).json({ error: 'Invalid start day' });
        }

        // Build cron expression based on run frequency and starting day
        let cronExpression;
        const dayIndex = days.indexOf(startFrom);

        switch (runEveryUnit) {
            case 'MINUTES':
                cronExpression = `*/${runEvery} * * * *`;
                break;
            case 'HOURS':
                // Fires at the requested minute past every runEvery-th hour.
                cronExpression = `${startMinutes} */${runEvery} * * *`;
                break;
            case 'DAYS':
                cronExpression = `${startMinutes} ${startHours} */${runEvery} * *`;
                break;
            case 'WEEKS':
                // Weekly schedules use the day-of-week field; runEvery is not
                // representable in standard 5-field cron and is not applied here.
                cronExpression = `${startMinutes} ${startHours} * * ${dayIndex}`;
                break;
            case 'MONTHS':
                // todo: handle leap year
                cronExpression = `${startMinutes} ${startHours} ${dayOfMonth} */${runEvery} *`;
                if (startFrom !== 'SUNDAY') {
                    // NOTE(review): appending the day index produces a 6-field
                    // expression; node-cron reads a 6-field cron as starting
                    // with a seconds field, so this likely does not constrain
                    // the weekday as intended — verify against node-cron docs.
                    cronExpression += ` ${dayIndex}`;
                }
                break;
            default:
                return res.status(400).json({ error: 'Invalid runEveryUnit' });
        }

        // Validate cron expression
        if (!cronExpression || !cron.validate(cronExpression)) {
            return res.status(400).json({ error: 'Invalid cron expression generated' });
        }

        if (!req.user) {
            return res.status(401).json({ error: 'Unauthorized' });
        }

        // Replace any existing schedule; cancellation failure is non-fatal.
        try {
            await cancelScheduledWorkflow(id);
        } catch (cancelError) {
            logger.log('warn', `Failed to cancel existing schedule for robot ${id}: ${cancelError}`);
        }

        const jobId = await scheduleWorkflow(id, req.user.id, cronExpression, timezone);

        const nextRunAt = computeNextRun(cronExpression, timezone);

        // Persist the schedule parameters on the robot for display/editing.
        await robot.update({
            schedule: {
                runEvery,
                runEveryUnit,
                startFrom,
                dayOfMonth,
                atTimeStart,
                atTimeEnd,
                timezone,
                cronExpression,
                lastRunAt: undefined,
                nextRunAt: nextRunAt || undefined,
            },
        });

        capture(
            'maxun-oss-robot-scheduled',
            {
                robotId: id,
                user_id: req.user.id,
                scheduled_at: new Date().toISOString(),
            }
        )

        // Fetch updated schedule details after setting it
        const updatedRobot = await Robot.findOne({ where: { 'recording_meta.id': id } });

        res.status(200).json({
            message: 'success',
            robot: updatedRobot,
        });
    } catch (error) {
        console.error('Error scheduling workflow:', error);
        res.status(500).json({ error: 'Failed to schedule workflow' });
    }
});
|
|
|
|
|
|
// Endpoint to get schedule details
|
|
router.get('/schedule/:id', requireSignIn, async (req, res) => {
|
|
try {
|
|
const robot = await Robot.findOne({ where: { 'recording_meta.id': req.params.id }, raw: true });
|
|
|
|
if (!robot) {
|
|
return res.status(404).json({ error: 'Robot not found' });
|
|
}
|
|
|
|
return res.status(200).json({
|
|
schedule: robot.schedule
|
|
});
|
|
|
|
} catch (error) {
|
|
console.error('Error getting schedule:', error);
|
|
res.status(500).json({ error: 'Failed to get schedule' });
|
|
}
|
|
});
|
|
|
|
// Endpoint to delete schedule
|
|
router.delete('/schedule/:id', requireSignIn, async (req: AuthenticatedRequest, res) => {
|
|
try {
|
|
const { id } = req.params;
|
|
|
|
if (!req.user) {
|
|
return res.status(401).json({ error: 'Unauthorized' });
|
|
}
|
|
|
|
const robot = await Robot.findOne({ where: { 'recording_meta.id': id } });
|
|
if (!robot) {
|
|
return res.status(404).json({ error: 'Robot not found' });
|
|
}
|
|
|
|
// Cancel the scheduled job in pgBossClient
|
|
try {
|
|
await cancelScheduledWorkflow(id);
|
|
} catch (error) {
|
|
logger.log('error', `Error cancelling scheduled job for robot ${id}: ${error}`);
|
|
// Continue with robot update even if cancellation fails
|
|
}
|
|
|
|
// Delete the schedule from the robot
|
|
await robot.update({
|
|
schedule: null
|
|
});
|
|
|
|
capture(
|
|
'maxun-oss-robot-schedule-deleted',
|
|
{
|
|
robotId: id,
|
|
user_id: req.user?.id,
|
|
unscheduled_at: new Date().toISOString(),
|
|
}
|
|
)
|
|
|
|
res.status(200).json({ message: 'Schedule deleted successfully' });
|
|
|
|
} catch (error) {
|
|
console.error('Error deleting schedule:', error);
|
|
res.status(500).json({ error: 'Failed to delete schedule' });
|
|
}
|
|
});
|
|
|
|
/**
 * POST endpoint for aborting a current interpretation of the run.
 */
router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest, res) => {
    try {
        if (!req.user) { return res.status(401).send({ error: 'Unauthorized' }); }

        const run = await Run.findOne({ where: { runId: req.params.id } });

        if (!run) {
            return res.status(404).send({ error: 'Run not found' });
        }

        // Only in-flight or waiting runs can be aborted.
        if (!['running', 'queued'].includes(run.status)) {
            return res.status(400).send({
                error: `Cannot abort run with status: ${run.status}`
            });
        }

        const isQueued = run.status === 'queued';

        // Flag the transition first so other observers see 'aborting'.
        await run.update({
            status: 'aborting'
        });

        // A queued run has no live browser/interpreter: finish it right here.
        if (isQueued) {
            await run.update({
                status: 'aborted',
                finishedAt: new Date().toLocaleString(),
                log: 'Run aborted while queued'
            });

            return res.send({
                success: true,
                message: 'Queued run aborted',
                isQueued: true
            });
        }

        // Immediately stop interpreter like cloud version
        try {
            const browser = browserPool.getRemoteBrowser(run.browserId);
            if (browser && browser.interpreter) {
                logger.log('info', `Immediately stopping interpreter for run ${req.params.id}`);
                await browser.interpreter.stopInterpretation();
            }
        } catch (immediateStopError: any) {
            // Best effort only; the queued abort job below performs cleanup.
            logger.log('warn', `Failed to immediately stop interpreter: ${immediateStopError.message}`);
        }

        // Queue the per-user abort job that performs the remaining cleanup.
        const userQueueName = `abort-run-user-${req.user.id}`;
        await pgBossClient.createQueue(userQueueName);

        const jobId = await pgBossClient.send(userQueueName, {
            userId: req.user.id,
            runId: req.params.id
        });

        logger.log('info', `Abort signal sent for run ${req.params.id}, job ID: ${jobId}`);

        return res.send({
            success: true,
            message: 'Run stopped immediately, cleanup queued',
            jobId,
            isQueued: false
        });

    } catch (e) {
        const { message } = e as Error;
        logger.log('error', `Error aborting run ${req.params.id}: ${message}`);
        return res.status(500).send({ error: 'Failed to abort run' });
    }
});
|
|
|
|
// Circuit breaker for database connection issues
// State used by processQueuedRuns: after MAX_CONSECUTIVE_ERRORS consecutive
// failures, polling is skipped until circuitBreakerOpenUntil has passed.
let consecutiveDbErrors = 0;               // consecutive failures observed by processQueuedRuns
const MAX_CONSECUTIVE_ERRORS = 3;          // failures before the breaker opens
const CIRCUIT_BREAKER_COOLDOWN = 30000;    // ms the breaker stays open before retrying
let circuitBreakerOpenUntil = 0;           // epoch ms until which polling is skipped
|
|
|
|
async function processQueuedRuns() {
|
|
try {
|
|
if (Date.now() < circuitBreakerOpenUntil) {
|
|
return;
|
|
}
|
|
const queuedRun = await Run.findOne({
|
|
where: { status: 'queued' },
|
|
order: [['startedAt', 'ASC']],
|
|
});
|
|
consecutiveDbErrors = 0;
|
|
if (!queuedRun) return;
|
|
|
|
const userId = queuedRun.runByUserId;
|
|
|
|
const canCreateBrowser = await browserPool.hasAvailableBrowserSlots(userId, "run");
|
|
|
|
if (canCreateBrowser) {
|
|
logger.log('info', `Processing queued run ${queuedRun.runId} for user ${userId}`);
|
|
|
|
const recording = await Robot.findOne({
|
|
where: {
|
|
'recording_meta.id': queuedRun.robotMetaId
|
|
},
|
|
raw: true
|
|
});
|
|
|
|
if (!recording) {
|
|
await queuedRun.update({
|
|
status: 'failed',
|
|
finishedAt: new Date().toLocaleString(),
|
|
log: 'Recording not found'
|
|
});
|
|
return;
|
|
}
|
|
|
|
try {
|
|
const newBrowserId = await createRemoteBrowserForRun(userId);
|
|
|
|
logger.log('info', `Created and initialized browser ${newBrowserId} for queued run ${queuedRun.runId}`);
|
|
|
|
await queuedRun.update({
|
|
status: 'running',
|
|
browserId: newBrowserId,
|
|
log: 'Browser created and ready for execution'
|
|
});
|
|
|
|
const userQueueName = `execute-run-user-${userId}`;
|
|
await pgBossClient.createQueue(userQueueName);
|
|
|
|
const jobId = await pgBossClient.send(userQueueName, {
|
|
userId: userId,
|
|
runId: queuedRun.runId,
|
|
browserId: newBrowserId,
|
|
});
|
|
|
|
logger.log('info', `Queued execution for run ${queuedRun.runId} with ready browser ${newBrowserId}, job ID: ${jobId}`);
|
|
|
|
} catch (browserError: any) {
|
|
logger.log('error', `Failed to create browser for queued run: ${browserError.message}`);
|
|
await queuedRun.update({
|
|
status: 'failed',
|
|
finishedAt: new Date().toLocaleString(),
|
|
log: `Failed to create browser: ${browserError.message}`
|
|
});
|
|
}
|
|
}
|
|
} catch (error: any) {
|
|
consecutiveDbErrors++;
|
|
|
|
if (consecutiveDbErrors >= MAX_CONSECUTIVE_ERRORS) {
|
|
circuitBreakerOpenUntil = Date.now() + CIRCUIT_BREAKER_COOLDOWN;
|
|
logger.log('error', `Circuit breaker opened after ${MAX_CONSECUTIVE_ERRORS} consecutive errors. Cooling down for ${CIRCUIT_BREAKER_COOLDOWN/1000}s`);
|
|
}
|
|
|
|
logger.log('error', `Error processing queued runs (${consecutiveDbErrors}/${MAX_CONSECUTIVE_ERRORS}): ${error.message}`);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Recovers orphaned runs that were left in "running" status due to instance crashes
|
|
* This function runs on server startup to ensure data reliability
|
|
*/
|
|
export async function recoverOrphanedRuns() {
|
|
try {
|
|
logger.log('info', 'Starting recovery of orphaned runs...');
|
|
|
|
const orphanedRuns = await Run.findAll({
|
|
where: {
|
|
status: ['running', 'scheduled']
|
|
},
|
|
order: [['startedAt', 'ASC']]
|
|
});
|
|
|
|
if (orphanedRuns.length === 0) {
|
|
logger.log('info', 'No orphaned runs found');
|
|
return;
|
|
}
|
|
|
|
logger.log('info', `Found ${orphanedRuns.length} orphaned runs to recover (including scheduled runs)`);
|
|
|
|
for (const run of orphanedRuns) {
|
|
try {
|
|
const runData = run.toJSON();
|
|
logger.log('info', `Recovering orphaned run: ${runData.runId}`);
|
|
|
|
const browser = browserPool.getRemoteBrowser(runData.browserId);
|
|
|
|
if (!browser) {
|
|
const retryCount = runData.retryCount || 0;
|
|
|
|
if (retryCount < 3) {
|
|
await run.update({
|
|
status: 'queued',
|
|
retryCount: retryCount + 1,
|
|
serializableOutput: {},
|
|
binaryOutput: {},
|
|
browserId: undefined,
|
|
log: runData.log ? `${runData.log}\n[RETRY ${retryCount + 1}/3] Re-queuing due to server crash` : `[RETRY ${retryCount + 1}/3] Re-queuing due to server crash`
|
|
});
|
|
|
|
logger.log('info', `Re-queued crashed run ${runData.runId} (retry ${retryCount + 1}/3)`);
|
|
} else {
|
|
const crashRecoveryMessage = `Max retries exceeded (3/3) - Run failed after multiple server crashes.`;
|
|
|
|
await run.update({
|
|
status: 'failed',
|
|
finishedAt: new Date().toLocaleString(),
|
|
log: runData.log ? `${runData.log}\n${crashRecoveryMessage}` : crashRecoveryMessage
|
|
});
|
|
|
|
logger.log('warn', `Max retries reached for run ${runData.runId}, marked as permanently failed`);
|
|
}
|
|
|
|
if (runData.browserId) {
|
|
try {
|
|
browserPool.deleteRemoteBrowser(runData.browserId);
|
|
logger.log('info', `Cleaned up stale browser reference: ${runData.browserId}`);
|
|
} catch (cleanupError: any) {
|
|
logger.log('warn', `Failed to cleanup browser reference ${runData.browserId}: ${cleanupError.message}`);
|
|
}
|
|
}
|
|
} else {
|
|
logger.log('info', `Run ${runData.runId} browser still active, not orphaned`);
|
|
}
|
|
} catch (runError: any) {
|
|
logger.log('error', `Failed to recover run ${run.runId}: ${runError.message}`);
|
|
}
|
|
}
|
|
|
|
logger.log('info', `Orphaned run recovery completed. Processed ${orphanedRuns.length} runs.`);
|
|
} catch (error: any) {
|
|
logger.log('error', `Failed to recover orphaned runs: ${error.message}`);
|
|
}
|
|
}
|
|
|
|
export { processQueuedRuns };
|