feat: circuit breaker for db connection issues
This commit is contained in:
@@ -1076,13 +1076,22 @@ router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest,
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Circuit breaker for database connection issues
|
||||||
|
let consecutiveDbErrors = 0;
|
||||||
|
const MAX_CONSECUTIVE_ERRORS = 3;
|
||||||
|
const CIRCUIT_BREAKER_COOLDOWN = 30000;
|
||||||
|
let circuitBreakerOpenUntil = 0;
|
||||||
|
|
||||||
async function processQueuedRuns() {
|
async function processQueuedRuns() {
|
||||||
try {
|
try {
|
||||||
|
if (Date.now() < circuitBreakerOpenUntil) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
const queuedRun = await Run.findOne({
|
const queuedRun = await Run.findOne({
|
||||||
where: { status: 'queued' },
|
where: { status: 'queued' },
|
||||||
order: [['startedAt', 'ASC']]
|
order: [['startedAt', 'ASC']],
|
||||||
});
|
});
|
||||||
|
consecutiveDbErrors = 0;
|
||||||
if (!queuedRun) return;
|
if (!queuedRun) return;
|
||||||
|
|
||||||
const userId = queuedRun.runByUserId;
|
const userId = queuedRun.runByUserId;
|
||||||
@@ -1140,7 +1149,14 @@ async function processQueuedRuns() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (error: any) {
|
} catch (error: any) {
|
||||||
logger.log('error', `Error processing queued runs: ${error.message}`);
|
consecutiveDbErrors++;
|
||||||
|
|
||||||
|
if (consecutiveDbErrors >= MAX_CONSECUTIVE_ERRORS) {
|
||||||
|
circuitBreakerOpenUntil = Date.now() + CIRCUIT_BREAKER_COOLDOWN;
|
||||||
|
logger.log('error', `Circuit breaker opened after ${MAX_CONSECUTIVE_ERRORS} consecutive errors. Cooling down for ${CIRCUIT_BREAKER_COOLDOWN/1000}s`);
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.log('error', `Error processing queued runs (${consecutiveDbErrors}/${MAX_CONSECUTIVE_ERRORS}): ${error.message}`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user