feat: add batch persistence logic

This commit is contained in:
Rohit Rajan
2025-09-28 22:53:52 +05:30
parent 24af62c026
commit e75a10dcfe

View File

@@ -118,6 +118,22 @@ export class WorkflowInterpreter {
*/
private currentRunId: string | null = null;
/**
* Batched persistence system for performance optimization
*/
private persistenceBuffer: Array<{
actionType: string;
data: any;
listIndex?: number;
timestamp: number;
creditValidated: boolean;
}> = [];
private persistenceTimer: NodeJS.Timeout | null = null;
private readonly BATCH_SIZE = 5; // Process every 5 items
private readonly BATCH_TIMEOUT = 3000; // Or every 3 seconds
private persistenceInProgress = false;
/**
* An array of id's of the pairs from the workflow that are about to be paused.
* As "breakpoints".
@@ -303,13 +319,27 @@ export class WorkflowInterpreter {
await this.interpreter.stop();
this.socket.emit('log', '----- The interpretation has been stopped -----', false);
this.clearState();
await this.clearState();
} else {
logger.log('error', 'Cannot stop: No active interpretation.');
}
};
public clearState = () => {
public clearState = async (): Promise<void> => {
if (this.persistenceBuffer.length > 0) {
try {
await this.flushPersistenceBuffer();
logger.log('debug', 'Successfully flushed final persistence buffer during cleanup');
} catch (error: any) {
logger.log('error', `Failed to flush final persistence buffer: ${error.message}`);
}
}
if (this.persistenceTimer) {
clearTimeout(this.persistenceTimer);
this.persistenceTimer = null;
}
this.debugMessages = [];
this.interpretationIsPaused = false;
this.activeId = null;
@@ -324,6 +354,8 @@ export class WorkflowInterpreter {
this.binaryData = [];
this.currentScrapeListIndex = 0;
this.currentRunId = null;
this.persistenceBuffer = [];
this.persistenceInProgress = false;
}
/**
@@ -336,61 +368,22 @@ export class WorkflowInterpreter {
};
/**
* Persists data to database in real-time during interpretation
* Persists extracted data to database with intelligent batching for performance
* Falls back to immediate persistence for critical operations
* @private
*/
private persistDataToDatabase = async (actionType: string, data: any, listIndex?: number): Promise<void> => {
if (!this.currentRunId) {
logger.log('debug', 'No run ID available for real-time persistence');
logger.log('debug', 'No run ID available for persistence');
return;
}
try {
const run = await Run.findOne({ where: { runId: this.currentRunId } });
this.addToPersistenceBatch(actionType, data, listIndex, true);
if (!run) {
logger.log('warn', `Run not found for real-time persistence: ${this.currentRunId}`);
return;
}
const currentSerializableOutput = run.serializableOutput ?
JSON.parse(JSON.stringify(run.serializableOutput)) :
{ scrapeSchema: [], scrapeList: [] };
if (actionType === 'scrapeSchema') {
const newSchemaData = Array.isArray(data) ? data : [data];
const updatedOutput = {
...currentSerializableOutput,
scrapeSchema: newSchemaData
};
await run.update({
serializableOutput: updatedOutput
});
logger.log('debug', `Persisted scrapeSchema data for run ${this.currentRunId}: ${newSchemaData.length} items`);
} else if (actionType === 'scrapeList' && typeof listIndex === 'number') {
if (!Array.isArray(currentSerializableOutput.scrapeList)) {
currentSerializableOutput.scrapeList = [];
}
const updatedList = [...currentSerializableOutput.scrapeList];
updatedList[listIndex] = data;
const updatedOutput = {
...currentSerializableOutput,
scrapeList: updatedList
};
await run.update({
serializableOutput: updatedOutput
});
logger.log('debug', `Persisted scrapeList data for run ${this.currentRunId} at index ${listIndex}: ${Array.isArray(data) ? data.length : 'N/A'} items`);
}
} catch (error: any) {
logger.log('error', `Failed to persist data in real-time for run ${this.currentRunId}: ${error.message}`);
if (actionType === 'scrapeSchema' || this.persistenceBuffer.length >= this.BATCH_SIZE) {
await this.flushPersistenceBuffer();
} else {
this.scheduleBatchFlush();
}
};
@@ -569,4 +562,106 @@ export class WorkflowInterpreter {
this.socket = socket;
this.subscribeToPausing();
};
/**
 * Queues one extracted-data item onto the in-memory persistence buffer so it
 * can later be written to the database as part of a batched transaction.
 * @private
 */
private addToPersistenceBatch(actionType: string, data: any, listIndex?: number, creditValidated: boolean = false): void {
  const entry = {
    actionType,
    data,
    listIndex,
    timestamp: Date.now(),
    creditValidated
  };
  this.persistenceBuffer.push(entry);

  logger.log('debug', `Added ${actionType} to persistence buffer (${this.persistenceBuffer.length} items)`);
}
/**
 * Arms the batch-timeout timer so buffered items are flushed after
 * BATCH_TIMEOUT ms. No-op when a timer is already pending or a flush is
 * currently in progress (avoids duplicate timers).
 * @private
 */
private scheduleBatchFlush(): void {
  if (this.persistenceTimer || this.persistenceInProgress) {
    return;
  }
  this.persistenceTimer = setTimeout(async () => {
    await this.flushPersistenceBuffer();
  }, this.BATCH_TIMEOUT);
}
/**
 * Flushes the persistence buffer to the database in a single transaction.
 *
 * Re-entry is guarded by `persistenceInProgress`; the call is a no-op when
 * the buffer is empty or no run is active. The buffer is drained up front so
 * new items can accumulate while the write is in flight. On failure the
 * batch is restored to the FRONT of the buffer (preserving order) and a
 * retry is scheduled; the retry timer is tracked in `persistenceTimer` so
 * `clearState()` can cancel it during teardown.
 * @private
 */
private async flushPersistenceBuffer(): Promise<void> {
  if (this.persistenceBuffer.length === 0 || this.persistenceInProgress || !this.currentRunId) {
    return;
  }

  // A flush is happening now — cancel any pending scheduled flush.
  if (this.persistenceTimer) {
    clearTimeout(this.persistenceTimer);
    this.persistenceTimer = null;
  }

  this.persistenceInProgress = true;

  const batchToProcess = [...this.persistenceBuffer];
  this.persistenceBuffer = [];

  try {
    const sequelize = require('../../storage/db').default;

    await sequelize.transaction(async (transaction: any) => {
      const { Run } = require('../../models');

      const run = await Run.findOne({
        where: { runId: this.currentRunId! },
        transaction
      });

      if (!run) {
        logger.log('warn', `Run not found for batched persistence: ${this.currentRunId}`);
        return;
      }

      // Deep-copy so we never mutate the model's cached value in place.
      const currentSerializableOutput = run.serializableOutput ?
        JSON.parse(JSON.stringify(run.serializableOutput)) :
        { scrapeSchema: [], scrapeList: [] };

      let hasUpdates = false;

      for (const item of batchToProcess) {
        if (item.actionType === 'scrapeSchema') {
          // Schema data replaces any previous snapshot wholesale.
          const newSchemaData = Array.isArray(item.data) ? item.data : [item.data];
          currentSerializableOutput.scrapeSchema = newSchemaData;
          hasUpdates = true;
        } else if (item.actionType === 'scrapeList' && typeof item.listIndex === 'number') {
          if (!Array.isArray(currentSerializableOutput.scrapeList)) {
            currentSerializableOutput.scrapeList = [];
          }
          currentSerializableOutput.scrapeList[item.listIndex] = item.data;
          hasUpdates = true;
        }
      }

      if (hasUpdates) {
        await run.update({
          serializableOutput: currentSerializableOutput
        }, { transaction });

        logger.log('debug', `Batched persistence: Updated run ${this.currentRunId} with ${batchToProcess.length} items`);
      }
    });
  } catch (error: any) {
    logger.log('error', `Failed to flush persistence buffer for run ${this.currentRunId}: ${error.message}`);

    // Restore the failed batch at the front so item ordering is preserved.
    this.persistenceBuffer.unshift(...batchToProcess);

    // FIX: track the retry timer in `persistenceTimer` so clearState() can
    // cancel it. The original anonymous setTimeout leaked past cleanup and
    // could fire after teardown; repeated failures could also stack timers.
    if (!this.persistenceTimer) {
      this.persistenceTimer = setTimeout(async () => {
        this.persistenceTimer = null;
        await this.flushPersistenceBuffer();
      }, 5000);
    }
  } finally {
    this.persistenceInProgress = false;
  }
};
}