fix: add process retry count logic
This commit is contained in:
@@ -130,9 +130,10 @@ export class WorkflowInterpreter {
|
|||||||
}> = [];
|
}> = [];
|
||||||
|
|
||||||
private persistenceTimer: NodeJS.Timeout | null = null;
|
private persistenceTimer: NodeJS.Timeout | null = null;
|
||||||
private readonly BATCH_SIZE = 5; // Process every 5 items
|
private readonly BATCH_SIZE = 5;
|
||||||
private readonly BATCH_TIMEOUT = 3000; // Or every 3 seconds
|
private readonly BATCH_TIMEOUT = 3000;
|
||||||
private persistenceInProgress = false;
|
private persistenceInProgress = false;
|
||||||
|
private persistenceRetryCount = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An array of id's of the pairs from the workflow that are about to be paused.
|
* An array of id's of the pairs from the workflow that are about to be paused.
|
||||||
@@ -340,6 +341,18 @@ export class WorkflowInterpreter {
|
|||||||
this.persistenceTimer = null;
|
this.persistenceTimer = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (this.interpreter) {
|
||||||
|
try {
|
||||||
|
if (!this.interpreter.getIsAborted()) {
|
||||||
|
this.interpreter.abort();
|
||||||
|
}
|
||||||
|
await this.interpreter.stop();
|
||||||
|
logger.log('debug', 'mx-cloud interpreter properly stopped during cleanup');
|
||||||
|
} catch (error: any) {
|
||||||
|
logger.log('warn', `Error stopping mx-cloud interpreter during cleanup: ${error.message}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
this.debugMessages = [];
|
this.debugMessages = [];
|
||||||
this.interpretationIsPaused = false;
|
this.interpretationIsPaused = false;
|
||||||
this.activeId = null;
|
this.activeId = null;
|
||||||
@@ -356,6 +369,7 @@ export class WorkflowInterpreter {
|
|||||||
this.currentRunId = null;
|
this.currentRunId = null;
|
||||||
this.persistenceBuffer = [];
|
this.persistenceBuffer = [];
|
||||||
this.persistenceInProgress = false;
|
this.persistenceInProgress = false;
|
||||||
|
this.persistenceRetryCount = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -541,7 +555,6 @@ export class WorkflowInterpreter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
logger.log('debug', `Interpretation finished`);
|
logger.log('debug', `Interpretation finished`);
|
||||||
this.clearState();
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -652,14 +665,29 @@ export class WorkflowInterpreter {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
this.persistenceRetryCount = 0;
|
||||||
|
|
||||||
} catch (error: any) {
|
} catch (error: any) {
|
||||||
logger.log('error', `Failed to flush persistence buffer for run ${this.currentRunId}: ${error.message}`);
|
logger.log('error', `Failed to flush persistence buffer for run ${this.currentRunId}: ${error.message}`);
|
||||||
|
|
||||||
this.persistenceBuffer.unshift(...batchToProcess);
|
if (!this.persistenceRetryCount) {
|
||||||
|
this.persistenceRetryCount = 0;
|
||||||
|
}
|
||||||
|
|
||||||
setTimeout(async () => {
|
if (this.persistenceRetryCount < 3) {
|
||||||
await this.flushPersistenceBuffer();
|
this.persistenceBuffer.unshift(...batchToProcess);
|
||||||
}, 5000);
|
this.persistenceRetryCount++;
|
||||||
|
|
||||||
|
const backoffDelay = Math.min(5000 * Math.pow(2, this.persistenceRetryCount), 30000);
|
||||||
|
setTimeout(async () => {
|
||||||
|
await this.flushPersistenceBuffer();
|
||||||
|
}, backoffDelay);
|
||||||
|
|
||||||
|
logger.log('warn', `Scheduling persistence retry ${this.persistenceRetryCount}/3 in ${backoffDelay}ms`);
|
||||||
|
} else {
|
||||||
|
logger.log('error', `Max persistence retries exceeded for run ${this.currentRunId}, dropping ${batchToProcess.length} items`);
|
||||||
|
this.persistenceRetryCount = 0;
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
this.persistenceInProgress = false;
|
this.persistenceInProgress = false;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user