Files
parcer/server/src/workflow-management/classes/Interpreter.ts

668 lines
22 KiB
TypeScript
Raw Normal View History

2024-07-31 22:46:19 +05:30
import Interpreter, { WorkflowFile } from "maxun-core";
2024-06-05 04:24:49 +05:30
import logger from "../../logger";
import { Socket } from "socket.io";
import { Page } from "playwright";
import { InterpreterSettings } from "../../types";
import { decrypt } from "../../utils/auth";
2025-09-10 00:21:43 +05:30
import Run from "../../models/Run";
/**
 * Returns a deep copy of the workflow prepared for interpretation.
 *
 * For every `type` and `press` action that has at least two arguments, the second
 * argument is replaced by its decrypted value. If that argument is not a string, or
 * decryption throws, it is replaced by an empty string and an error is logged.
 *
 * When `checkLimit` is true, a numeric `limit` on the first argument of a `scrapeList`
 * action is capped at 5. A missing or non-numeric `limit` is left untouched.
 *
 * The input workflow is never modified; the copy is made with a JSON round-trip.
 *
 * @param workflow The workflow to process.
 * @param checkLimit If true, the `scrapeList` limit is capped.
 * @returns The processed copy of the workflow.
 */
function processWorkflow(workflow: WorkflowFile, checkLimit: boolean = false): WorkflowFile {
  // Upper bound applied to `scrapeList` limits when `checkLimit` is set.
  const maxCheckedListLimit = 5;

  const processedWorkflow = JSON.parse(JSON.stringify(workflow)) as WorkflowFile;

  processedWorkflow.workflow.forEach((pair) => {
    pair.what.forEach((action) => {
      // Handle limit validation for scrapeList action
      if (action.action === 'scrapeList' && checkLimit && Array.isArray(action.args) && action.args.length > 0) {
        const scrapeConfig = action.args[0];
        if (scrapeConfig && typeof scrapeConfig === 'object' && 'limit' in scrapeConfig) {
          if (typeof scrapeConfig.limit === 'number' && scrapeConfig.limit > maxCheckedListLimit) {
            scrapeConfig.limit = maxCheckedListLimit;
          }
        }
      }

      // Handle decryption for type and press actions
      if ((action.action === 'type' || action.action === 'press') && Array.isArray(action.args) && action.args.length > 1) {
        try {
          const encryptedValue = action.args[1];
          if (typeof encryptedValue === 'string') {
            const decryptedValue = decrypt(encryptedValue);
            action.args[1] = decryptedValue;
          } else {
            logger.log('error', 'Encrypted value is not a string');
            action.args[1] = '';
          }
        } catch (error: unknown) {
          const errorMessage = error instanceof Error ? error.message : String(error);
          logger.log('error', `Failed to decrypt input value: ${errorMessage}`);
          action.args[1] = '';
        }
      }
    });
  });

  return processedWorkflow;
}
2024-06-05 04:24:49 +05:30
/**
 * This class implements the main interpretation functions.
 * It holds some information about the current interpretation process and
 * registers to some events to allow the client (frontend) to interact with the interpreter.
 * It uses the [maxun-core](https://www.npmjs.com/package/maxun-core)
 * library to interpret the workflow.
 * @category WorkflowManagement
 */
export class WorkflowInterpreter {
/**
 * Socket.io socket instance enabling communication with the client (frontend) side.
 * @private
 */
private socket: Socket;

/**
 * True if the interpretation is paused.
 */
public interpretationIsPaused: boolean = false;

/**
 * The instance of the {@link Interpreter} class used to interpret the workflow.
 * From maxun-core. Null when no interpretation is running.
 * @private
 */
private interpreter: Interpreter | null = null;

/**
 * An id of the currently interpreted pair in the workflow.
 * @private
 */
private activeId: number | null = null;

/**
 * An array of debug messages emitted by the {@link Interpreter}.
 */
public debugMessages: string[] = [];

/**
 * Storage for different types of serializable data
 */
public serializableDataByType: {
  scrapeSchema: any[],
  scrapeList: any[],
} = {
  scrapeSchema: [],
  scrapeList: [],
};

/**
 * Track the current action type being processed
 */
private currentActionType: string | null = null;

/**
 * An array of all the binary data extracted from the run.
 */
public binaryData: { mimetype: string, data: string }[] = [];

/**
 * Track current scrapeList index
 */
private currentScrapeListIndex: number = 0;

/**
 * Current run ID for real-time persistence
 */
private currentRunId: string | null = null;

/**
 * Batched persistence system for performance optimization.
 * Items wait here until they are flushed to the database.
 */
private persistenceBuffer: Array<{
  actionType: string;
  data: any;
  listIndex?: number;
  timestamp: number;
  creditValidated: boolean;
}> = [];

/** Handle of the pending batch-flush timer, or null when none is scheduled. */
private persistenceTimer: NodeJS.Timeout | null = null;

private readonly BATCH_SIZE = 5; // Process every 5 items
private readonly BATCH_TIMEOUT = 3000; // Or every 3 seconds

/** True while a buffer flush is running, so that flushes do not overlap. */
private persistenceInProgress = false;

/**
 * Breakpoint flags indexed by pair id: the interpretation pauses on a pair
 * when the entry at that pair's id is truthy.
 * @private
 */
private breakpoints: boolean[] = [];

/**
 * Callback to resume the interpretation after a pause.
 * @private
 */
private interpretationResume: (() => void) | null = null;
/**
 * A public constructor taking a socket instance for communication with the client.
 * @param socket Socket.io socket instance enabling communication with the client (frontend) side.
 * @param runId Optional run ID for real-time data persistence. When omitted (or empty),
 * no run ID is set and persistence calls return without writing.
 * @constructor
 */
constructor(socket: Socket, runId?: string) {
  this.socket = socket;
  this.currentRunId = runId || null;
}
2024-06-05 04:29:55 +05:30
/**
 * Subscribes to the events that are used to control the interpretation.
 * The events are pause, resume, step and breakpoints.
 * Step is used to interpret a single pair and pause on the other matched pair.
 *
 * Each call registers a fresh set of listeners on the current socket; existing
 * listeners are not removed here.
 * @returns void
 */
public subscribeToPausing = () => {
  this.socket.on('pause', () => {
    this.interpretationIsPaused = true;
  });

  this.socket.on('resume', () => {
    this.interpretationIsPaused = false;
    if (this.interpretationResume) {
      this.interpretationResume();
      this.socket.emit('log', '----- The interpretation has been resumed -----', false);
    } else {
      logger.log('debug', "Resume called but no resume function is set");
    }
  });

  // Unlike 'resume', 'step' leaves the paused flag untouched, so the next
  // 'flag' event pauses the interpretation again.
  this.socket.on('step', () => {
    if (this.interpretationResume) {
      this.interpretationResume();
    } else {
      logger.log('debug', "Step called but no resume function is set");
    }
  });

  this.socket.on('breakpoints', (data: boolean[]) => {
    logger.log('debug', "Setting breakpoints: " + data);
    this.breakpoints = data;
  });
}
/**
 * Sets up the instance of {@link Interpreter} and interprets
 * the workflow inside the recording editor.
 * Resets the interpreter reference and pause state after the interpretation ends,
 * including when the run throws.
 * @param workflow The workflow to interpret.
 * @param page The page instance used to interact with the browser.
 * @param updatePageOnPause A callback to update the page after a pause.
 * @param settings The settings to use for the interpretation. `params` is passed to the
 * run; all other properties are forwarded to the interpreter. The object is not modified.
 * @returns {Promise<void>}
 */
public interpretRecordingInEditor = async (
  workflow: WorkflowFile,
  page: Page,
  updatePageOnPause: (page: Page) => void,
  settings: InterpreterSettings,
) => {
  // Split `params` off without deleting it from the caller's settings object.
  const { params: runParams, ...interpreterSettings } = settings;
  const params = runParams ? runParams : null;

  // `true` enables the scrapeList limit cap in processWorkflow.
  const processedWorkflow = processWorkflow(workflow, true);

  const options = {
    ...interpreterSettings,
    mode: 'editor',
    debugChannel: {
      activeId: (id: any) => {
        this.activeId = id;
        this.socket.emit('activePairId', id);
      },
      debugMessage: (msg: any) => {
        this.debugMessages.push(`[${new Date().toLocaleString()}] ` + msg);
        this.socket.emit('log', msg);
      },
      setActionType: (type: string) => {
        this.currentActionType = type;
      }
      // NOTE(review): unlike InterpretRecording, no incrementScrapeListIndex hook is
      // registered here, so currentScrapeListIndex is not advanced during an editor
      // run — confirm this is intended.
    },
    serializableCallback: async (data: any) => {
      if (this.currentActionType === 'scrapeSchema') {
        // Always hand the client an array: a non-array or empty payload is wrapped.
        const schemaRows = Array.isArray(data) && data.length > 0 ? data : [data];
        await this.persistDataToDatabase('scrapeSchema', schemaRows);
        this.socket.emit('serializableCallback', {
          type: 'captureText',
          data: schemaRows
        });
      } else if (this.currentActionType === 'scrapeList') {
        if (data && Array.isArray(data) && data.length > 0) {
          // Use the current index for persistence
          await this.persistDataToDatabase('scrapeList', data, this.currentScrapeListIndex);
        }
        this.socket.emit('serializableCallback', {
          type: 'captureList',
          data
        });
      }
    },
    binaryCallback: async (data: string, mimetype: string) => {
      const binaryItem = { mimetype, data: JSON.stringify(data) };
      this.binaryData.push(binaryItem);

      // Persist binary data to database
      await this.persistBinaryDataToDatabase(binaryItem);

      this.socket.emit('binaryCallback', {
        data,
        mimetype,
        type: 'captureScreenshot'
      });
    }
  };

  const interpreter = new Interpreter(processedWorkflow, options);
  this.interpreter = interpreter;

  interpreter.on('flag', async (page, resume) => {
    if (this.activeId !== null && this.breakpoints[this.activeId]) {
      logger.log('debug', `breakpoint hit id: ${this.activeId}`);
      this.socket.emit('breakpointHit');
      this.interpretationIsPaused = true;
    }

    if (this.interpretationIsPaused) {
      // Keep the resume callback so the 'resume'/'step' socket handlers can continue the run.
      this.interpretationResume = resume;
      logger.log('debug', `Paused inside of flag: ${page.url()}`);
      updatePageOnPause(page);
      this.socket.emit('log', '----- The interpretation has been paused -----', false);
    } else {
      resume();
    }
  });

  this.socket.emit('log', '----- Starting the interpretation -----', false);

  try {
    const status = await interpreter.run(page, params);
    this.socket.emit('log', `----- The interpretation finished with status: ${status} -----`, false);
    logger.log('debug', `Interpretation finished`);
  } finally {
    // Reset even when run() throws; otherwise interpretationInProgress() would
    // keep reporting a run that no longer exists.
    this.interpreter = null;
    this.interpretationIsPaused = false;
    this.interpretationResume = null;
  }

  this.socket.emit('activePairId', -1);
  this.socket.emit('finished');
};
/**
 * Stops the current process of the interpretation of the workflow.
 * Aborts the interpreter, waits for it to stop, notifies the client and clears
 * this instance's state. Logs an error when no interpretation is active.
 * @returns {Promise<void>}
 */
public stopInterpretation = async () => {
  if (!this.interpreter) {
    logger.log('error', 'Cannot stop: No active interpretation.');
    return;
  }

  logger.log('info', 'Stopping the interpretation.');

  this.interpreter.abort();
  logger.log('info', 'maxun-core interpreter aborted - data collection stopped immediately');

  await this.interpreter.stop();
  this.socket.emit('log', '----- The interpretation has been stopped -----', false);

  await this.clearState();
};
2025-09-28 22:53:52 +05:30
/**
 * Attempts a final flush of the persistence buffer, cancels any pending batch
 * timer and resets every piece of per-run state held by this instance
 * (including the run ID).
 * @returns {Promise<void>}
 */
public clearState = async (): Promise<void> => {
  if (this.persistenceBuffer.length > 0) {
    try {
      await this.flushPersistenceBuffer();

      // The flush can return without persisting (it is skipped while another flush
      // is running, and a failed batch is put back into the buffer), so success is
      // only reported when the buffer is actually empty.
      if (this.persistenceBuffer.length === 0) {
        logger.log('debug', 'Successfully flushed final persistence buffer during cleanup');
      } else {
        logger.log('warn', `Final persistence flush did not complete; discarding ${this.persistenceBuffer.length} buffered item(s) during cleanup`);
      }
    } catch (error: unknown) {
      const errorMessage = error instanceof Error ? error.message : String(error);
      logger.log('error', `Failed to flush final persistence buffer: ${errorMessage}`);
    }
  }

  if (this.persistenceTimer) {
    clearTimeout(this.persistenceTimer);
    this.persistenceTimer = null;
  }

  this.debugMessages = [];
  this.interpretationIsPaused = false;
  this.activeId = null;
  this.interpreter = null;
  this.breakpoints = [];
  this.interpretationResume = null;
  this.currentActionType = null;
  this.serializableDataByType = {
    scrapeSchema: [],
    scrapeList: [],
  };
  this.binaryData = [];
  this.currentScrapeListIndex = 0;
  this.currentRunId = null;
  this.persistenceBuffer = [];
  this.persistenceInProgress = false;
}
2025-09-10 00:21:43 +05:30
/**
 * Stores the ID of the run that subsequent real-time persistence calls write to.
 * @param runId The run ID to set
 */
public setRunId = (runId: string): void => {
  this.currentRunId = runId;

  const confirmation = 'Set run ID for real-time persistence: ' + runId;
  logger.log('debug', confirmation);
};
/**
 * Persists extracted data to database with batching.
 * The data is appended to the persistence buffer. The buffer is flushed immediately
 * for `scrapeSchema` data or once it holds at least `BATCH_SIZE` items; otherwise a
 * delayed flush is scheduled. Does nothing when no run ID is set.
 * @param actionType Type of the action that produced the data.
 * @param data The extracted data.
 * @param listIndex Index of the list the data belongs to (passed for `scrapeList`).
 * @private
 */
private persistDataToDatabase = async (actionType: string, data: any, listIndex?: number): Promise<void> => {
  if (!this.currentRunId) {
    logger.log('debug', 'No run ID available for persistence');
    return;
  }

  this.addToPersistenceBatch(actionType, data, listIndex, true);

  const flushNow = actionType === 'scrapeSchema' || this.persistenceBuffer.length >= this.BATCH_SIZE;
  if (flushNow) {
    await this.flushPersistenceBuffer();
  } else {
    this.scheduleBatchFlush();
  }
};
/**
 * Writes one binary item into the `binaryOutput` of the current run right away.
 * Returns without writing when no run ID is set or the run cannot be found;
 * failures are logged and not rethrown.
 * @param binaryItem The mimetype and data of the item to store.
 * @private
 */
private persistBinaryDataToDatabase = async (binaryItem: { mimetype: string, data: string }): Promise<void> => {
  if (!this.currentRunId) {
    logger.log('debug', 'No run ID available for binary data persistence');
    return;
  }

  try {
    const run = await Run.findOne({ where: { runId: this.currentRunId } });

    if (!run) {
      logger.log('warn', `Run not found for binary data persistence: ${this.currentRunId}`);
      return;
    }

    // Work on a JSON copy of what is already stored (or start from an empty object).
    const existingOutput = run.binaryOutput
      ? JSON.parse(JSON.stringify(run.binaryOutput))
      : {};

    // Key combines the current time with the number of entries already stored.
    const entryCount = Object.keys(existingOutput).length;
    const entryKey = `item-${Date.now()}-${entryCount}`;

    await run.update({
      binaryOutput: { ...existingOutput, [entryKey]: binaryItem }
    });

    logger.log('debug', `Persisted binary data for run ${this.currentRunId}: ${binaryItem.mimetype}`);
  } catch (error: any) {
    logger.log('error', `Failed to persist binary data in real-time for run ${this.currentRunId}: ${error.message}`);
  }
};
2024-06-05 04:34:36 +05:30
/**
 * Interprets the recording as a run.
 * Collects debug messages, serializable data and binary data while the workflow runs,
 * persists them in real time, and returns them together with the run status. The
 * instance state is cleared (and the persistence buffer flushed) before returning.
 * @param workflow The workflow to interpret.
 * @param page The page instance used to interact with the browser.
 * @param updatePageOnPause A callback to update the page after a pause.
 * @param settings The settings to use for the interpretation. `params` is passed to the
 * run; all other properties are forwarded to the interpreter. The object is not modified.
 * @returns The log, the run status and the scrapeSchema / scrapeList / binary outputs.
 */
public InterpretRecording = async (
  workflow: WorkflowFile,
  page: Page,
  updatePageOnPause: (page: Page) => void,
  settings: InterpreterSettings
) => {
  // Split `params` off without deleting it from the caller's settings object.
  const { params: runParams, ...interpreterSettings } = settings;
  const params = runParams ? runParams : null;

  const processedWorkflow = processWorkflow(workflow);

  // Accumulates the fields of every scrapeSchema result; later keys overwrite earlier ones.
  let mergedScrapeSchema: Record<string, any> = {};

  const options = {
    ...interpreterSettings,
    debugChannel: {
      activeId: (id: any) => {
        this.activeId = id;
        this.socket.emit('activePairId', id);
      },
      debugMessage: (msg: any) => {
        this.debugMessages.push(`[${new Date().toLocaleString()}] ` + msg);
        this.socket.emit('debugMessage', msg);
      },
      setActionType: (type: string) => {
        this.currentActionType = type;
      },
      incrementScrapeListIndex: () => {
        this.currentScrapeListIndex++;
      }
    },
    serializableCallback: async (data: any) => {
      if (this.currentActionType === 'scrapeSchema') {
        if (Array.isArray(data) && data.length > 0) {
          mergedScrapeSchema = { ...mergedScrapeSchema, ...data[0] };
          this.serializableDataByType.scrapeSchema.push(data);
        } else {
          mergedScrapeSchema = { ...mergedScrapeSchema, ...data };
          this.serializableDataByType.scrapeSchema.push([data]);
        }

        // Persist the cumulative scrapeSchema data
        if (Object.keys(mergedScrapeSchema).length > 0) {
          await this.persistDataToDatabase('scrapeSchema', [mergedScrapeSchema]);
        }
      } else if (this.currentActionType === 'scrapeList') {
        if (data && Array.isArray(data) && data.length > 0) {
          // Use the current index for persistence
          await this.persistDataToDatabase('scrapeList', data, this.currentScrapeListIndex);
        }
        this.serializableDataByType.scrapeList[this.currentScrapeListIndex] = data;
      }

      this.socket.emit('serializableCallback', data);
    },
    binaryCallback: async (data: string, mimetype: string) => {
      const binaryItem = { mimetype, data: JSON.stringify(data) };
      this.binaryData.push(binaryItem);

      // Persist binary data to database
      await this.persistBinaryDataToDatabase(binaryItem);

      this.socket.emit('binaryCallback', { data, mimetype });
    }
  };

  const interpreter = new Interpreter(processedWorkflow, options);
  this.interpreter = interpreter;

  interpreter.on('flag', async (page, resume) => {
    if (this.activeId !== null && this.breakpoints[this.activeId]) {
      logger.log('debug', `breakpoint hit id: ${this.activeId}`);
      this.socket.emit('breakpointHit');
      this.interpretationIsPaused = true;
    }

    if (this.interpretationIsPaused) {
      // Keep the resume callback so the 'resume'/'step' socket handlers can continue the run.
      this.interpretationResume = resume;
      logger.log('debug', `Paused inside of flag: ${page.url()}`);
      updatePageOnPause(page);
      this.socket.emit('log', '----- The interpretation has been paused -----', false);
    } else {
      resume();
    }
  });

  const status = await interpreter.run(page, params);

  // Structure the output to maintain separate data for each action type.
  // Built before clearState(): clearState() replaces the instance's arrays with new
  // ones, so the references captured here keep their contents.
  const result = {
    log: this.debugMessages,
    result: status,
    scrapeSchemaOutput: Object.keys(mergedScrapeSchema).length > 0
      ? { "schema_merged": [mergedScrapeSchema] }
      : this.serializableDataByType.scrapeSchema.reduce((reducedObject, item, index) => {
        reducedObject[`schema_${index}`] = item;
        return reducedObject;
      }, {} as Record<string, any>),
    scrapeListOutput: this.serializableDataByType.scrapeList.reduce((reducedObject, item, index) => {
      reducedObject[`list_${index}`] = item;
      return reducedObject;
    }, {} as Record<string, any>),
    binaryOutput: this.binaryData.reduce((reducedObject, item, index) => {
      reducedObject[`item_${index}`] = item;
      return reducedObject;
    }, {} as Record<string, any>)
  };

  logger.log('debug', `Interpretation finished`);

  // Awaited so that the final buffer flush has finished before the caller
  // receives the result.
  await this.clearState();
  return result;
}
/**
 * Tells whether an interpreter instance is currently held, i.e. whether an
 * interpretation is running.
 * @returns {boolean}
 */
public interpretationInProgress = () => this.interpreter !== null;
/**
 * Updates the socket used for communication with the client (frontend)
 * and subscribes to the pause-control events on the new socket.
 * @param socket Socket.io socket instance enabling communication with the client (frontend) side.
 * @returns void
 */
public updateSocket = (socket: Socket): void => {
  this.socket = socket;
  this.subscribeToPausing();
};
2025-09-28 22:53:52 +05:30
/**
 * Appends one entry, stamped with the current time, to the persistence buffer
 * and logs the resulting buffer size.
 * @param actionType Type of the action that produced the data.
 * @param data The extracted data.
 * @param listIndex Optional index of the list the data belongs to.
 * @param creditValidated Flag stored with the entry; defaults to false.
 * @private
 */
private addToPersistenceBatch(actionType: string, data: any, listIndex?: number, creditValidated: boolean = false): void {
  const entry = {
    actionType,
    data,
    listIndex,
    timestamp: Date.now(),
    creditValidated
  };
  this.persistenceBuffer.push(entry);

  const bufferedCount = this.persistenceBuffer.length;
  logger.log('debug', `Added ${actionType} to persistence buffer (${bufferedCount} items)`);
}
/**
 * Schedules a batched flush after `BATCH_TIMEOUT` milliseconds if none is already scheduled.
 * @private
 */
private scheduleBatchFlush(): void {
  if (this.persistenceTimer) {
    return;
  }

  // A timer is scheduled even while a flush is running: that flush only covers the
  // items it copied when it started, so anything buffered since needs its own flush.
  this.persistenceTimer = setTimeout(() => {
    // Drop the handle as soon as the timer fires. The flush returns early in some
    // cases without clearing it, and a stale handle would block all later scheduling.
    this.persistenceTimer = null;
    // flushPersistenceBuffer logs its own failures instead of rejecting.
    void this.flushPersistenceBuffer();
  }, this.BATCH_TIMEOUT);
}
/**
 * Flushes persistence buffer to database in a single transaction.
 *
 * Does nothing when the buffer is empty, a flush is already running, or no run ID is set.
 * On failure the batch is put back at the front of the buffer and retried after 5 seconds,
 * as long as the instance still points at the same run. Failures are logged, not thrown.
 * @private
 */
private async flushPersistenceBuffer(): Promise<void> {
  // Captured once: clearState()/setRunId() may change currentRunId while the
  // transaction below is still running.
  const runId = this.currentRunId;
  if (this.persistenceBuffer.length === 0 || this.persistenceInProgress || !runId) {
    return;
  }

  if (this.persistenceTimer) {
    clearTimeout(this.persistenceTimer);
    this.persistenceTimer = null;
  }

  this.persistenceInProgress = true;
  const batchToProcess = [...this.persistenceBuffer];
  this.persistenceBuffer = [];
  let followUpDelay = this.BATCH_TIMEOUT;

  try {
    // NOTE(review): loaded with require() at call time rather than imported at the top
    // of the file — presumably to avoid a circular import; confirm before changing.
    const sequelize = require('../../storage/db').default;
    await sequelize.transaction(async (transaction: any) => {
      const { Run: RunModel } = require('../../models');
      const run = await RunModel.findOne({
        where: { runId },
        transaction
      });

      if (!run) {
        logger.log('warn', `Run not found for batched persistence: ${runId}`);
        return;
      }

      const currentSerializableOutput = run.serializableOutput ?
        JSON.parse(JSON.stringify(run.serializableOutput)) :
        { scrapeSchema: [], scrapeList: [] };

      let hasUpdates = false;
      for (const item of batchToProcess) {
        if (item.actionType === 'scrapeSchema') {
          // Schema data replaces whatever was stored before.
          const newSchemaData = Array.isArray(item.data) ? item.data : [item.data];
          currentSerializableOutput.scrapeSchema = newSchemaData;
          hasUpdates = true;
        } else if (item.actionType === 'scrapeList' && typeof item.listIndex === 'number') {
          if (!Array.isArray(currentSerializableOutput.scrapeList)) {
            currentSerializableOutput.scrapeList = [];
          }
          // List data is stored under its own index.
          currentSerializableOutput.scrapeList[item.listIndex] = item.data;
          hasUpdates = true;
        }
      }

      if (hasUpdates) {
        await run.update({
          serializableOutput: currentSerializableOutput
        }, { transaction });
        logger.log('debug', `Batched persistence: Updated run ${runId} with ${batchToProcess.length} items`);
      }
    });
  } catch (error: unknown) {
    const errorMessage = error instanceof Error ? error.message : String(error);
    logger.log('error', `Failed to flush persistence buffer for run ${runId}: ${errorMessage}`);

    // Only re-queue while the instance still belongs to this run. Otherwise the
    // batch would end up in a buffer that was cleared or now feeds another run.
    if (this.currentRunId === runId) {
      this.persistenceBuffer.unshift(...batchToProcess);
      followUpDelay = 5000;
    }
  } finally {
    this.persistenceInProgress = false;
  }

  // Items may be waiting: either a re-queued failed batch or data buffered while this
  // flush was running. The follow-up is tracked in persistenceTimer so it can be
  // cancelled and so that follow-ups never pile up.
  if (this.persistenceBuffer.length > 0 && !this.persistenceTimer && this.currentRunId === runId) {
    this.persistenceTimer = setTimeout(() => {
      this.persistenceTimer = null;
      void this.flushPersistenceBuffer();
    }, followUpDelay);
  }
}
2024-06-05 04:34:36 +05:30
}