From e75a10dcfed98f4b46ac448f7d31da8be54d24a1 Mon Sep 17 00:00:00 2001
From: Rohit Rajan
Date: Sun, 28 Sep 2025 22:53:52 +0530
Subject: [PATCH] feat: add batch persistence logic

---
Review note: the last hunk was hand-edited after review (bounded and
cancellable flush retry, stale-timer fix, no re-queue across runs), so the
post-image blob id on the index line below is stale.

 .../classes/Interpreter.ts | 221 ++++++++++++++----
 1 file changed, 172 insertions, 49 deletions

diff --git a/server/src/workflow-management/classes/Interpreter.ts b/server/src/workflow-management/classes/Interpreter.ts
index 9338f506..79d2242c 100644
--- a/server/src/workflow-management/classes/Interpreter.ts
+++ b/server/src/workflow-management/classes/Interpreter.ts
@@ -118,6 +118,22 @@ export class WorkflowInterpreter {
    */
   private currentRunId: string | null = null;
 
+  /**
+   * Batched persistence system for performance optimization
+   */
+  private persistenceBuffer: Array<{
+    actionType: string;
+    data: any;
+    listIndex?: number;
+    timestamp: number;
+    creditValidated: boolean;
+  }> = [];
+
+  private persistenceTimer: NodeJS.Timeout | null = null;
+  private readonly BATCH_SIZE = 5; // Process every 5 items
+  private readonly BATCH_TIMEOUT = 3000; // Or every 3 seconds
+  private persistenceInProgress = false;
+
   /**
    * An array of id's of the pairs from the workflow that are about to be paused.
    * As "breakpoints".
@@ -303,13 +319,27 @@ export class WorkflowInterpreter {
 
       await this.interpreter.stop();
       this.socket.emit('log', '----- The interpretation has been stopped -----', false);
-      this.clearState();
+      await this.clearState();
     } else {
       logger.log('error', 'Cannot stop: No active interpretation.');
     }
   };
 
-  public clearState = () => {
+  public clearState = async (): Promise<void> => {
+    if (this.persistenceBuffer.length > 0) {
+      try {
+        await this.flushPersistenceBuffer();
+        logger.log('debug', 'Successfully flushed final persistence buffer during cleanup');
+      } catch (error: any) {
+        logger.log('error', `Failed to flush final persistence buffer: ${error.message}`);
+      }
+    }
+
+    if (this.persistenceTimer) {
+      clearTimeout(this.persistenceTimer);
+      this.persistenceTimer = null;
+    }
+
     this.debugMessages = [];
     this.interpretationIsPaused = false;
     this.activeId = null;
@@ -324,6 +354,8 @@ export class WorkflowInterpreter {
     this.binaryData = [];
     this.currentScrapeListIndex = 0;
     this.currentRunId = null;
+    this.persistenceBuffer = [];
+    this.persistenceInProgress = false;
   }
 
   /**
@@ -336,61 +368,22 @@ export class WorkflowInterpreter {
   };
 
   /**
-   * Persists data to database in real-time during interpretation
+   * Persists extracted data to database with intelligent batching for performance
+   * Falls back to immediate persistence for critical operations
    * @private
    */
   private persistDataToDatabase = async (actionType: string, data: any, listIndex?: number): Promise<void> => {
     if (!this.currentRunId) {
-      logger.log('debug', 'No run ID available for real-time persistence');
+      logger.log('debug', 'No run ID available for persistence');
       return;
     }
 
-    try {
-      const run = await Run.findOne({ where: { runId: this.currentRunId } });
-
-      if (!run) {
-        logger.log('warn', `Run not found for real-time persistence: ${this.currentRunId}`);
-        return;
-      }
-
-      const currentSerializableOutput = run.serializableOutput ?
-        JSON.parse(JSON.stringify(run.serializableOutput)) :
-        { scrapeSchema: [], scrapeList: [] };
+    this.addToPersistenceBatch(actionType, data, listIndex, true);
 
-      if (actionType === 'scrapeSchema') {
-        const newSchemaData = Array.isArray(data) ? data : [data];
-        const updatedOutput = {
-          ...currentSerializableOutput,
-          scrapeSchema: newSchemaData
-        };
-
-        await run.update({
-          serializableOutput: updatedOutput
-        });
-
-        logger.log('debug', `Persisted scrapeSchema data for run ${this.currentRunId}: ${newSchemaData.length} items`);
-
-      } else if (actionType === 'scrapeList' && typeof listIndex === 'number') {
-        if (!Array.isArray(currentSerializableOutput.scrapeList)) {
-          currentSerializableOutput.scrapeList = [];
-        }
-
-        const updatedList = [...currentSerializableOutput.scrapeList];
-        updatedList[listIndex] = data;
-
-        const updatedOutput = {
-          ...currentSerializableOutput,
-          scrapeList: updatedList
-        };
-
-        await run.update({
-          serializableOutput: updatedOutput
-        });
-
-        logger.log('debug', `Persisted scrapeList data for run ${this.currentRunId} at index ${listIndex}: ${Array.isArray(data) ? data.length : 'N/A'} items`);
-      }
-    } catch (error: any) {
-      logger.log('error', `Failed to persist data in real-time for run ${this.currentRunId}: ${error.message}`);
+    if (actionType === 'scrapeSchema' || this.persistenceBuffer.length >= this.BATCH_SIZE) {
+      await this.flushPersistenceBuffer();
+    } else {
+      this.scheduleBatchFlush();
     }
   };
 
@@ -569,4 +562,134 @@ export class WorkflowInterpreter {
     this.socket = socket;
     this.subscribeToPausing();
   };
+
+  /**
+   * Adds data to persistence buffer for batched processing
+   * @private
+   */
+  private addToPersistenceBatch(actionType: string, data: any, listIndex?: number, creditValidated: boolean = false): void {
+    this.persistenceBuffer.push({
+      actionType,
+      data,
+      listIndex,
+      timestamp: Date.now(),
+      creditValidated
+    });
+
+    logger.log('debug', `Added ${actionType} to persistence buffer (${this.persistenceBuffer.length} items)`);
+  }
+
+  /**
+   * Schedules a batched flush if not already scheduled
+   * @private
+   */
+  private scheduleBatchFlush(): void {
+    if (!this.persistenceTimer && !this.persistenceInProgress) {
+      this.persistenceTimer = setTimeout(() => {
+        // Drop the fired handle first: flushPersistenceBuffer() may return early
+        // without clearing it, which would block every later schedule.
+        this.persistenceTimer = null;
+        void this.flushPersistenceBuffer();
+      }, this.BATCH_TIMEOUT);
+    }
+  }
+
+  private readonly FLUSH_MAX_RETRIES = 3; // Automatic retries after a failed flush
+  private readonly FLUSH_RETRY_DELAY = 5000; // Delay between retries, in ms
+
+  /**
+   * Flushes persistence buffer to database in a single transaction.
+   * A failed batch is re-queued and retried up to FLUSH_MAX_RETRIES times;
+   * it is dropped if the run it belongs to is no longer the active one.
+   * @param attempt number of automatic retries already made (0 for a regular flush)
+   * @private
+   */
+  private async flushPersistenceBuffer(attempt: number = 0): Promise<void> {
+    if (this.persistenceBuffer.length === 0 || this.persistenceInProgress || !this.currentRunId) {
+      return;
+    }
+
+    if (this.persistenceTimer) {
+      clearTimeout(this.persistenceTimer);
+      this.persistenceTimer = null;
+    }
+
+    // Pin the run id: clearState() can null currentRunId while the transaction is in flight
+    const runId = this.currentRunId;
+    this.persistenceInProgress = true;
+    const batchToProcess = [...this.persistenceBuffer];
+    this.persistenceBuffer = [];
+    let flushFailed = false;
+
+    try {
+      const sequelize = require('../../storage/db').default;
+      await sequelize.transaction(async (transaction: any) => {
+        const { Run } = require('../../models');
+        const run = await Run.findOne({
+          where: { runId },
+          transaction
+        });
+
+        if (!run) {
+          logger.log('warn', `Run not found for batched persistence: ${runId}`);
+          return;
+        }
+
+        const currentSerializableOutput = run.serializableOutput ?
+          JSON.parse(JSON.stringify(run.serializableOutput)) :
+          { scrapeSchema: [], scrapeList: [] };
+
+        let hasUpdates = false;
+
+        for (const item of batchToProcess) {
+          if (item.actionType === 'scrapeSchema') {
+            const newSchemaData = Array.isArray(item.data) ? item.data : [item.data];
+            currentSerializableOutput.scrapeSchema = newSchemaData;
+            hasUpdates = true;
+          } else if (item.actionType === 'scrapeList' && typeof item.listIndex === 'number') {
+            if (!Array.isArray(currentSerializableOutput.scrapeList)) {
+              currentSerializableOutput.scrapeList = [];
+            }
+            currentSerializableOutput.scrapeList[item.listIndex] = item.data;
+            hasUpdates = true;
+          }
+        }
+
+        if (hasUpdates) {
+          await run.update({
+            serializableOutput: currentSerializableOutput
+          }, { transaction });
+
+          logger.log('debug', `Batched persistence: Updated run ${runId} with ${batchToProcess.length} items`);
+        }
+      });
+    } catch (error: any) {
+      flushFailed = true;
+      logger.log('error', `Failed to flush persistence buffer for run ${runId}: ${error.message}`);
+
+      if (this.currentRunId !== runId) {
+        // The run was cleared or replaced mid-flight; re-queueing would write this data into another run
+        logger.log('warn', `Dropping ${batchToProcess.length} unpersisted items of inactive run ${runId}`);
+      } else {
+        this.persistenceBuffer = [...batchToProcess, ...this.persistenceBuffer];
+
+        if (attempt < this.FLUSH_MAX_RETRIES) {
+          // Tracked in persistenceTimer so clearState() and the next flush can cancel it
+          this.persistenceTimer = setTimeout(() => {
+            this.persistenceTimer = null;
+            void this.flushPersistenceBuffer(attempt + 1);
+          }, this.FLUSH_RETRY_DELAY);
+        } else {
+          logger.log('error', `Giving up automatic retries for run ${runId}; data stays buffered until the next flush`);
+        }
+      }
+    } finally {
+      this.persistenceInProgress = false;
+    }
+
+    // Items buffered while the transaction was running found the flush busy; give them a timer
+    if (!flushFailed && this.persistenceBuffer.length > 0) {
+      this.scheduleBatchFlush();
+    }
+  }
 }