feat: persist data to db

This commit is contained in:
Rohit Rajan
2025-09-10 00:21:43 +05:30
parent b948ac9d9e
commit d619097600

View File

@@ -4,6 +4,7 @@ import { Socket } from "socket.io";
import { Page } from "playwright";
import { InterpreterSettings } from "../../types";
import { decrypt } from "../../utils/auth";
import Run from "../../models/Run";
/**
* Decrypts any encrypted inputs in the workflow. If checkLimit is true, it will also handle the limit validation for scrapeList action.
@@ -112,6 +113,11 @@ export class WorkflowInterpreter {
*/
private currentScrapeListIndex: number = 0;
/**
* Current run ID for real-time persistence
*/
private currentRunId: string | null = null;
/**
* An array of id's of the pairs from the workflow that are about to be paused.
* As "breakpoints".
@@ -128,10 +134,12 @@ export class WorkflowInterpreter {
/**
* A public constructor taking a socket instance for communication with the client.
* @param socket Socket.io socket instance enabling communication with the client (frontend) side.
* @param runId Optional run ID for real-time data persistence
* @constructor
*/
constructor(socket: Socket) {
constructor(socket: Socket, runId?: string) {
this.socket = socket;
this.currentRunId = runId || null;
}
/**
@@ -202,8 +210,14 @@ export class WorkflowInterpreter {
this.currentActionType = type;
}
},
serializableCallback: (data: any) => {
serializableCallback: async (data: any) => {
if (this.currentActionType === 'scrapeSchema') {
const cumulativeScrapeSchemaData = Array.isArray(data) && data.length > 0 ? data : [data];
if (cumulativeScrapeSchemaData.length > 0) {
await this.persistDataToDatabase('scrapeSchema', cumulativeScrapeSchemaData);
}
if (Array.isArray(data) && data.length > 0) {
this.socket.emit('serializableCallback', {
type: 'captureText',
@@ -216,13 +230,24 @@ export class WorkflowInterpreter {
});
}
} else if (this.currentActionType === 'scrapeList') {
if (data && Array.isArray(data) && data.length > 0) {
// Use the current index for persistence
await this.persistDataToDatabase('scrapeList', data, this.currentScrapeListIndex);
}
this.socket.emit('serializableCallback', {
type: 'captureList',
data
});
}
},
binaryCallback: (data: string, mimetype: string) => {
binaryCallback: async (data: string, mimetype: string) => {
const binaryItem = { mimetype, data: JSON.stringify(data) };
this.binaryData.push(binaryItem);
// Persist binary data to database
await this.persistBinaryDataToDatabase(binaryItem);
this.socket.emit('binaryCallback', {
data,
mimetype,
@@ -272,6 +297,10 @@ export class WorkflowInterpreter {
public stopInterpretation = async () => {
if (this.interpreter) {
logger.log('info', 'Stopping the interpretation.');
this.interpreter.abort();
logger.log('info', 'maxun-core interpreter aborted - data collection stopped immediately');
await this.interpreter.stop();
this.socket.emit('log', '----- The interpretation has been stopped -----', false);
this.clearState();
@@ -294,8 +323,115 @@ export class WorkflowInterpreter {
};
this.binaryData = [];
this.currentScrapeListIndex = 0;
this.currentRunId = null;
}
/**
* Sets the current run ID for real-time persistence.
* @param runId The run ID to set
*/
public setRunId = (runId: string): void => {
this.currentRunId = runId;
logger.log('debug', `Set run ID for real-time persistence: ${runId}`);
};
/**
* Persists data to database in real-time during interpretation
* @private
*/
private persistDataToDatabase = async (actionType: string, data: any, listIndex?: number): Promise<void> => {
if (!this.currentRunId) {
logger.log('debug', 'No run ID available for real-time persistence');
return;
}
try {
const run = await Run.findOne({ where: { runId: this.currentRunId } });
if (!run) {
logger.log('warn', `Run not found for real-time persistence: ${this.currentRunId}`);
return;
}
const currentSerializableOutput = run.serializableOutput ?
JSON.parse(JSON.stringify(run.serializableOutput)) :
{ scrapeSchema: [], scrapeList: [] };
if (actionType === 'scrapeSchema') {
const newSchemaData = Array.isArray(data) ? data : [data];
const updatedOutput = {
...currentSerializableOutput,
scrapeSchema: newSchemaData
};
await run.update({
serializableOutput: updatedOutput
});
logger.log('debug', `Persisted scrapeSchema data for run ${this.currentRunId}: ${newSchemaData.length} items`);
} else if (actionType === 'scrapeList' && typeof listIndex === 'number') {
if (!Array.isArray(currentSerializableOutput.scrapeList)) {
currentSerializableOutput.scrapeList = [];
}
const updatedList = [...currentSerializableOutput.scrapeList];
updatedList[listIndex] = data;
const updatedOutput = {
...currentSerializableOutput,
scrapeList: updatedList
};
await run.update({
serializableOutput: updatedOutput
});
logger.log('debug', `Persisted scrapeList data for run ${this.currentRunId} at index ${listIndex}: ${Array.isArray(data) ? data.length : 'N/A'} items`);
}
} catch (error: any) {
logger.log('error', `Failed to persist data in real-time for run ${this.currentRunId}: ${error.message}`);
}
};
/**
* Persists binary data to database in real-time
* @private
*/
private persistBinaryDataToDatabase = async (binaryItem: { mimetype: string, data: string }): Promise<void> => {
if (!this.currentRunId) {
logger.log('debug', 'No run ID available for binary data persistence');
return;
}
try {
const run = await Run.findOne({ where: { runId: this.currentRunId } });
if (!run) {
logger.log('warn', `Run not found for binary data persistence: ${this.currentRunId}`);
return;
}
const currentBinaryOutput = run.binaryOutput ?
JSON.parse(JSON.stringify(run.binaryOutput)) :
{};
const uniqueKey = `item-${Date.now()}-${Object.keys(currentBinaryOutput).length}`;
const updatedBinaryOutput = {
...currentBinaryOutput,
[uniqueKey]: binaryItem
};
await run.update({
binaryOutput: updatedBinaryOutput
});
logger.log('debug', `Persisted binary data for run ${this.currentRunId}: ${binaryItem.mimetype}`);
} catch (error: any) {
logger.log('error', `Failed to persist binary data in real-time for run ${this.currentRunId}: ${error.message}`);
}
};
/**
* Interprets the recording as a run.
* @param workflow The workflow to interpret.
@@ -333,7 +469,7 @@ export class WorkflowInterpreter {
this.currentScrapeListIndex++;
}
},
serializableCallback: (data: any) => {
serializableCallback: async (data: any) => {
if (this.currentActionType === 'scrapeSchema') {
if (Array.isArray(data) && data.length > 0) {
mergedScrapeSchema = { ...mergedScrapeSchema, ...data[0] };
@@ -342,14 +478,29 @@ export class WorkflowInterpreter {
mergedScrapeSchema = { ...mergedScrapeSchema, ...data };
this.serializableDataByType.scrapeSchema.push([data]);
}
// Persist the cumulative scrapeSchema data
const cumulativeScrapeSchemaData = Object.keys(mergedScrapeSchema).length > 0 ? [mergedScrapeSchema] : [];
if (cumulativeScrapeSchemaData.length > 0) {
await this.persistDataToDatabase('scrapeSchema', cumulativeScrapeSchemaData);
}
} else if (this.currentActionType === 'scrapeList') {
if (data && Array.isArray(data) && data.length > 0) {
// Use the current index for persistence
await this.persistDataToDatabase('scrapeList', data, this.currentScrapeListIndex);
}
this.serializableDataByType.scrapeList[this.currentScrapeListIndex] = data;
}
this.socket.emit('serializableCallback', data);
},
binaryCallback: async (data: string, mimetype: string) => {
this.binaryData.push({ mimetype, data: JSON.stringify(data) });
const binaryItem = { mimetype, data: JSON.stringify(data) };
this.binaryData.push(binaryItem);
// Persist binary data to database
await this.persistBinaryDataToDatabase(binaryItem);
this.socket.emit('binaryCallback', { data, mimetype });
}
}