From a83b69cfc667251e82fdf764bfe53e504e2d067c Mon Sep 17 00:00:00 2001 From: Rohit Rajan Date: Sun, 28 Sep 2025 23:01:23 +0530 Subject: [PATCH] fix: add process retry count logic --- .../classes/Interpreter.ts | 42 +++++++++++++++---- 1 file changed, 35 insertions(+), 7 deletions(-) diff --git a/server/src/workflow-management/classes/Interpreter.ts b/server/src/workflow-management/classes/Interpreter.ts index 79d2242c..8c1f7e5e 100644 --- a/server/src/workflow-management/classes/Interpreter.ts +++ b/server/src/workflow-management/classes/Interpreter.ts @@ -130,9 +130,10 @@ export class WorkflowInterpreter { }> = []; private persistenceTimer: NodeJS.Timeout | null = null; - private readonly BATCH_SIZE = 5; // Process every 5 items - private readonly BATCH_TIMEOUT = 3000; // Or every 3 seconds + private readonly BATCH_SIZE = 5; + private readonly BATCH_TIMEOUT = 3000; private persistenceInProgress = false; + private persistenceRetryCount = 0; /** * An array of id's of the pairs from the workflow that are about to be paused. @@ -340,6 +341,18 @@ export class WorkflowInterpreter { this.persistenceTimer = null; } + if (this.interpreter) { + try { + if (!this.interpreter.getIsAborted()) { + this.interpreter.abort(); + } + await this.interpreter.stop(); + logger.log('debug', 'mx-cloud interpreter properly stopped during cleanup'); + } catch (error: any) { + logger.log('warn', `Error stopping mx-cloud interpreter during cleanup: ${error.message}`); + } + } + this.debugMessages = []; this.interpretationIsPaused = false; this.activeId = null; @@ -356,6 +369,7 @@ export class WorkflowInterpreter { this.currentRunId = null; this.persistenceBuffer = []; this.persistenceInProgress = false; + this.persistenceRetryCount = 0; } /** @@ -541,7 +555,6 @@ export class WorkflowInterpreter { } logger.log('debug', `Interpretation finished`); - this.clearState(); return result; } @@ -652,14 +665,29 @@ export class WorkflowInterpreter { } }); + this.persistenceRetryCount = 0; + } catch (error: any) { logger.log('error', `Failed to flush persistence buffer for run ${this.currentRunId}: ${error.message}`); - this.persistenceBuffer.unshift(...batchToProcess); + if (!this.persistenceRetryCount) { + this.persistenceRetryCount = 0; + } - setTimeout(async () => { - await this.flushPersistenceBuffer(); - }, 5000); + if (this.persistenceRetryCount < 3) { + this.persistenceBuffer.unshift(...batchToProcess); + this.persistenceRetryCount++; + + const backoffDelay = Math.min(5000 * Math.pow(2, this.persistenceRetryCount), 30000); + setTimeout(async () => { + await this.flushPersistenceBuffer(); + }, backoffDelay); + + logger.log('warn', `Scheduling persistence retry ${this.persistenceRetryCount}/3 in ${backoffDelay}ms`); + } else { + logger.log('error', `Max persistence retries exceeded for run ${this.currentRunId}, dropping ${batchToProcess.length} items`); + this.persistenceRetryCount = 0; + } } finally { this.persistenceInProgress = false; }