From d6190976002810d61fc858358128155d514911c4 Mon Sep 17 00:00:00 2001 From: Rohit Rajan Date: Wed, 10 Sep 2025 00:21:43 +0530 Subject: [PATCH] feat: persist data to db --- .../classes/Interpreter.ts | 161 +++++++++++++++++- 1 file changed, 156 insertions(+), 5 deletions(-) diff --git a/server/src/workflow-management/classes/Interpreter.ts b/server/src/workflow-management/classes/Interpreter.ts index f249f26e..0ed19f19 100644 --- a/server/src/workflow-management/classes/Interpreter.ts +++ b/server/src/workflow-management/classes/Interpreter.ts @@ -4,6 +4,7 @@ import { Socket } from "socket.io"; import { Page } from "playwright"; import { InterpreterSettings } from "../../types"; import { decrypt } from "../../utils/auth"; +import Run from "../../models/Run"; /** * Decrypts any encrypted inputs in the workflow. If checkLimit is true, it will also handle the limit validation for scrapeList action. @@ -112,6 +113,11 @@ export class WorkflowInterpreter { */ private currentScrapeListIndex: number = 0; + /** + * Current run ID for real-time persistence + */ + private currentRunId: string | null = null; + /** * An array of id's of the pairs from the workflow that are about to be paused. * As "breakpoints". @@ -128,10 +134,12 @@ export class WorkflowInterpreter { /** * A public constructor taking a socket instance for communication with the client. * @param socket Socket.io socket instance enabling communication with the client (frontend) side. + * @param runId Optional run ID for real-time data persistence * @constructor */ - constructor(socket: Socket) { + constructor(socket: Socket, runId?: string) { this.socket = socket; + this.currentRunId = runId || null; } /** @@ -202,8 +210,14 @@ export class WorkflowInterpreter { this.currentActionType = type; } }, - serializableCallback: (data: any) => { + serializableCallback: async (data: any) => { if (this.currentActionType === 'scrapeSchema') { + const cumulativeScrapeSchemaData = Array.isArray(data) && data.length > 0 ? data : [data]; + + if (cumulativeScrapeSchemaData.length > 0) { + await this.persistDataToDatabase('scrapeSchema', cumulativeScrapeSchemaData); + } + if (Array.isArray(data) && data.length > 0) { this.socket.emit('serializableCallback', { type: 'captureText', @@ -216,13 +230,24 @@ export class WorkflowInterpreter { }); } } else if (this.currentActionType === 'scrapeList') { + if (data && Array.isArray(data) && data.length > 0) { + // Use the current index for persistence + await this.persistDataToDatabase('scrapeList', data, this.currentScrapeListIndex); + } + this.socket.emit('serializableCallback', { type: 'captureList', data }); } }, - binaryCallback: (data: string, mimetype: string) => { + binaryCallback: async (data: string, mimetype: string) => { + const binaryItem = { mimetype, data: JSON.stringify(data) }; + this.binaryData.push(binaryItem); + + // Persist binary data to database + await this.persistBinaryDataToDatabase(binaryItem); + this.socket.emit('binaryCallback', { data, mimetype, @@ -272,6 +297,10 @@ export class WorkflowInterpreter { public stopInterpretation = async () => { if (this.interpreter) { logger.log('info', 'Stopping the interpretation.'); + + this.interpreter.abort(); + logger.log('info', 'maxun-core interpreter aborted - data collection stopped immediately'); + await this.interpreter.stop(); this.socket.emit('log', '----- The interpretation has been stopped -----', false); this.clearState(); @@ -294,8 +323,115 @@ export class WorkflowInterpreter { }; this.binaryData = []; this.currentScrapeListIndex = 0; + this.currentRunId = null; } + /** + * Sets the current run ID for real-time persistence. + * @param runId The run ID to set + */ + public setRunId = (runId: string): void => { + this.currentRunId = runId; + logger.log('debug', `Set run ID for real-time persistence: ${runId}`); + }; + + /** + * Persists data to database in real-time during interpretation + * @private + */ + private persistDataToDatabase = async (actionType: string, data: any, listIndex?: number): Promise => { + if (!this.currentRunId) { + logger.log('debug', 'No run ID available for real-time persistence'); + return; + } + + try { + const run = await Run.findOne({ where: { runId: this.currentRunId } }); + + if (!run) { + logger.log('warn', `Run not found for real-time persistence: ${this.currentRunId}`); + return; + } + + const currentSerializableOutput = run.serializableOutput ? + JSON.parse(JSON.stringify(run.serializableOutput)) : + { scrapeSchema: [], scrapeList: [] }; + + if (actionType === 'scrapeSchema') { + const newSchemaData = Array.isArray(data) ? data : [data]; + const updatedOutput = { + ...currentSerializableOutput, + scrapeSchema: newSchemaData + }; + + await run.update({ + serializableOutput: updatedOutput + }); + + logger.log('debug', `Persisted scrapeSchema data for run ${this.currentRunId}: ${newSchemaData.length} items`); + + } else if (actionType === 'scrapeList' && typeof listIndex === 'number') { + if (!Array.isArray(currentSerializableOutput.scrapeList)) { + currentSerializableOutput.scrapeList = []; + } + + const updatedList = [...currentSerializableOutput.scrapeList]; + updatedList[listIndex] = data; + + const updatedOutput = { + ...currentSerializableOutput, + scrapeList: updatedList + }; + + await run.update({ + serializableOutput: updatedOutput + }); + + logger.log('debug', `Persisted scrapeList data for run ${this.currentRunId} at index ${listIndex}: ${Array.isArray(data) ? data.length : 'N/A'} items`); + } + } catch (error: any) { + logger.log('error', `Failed to persist data in real-time for run ${this.currentRunId}: ${error.message}`); + } + }; + + /** + * Persists binary data to database in real-time + * @private + */ + private persistBinaryDataToDatabase = async (binaryItem: { mimetype: string, data: string }): Promise => { + if (!this.currentRunId) { + logger.log('debug', 'No run ID available for binary data persistence'); + return; + } + + try { + const run = await Run.findOne({ where: { runId: this.currentRunId } }); + if (!run) { + logger.log('warn', `Run not found for binary data persistence: ${this.currentRunId}`); + return; + } + + const currentBinaryOutput = run.binaryOutput ? + JSON.parse(JSON.stringify(run.binaryOutput)) : + {}; + + const uniqueKey = `item-${Date.now()}-${Object.keys(currentBinaryOutput).length}`; + + const updatedBinaryOutput = { + ...currentBinaryOutput, + [uniqueKey]: binaryItem + }; + + await run.update({ + binaryOutput: updatedBinaryOutput + }); + + logger.log('debug', `Persisted binary data for run ${this.currentRunId}: ${binaryItem.mimetype}`); + } catch (error: any) { + logger.log('error', `Failed to persist binary data in real-time for run ${this.currentRunId}: ${error.message}`); + } + }; + /** * Interprets the recording as a run. * @param workflow The workflow to interpret. @@ -333,7 +469,7 @@ export class WorkflowInterpreter { this.currentScrapeListIndex++; } }, - serializableCallback: (data: any) => { + serializableCallback: async (data: any) => { if (this.currentActionType === 'scrapeSchema') { if (Array.isArray(data) && data.length > 0) { mergedScrapeSchema = { ...mergedScrapeSchema, ...data[0] }; @@ -342,14 +478,29 @@ export class WorkflowInterpreter { mergedScrapeSchema = { ...mergedScrapeSchema, ...data }; this.serializableDataByType.scrapeSchema.push([data]); } + + // Persist the cumulative scrapeSchema data + const cumulativeScrapeSchemaData = Object.keys(mergedScrapeSchema).length > 0 ? [mergedScrapeSchema] : []; + if (cumulativeScrapeSchemaData.length > 0) { + await this.persistDataToDatabase('scrapeSchema', cumulativeScrapeSchemaData); + } } else if (this.currentActionType === 'scrapeList') { + if (data && Array.isArray(data) && data.length > 0) { + // Use the current index for persistence + await this.persistDataToDatabase('scrapeList', data, this.currentScrapeListIndex); + } this.serializableDataByType.scrapeList[this.currentScrapeListIndex] = data; } this.socket.emit('serializableCallback', data); }, binaryCallback: async (data: string, mimetype: string) => { - this.binaryData.push({ mimetype, data: JSON.stringify(data) }); + const binaryItem = { mimetype, data: JSON.stringify(data) }; + this.binaryData.push(binaryItem); + + // Persist binary data to database + await this.persistBinaryDataToDatabase(binaryItem); + this.socket.emit('binaryCallback', { data, mimetype }); } }