diff --git a/server/src/browser-management/classes/BrowserPool.ts b/server/src/browser-management/classes/BrowserPool.ts index 286ef681..2ddd01ed 100644 --- a/server/src/browser-management/classes/BrowserPool.ts +++ b/server/src/browser-management/classes/BrowserPool.ts @@ -645,14 +645,6 @@ export class BrowserPool { } }; - /** - * Legacy method - kept for backwards compatibility but now uses atomic version - * @deprecated Use reserveBrowserSlotAtomic instead - */ - public reserveBrowserSlot = (id: string, userId: string, state: BrowserState = "run"): boolean => { - return this.reserveBrowserSlotAtomic(id, userId, state); - }; - /** * Upgrades a reserved slot to an actual browser instance. * diff --git a/server/src/schedule-worker.ts b/server/src/schedule-worker.ts index da3f9dd4..63c7bad4 100644 --- a/server/src/schedule-worker.ts +++ b/server/src/schedule-worker.ts @@ -6,7 +6,6 @@ import logger from './logger'; import Robot from './models/Robot'; import { handleRunRecording } from './workflow-management/scheduler'; import { computeNextRun } from './utils/schedule'; -import { v4 as uuid } from "uuid"; if (!process.env.DB_USER || !process.env.DB_PASSWORD || !process.env.DB_HOST || !process.env.DB_PORT || !process.env.DB_NAME) { throw new Error('One or more required environment variables are missing.'); @@ -16,7 +15,7 @@ const pgBossConnectionString = `postgresql://${process.env.DB_USER}:${encodeURIC const pgBoss = new PgBoss({ connectionString: pgBossConnectionString, - max: 5, + max: 3, expireInHours: 23, }); @@ -36,7 +35,7 @@ async function processScheduledWorkflow(job: Job) { try { // Execute the workflow using the existing handleRunRecording function - const result = await handleRunRecording(id, userId); + await handleRunRecording(id, userId); // Update the robot's schedule with last run and next run times const robot = await Robot.findOne({ where: { 'recording_meta.id': id } }); @@ -143,11 +142,11 @@ pgBoss.on('error', (error) => { process.on('SIGTERM', async () => { logger.log('info', 'SIGTERM received, shutting down PgBoss scheduler...'); await pgBoss.stop(); - process.exit(0); + logger.log('info', 'PgBoss scheduler stopped, ready for termination'); }); process.on('SIGINT', async () => { logger.log('info', 'SIGINT received, shutting down PgBoss scheduler...'); await pgBoss.stop(); - process.exit(0); + logger.log('info', 'PgBoss scheduler stopped, waiting for main process cleanup...'); }); diff --git a/server/src/storage/pgboss.ts b/server/src/storage/pgboss.ts index 9a6eedd1..7e657ed1 100644 --- a/server/src/storage/pgboss.ts +++ b/server/src/storage/pgboss.ts @@ -26,11 +26,7 @@ const pgBossConnectionString = `postgres://${process.env.DB_USER}:${encodeURICom */ export const pgBossClient = new PgBoss({ connectionString: pgBossConnectionString, - max: 3, // Small pool since we only send jobs - ssl: { - require: true, - rejectUnauthorized: false, - }, + max: 3, }); let isStarted = false; diff --git a/server/src/workflow-management/classes/Interpreter.ts b/server/src/workflow-management/classes/Interpreter.ts index 8a73a25f..07f99dfe 100644 --- a/server/src/workflow-management/classes/Interpreter.ts +++ b/server/src/workflow-management/classes/Interpreter.ts @@ -143,8 +143,10 @@ export class WorkflowInterpreter { }> = []; private persistenceTimer: NodeJS.Timeout | null = null; + private persistenceRetryTimer: NodeJS.Timeout | null = null; private readonly BATCH_SIZE = 5; private readonly BATCH_TIMEOUT = 3000; + private readonly MAX_PERSISTENCE_RETRIES = 3; private persistenceInProgress = false; private persistenceRetryCount = 0; @@ -172,6 +174,23 @@ export class WorkflowInterpreter { this.currentRunId = runId || null; } + /** + * Removes pausing-related socket listeners to prevent memory leaks + * Must be called before re-registering listeners or during cleanup + * @private + */ + private removePausingListeners(): void { + try { + this.socket.removeAllListeners('pause'); + this.socket.removeAllListeners('resume'); + this.socket.removeAllListeners('step'); + this.socket.removeAllListeners('breakpoints'); + logger.log('debug', 'Removed pausing socket listeners'); + } catch (error: any) { + logger.warn(`Error removing pausing listeners: ${error.message}`); + } + } + /** * Subscribes to the events that are used to control the interpretation. * The events are pause, resume, step and breakpoints. @@ -179,6 +198,8 @@ export class WorkflowInterpreter { * @returns void */ public subscribeToPausing = () => { + this.removePausingListeners(); + this.socket.on('pause', () => { this.interpretationIsPaused = true; }); @@ -363,6 +384,11 @@ export class WorkflowInterpreter { this.persistenceTimer = null; } + if (this.persistenceRetryTimer) { + clearTimeout(this.persistenceRetryTimer); + this.persistenceRetryTimer = null; + } + if (this.interpreter) { try { if (!this.interpreter.getIsAborted()) { @@ -370,11 +396,18 @@ export class WorkflowInterpreter { } await this.interpreter.stop(); logger.log('debug', 'mx-cloud interpreter properly stopped during cleanup'); + + if (typeof this.interpreter.cleanup === 'function') { + await this.interpreter.cleanup(); + logger.log('debug', 'mx-cloud interpreter cleanup completed'); + } } catch (error: any) { logger.log('warn', `Error stopping mx-cloud interpreter during cleanup: ${error.message}`); } } + this.removePausingListeners(); + this.debugMessages = []; this.interpretationIsPaused = false; this.activeId = null; @@ -815,16 +848,22 @@ export class WorkflowInterpreter { this.persistenceRetryCount = 0; } - if (this.persistenceRetryCount < 3) { + if (this.persistenceRetryCount < this.MAX_PERSISTENCE_RETRIES) { this.persistenceBuffer.unshift(...batchToProcess); this.persistenceRetryCount++; const backoffDelay = Math.min(5000 * Math.pow(2, this.persistenceRetryCount), 30000); - setTimeout(async () => { + + if (this.persistenceRetryTimer) { + clearTimeout(this.persistenceRetryTimer); + } + + this.persistenceRetryTimer = setTimeout(async () => { + this.persistenceRetryTimer = null; await this.flushPersistenceBuffer(); }, backoffDelay); - logger.log('warn', `Scheduling persistence retry ${this.persistenceRetryCount}/3 in ${backoffDelay}ms`); + logger.log('warn', `Scheduling persistence retry ${this.persistenceRetryCount}/${this.MAX_PERSISTENCE_RETRIES} in ${backoffDelay}ms`); } else { logger.log('error', `Max persistence retries exceeded for run ${this.currentRunId}, dropping ${batchToProcess.length} items`); this.persistenceRetryCount = 0;