fix: add persist timer, rm ssl
This commit is contained in:
@@ -645,14 +645,6 @@ export class BrowserPool {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
|
||||||
* Legacy method - kept for backwards compatibility but now uses atomic version
|
|
||||||
* @deprecated Use reserveBrowserSlotAtomic instead
|
|
||||||
*/
|
|
||||||
public reserveBrowserSlot = (id: string, userId: string, state: BrowserState = "run"): boolean => {
|
|
||||||
return this.reserveBrowserSlotAtomic(id, userId, state);
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Upgrades a reserved slot to an actual browser instance.
|
* Upgrades a reserved slot to an actual browser instance.
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ import logger from './logger';
|
|||||||
import Robot from './models/Robot';
|
import Robot from './models/Robot';
|
||||||
import { handleRunRecording } from './workflow-management/scheduler';
|
import { handleRunRecording } from './workflow-management/scheduler';
|
||||||
import { computeNextRun } from './utils/schedule';
|
import { computeNextRun } from './utils/schedule';
|
||||||
import { v4 as uuid } from "uuid";
|
|
||||||
|
|
||||||
if (!process.env.DB_USER || !process.env.DB_PASSWORD || !process.env.DB_HOST || !process.env.DB_PORT || !process.env.DB_NAME) {
|
if (!process.env.DB_USER || !process.env.DB_PASSWORD || !process.env.DB_HOST || !process.env.DB_PORT || !process.env.DB_NAME) {
|
||||||
throw new Error('One or more required environment variables are missing.');
|
throw new Error('One or more required environment variables are missing.');
|
||||||
@@ -16,7 +15,7 @@ const pgBossConnectionString = `postgresql://${process.env.DB_USER}:${encodeURIC
|
|||||||
|
|
||||||
const pgBoss = new PgBoss({
|
const pgBoss = new PgBoss({
|
||||||
connectionString: pgBossConnectionString,
|
connectionString: pgBossConnectionString,
|
||||||
max: 5,
|
max: 3,
|
||||||
expireInHours: 23,
|
expireInHours: 23,
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -36,7 +35,7 @@ async function processScheduledWorkflow(job: Job<ScheduledWorkflowData>) {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
// Execute the workflow using the existing handleRunRecording function
|
// Execute the workflow using the existing handleRunRecording function
|
||||||
const result = await handleRunRecording(id, userId);
|
await handleRunRecording(id, userId);
|
||||||
|
|
||||||
// Update the robot's schedule with last run and next run times
|
// Update the robot's schedule with last run and next run times
|
||||||
const robot = await Robot.findOne({ where: { 'recording_meta.id': id } });
|
const robot = await Robot.findOne({ where: { 'recording_meta.id': id } });
|
||||||
@@ -143,11 +142,11 @@ pgBoss.on('error', (error) => {
|
|||||||
process.on('SIGTERM', async () => {
|
process.on('SIGTERM', async () => {
|
||||||
logger.log('info', 'SIGTERM received, shutting down PgBoss scheduler...');
|
logger.log('info', 'SIGTERM received, shutting down PgBoss scheduler...');
|
||||||
await pgBoss.stop();
|
await pgBoss.stop();
|
||||||
process.exit(0);
|
logger.log('info', 'PgBoss scheduler stopped, ready for termination');
|
||||||
});
|
});
|
||||||
|
|
||||||
process.on('SIGINT', async () => {
|
process.on('SIGINT', async () => {
|
||||||
logger.log('info', 'SIGINT received, shutting down PgBoss scheduler...');
|
logger.log('info', 'SIGINT received, shutting down PgBoss scheduler...');
|
||||||
await pgBoss.stop();
|
await pgBoss.stop();
|
||||||
process.exit(0);
|
logger.log('info', 'PgBoss scheduler stopped, waiting for main process cleanup...');
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -26,11 +26,7 @@ const pgBossConnectionString = `postgres://${process.env.DB_USER}:${encodeURICom
|
|||||||
*/
|
*/
|
||||||
export const pgBossClient = new PgBoss({
|
export const pgBossClient = new PgBoss({
|
||||||
connectionString: pgBossConnectionString,
|
connectionString: pgBossConnectionString,
|
||||||
max: 3, // Small pool since we only send jobs
|
max: 3,
|
||||||
ssl: {
|
|
||||||
require: true,
|
|
||||||
rejectUnauthorized: false,
|
|
||||||
},
|
|
||||||
});
|
});
|
||||||
|
|
||||||
let isStarted = false;
|
let isStarted = false;
|
||||||
|
|||||||
@@ -143,8 +143,10 @@ export class WorkflowInterpreter {
|
|||||||
}> = [];
|
}> = [];
|
||||||
|
|
||||||
private persistenceTimer: NodeJS.Timeout | null = null;
|
private persistenceTimer: NodeJS.Timeout | null = null;
|
||||||
|
private persistenceRetryTimer: NodeJS.Timeout | null = null;
|
||||||
private readonly BATCH_SIZE = 5;
|
private readonly BATCH_SIZE = 5;
|
||||||
private readonly BATCH_TIMEOUT = 3000;
|
private readonly BATCH_TIMEOUT = 3000;
|
||||||
|
private readonly MAX_PERSISTENCE_RETRIES = 3;
|
||||||
private persistenceInProgress = false;
|
private persistenceInProgress = false;
|
||||||
private persistenceRetryCount = 0;
|
private persistenceRetryCount = 0;
|
||||||
|
|
||||||
@@ -172,6 +174,23 @@ export class WorkflowInterpreter {
|
|||||||
this.currentRunId = runId || null;
|
this.currentRunId = runId || null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes pausing-related socket listeners to prevent memory leaks
|
||||||
|
* Must be called before re-registering listeners or during cleanup
|
||||||
|
* @private
|
||||||
|
*/
|
||||||
|
private removePausingListeners(): void {
|
||||||
|
try {
|
||||||
|
this.socket.removeAllListeners('pause');
|
||||||
|
this.socket.removeAllListeners('resume');
|
||||||
|
this.socket.removeAllListeners('step');
|
||||||
|
this.socket.removeAllListeners('breakpoints');
|
||||||
|
logger.log('debug', 'Removed pausing socket listeners');
|
||||||
|
} catch (error: any) {
|
||||||
|
logger.warn(`Error removing pausing listeners: ${error.message}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Subscribes to the events that are used to control the interpretation.
|
* Subscribes to the events that are used to control the interpretation.
|
||||||
* The events are pause, resume, step and breakpoints.
|
* The events are pause, resume, step and breakpoints.
|
||||||
@@ -179,6 +198,8 @@ export class WorkflowInterpreter {
|
|||||||
* @returns void
|
* @returns void
|
||||||
*/
|
*/
|
||||||
public subscribeToPausing = () => {
|
public subscribeToPausing = () => {
|
||||||
|
this.removePausingListeners();
|
||||||
|
|
||||||
this.socket.on('pause', () => {
|
this.socket.on('pause', () => {
|
||||||
this.interpretationIsPaused = true;
|
this.interpretationIsPaused = true;
|
||||||
});
|
});
|
||||||
@@ -363,6 +384,11 @@ export class WorkflowInterpreter {
|
|||||||
this.persistenceTimer = null;
|
this.persistenceTimer = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (this.persistenceRetryTimer) {
|
||||||
|
clearTimeout(this.persistenceRetryTimer);
|
||||||
|
this.persistenceRetryTimer = null;
|
||||||
|
}
|
||||||
|
|
||||||
if (this.interpreter) {
|
if (this.interpreter) {
|
||||||
try {
|
try {
|
||||||
if (!this.interpreter.getIsAborted()) {
|
if (!this.interpreter.getIsAborted()) {
|
||||||
@@ -370,11 +396,18 @@ export class WorkflowInterpreter {
|
|||||||
}
|
}
|
||||||
await this.interpreter.stop();
|
await this.interpreter.stop();
|
||||||
logger.log('debug', 'mx-cloud interpreter properly stopped during cleanup');
|
logger.log('debug', 'mx-cloud interpreter properly stopped during cleanup');
|
||||||
|
|
||||||
|
if (typeof this.interpreter.cleanup === 'function') {
|
||||||
|
await this.interpreter.cleanup();
|
||||||
|
logger.log('debug', 'mx-cloud interpreter cleanup completed');
|
||||||
|
}
|
||||||
} catch (error: any) {
|
} catch (error: any) {
|
||||||
logger.log('warn', `Error stopping mx-cloud interpreter during cleanup: ${error.message}`);
|
logger.log('warn', `Error stopping mx-cloud interpreter during cleanup: ${error.message}`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.removePausingListeners();
|
||||||
|
|
||||||
this.debugMessages = [];
|
this.debugMessages = [];
|
||||||
this.interpretationIsPaused = false;
|
this.interpretationIsPaused = false;
|
||||||
this.activeId = null;
|
this.activeId = null;
|
||||||
@@ -815,16 +848,22 @@ export class WorkflowInterpreter {
|
|||||||
this.persistenceRetryCount = 0;
|
this.persistenceRetryCount = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.persistenceRetryCount < 3) {
|
if (this.persistenceRetryCount < this.MAX_PERSISTENCE_RETRIES) {
|
||||||
this.persistenceBuffer.unshift(...batchToProcess);
|
this.persistenceBuffer.unshift(...batchToProcess);
|
||||||
this.persistenceRetryCount++;
|
this.persistenceRetryCount++;
|
||||||
|
|
||||||
const backoffDelay = Math.min(5000 * Math.pow(2, this.persistenceRetryCount), 30000);
|
const backoffDelay = Math.min(5000 * Math.pow(2, this.persistenceRetryCount), 30000);
|
||||||
setTimeout(async () => {
|
|
||||||
|
if (this.persistenceRetryTimer) {
|
||||||
|
clearTimeout(this.persistenceRetryTimer);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.persistenceRetryTimer = setTimeout(async () => {
|
||||||
|
this.persistenceRetryTimer = null;
|
||||||
await this.flushPersistenceBuffer();
|
await this.flushPersistenceBuffer();
|
||||||
}, backoffDelay);
|
}, backoffDelay);
|
||||||
|
|
||||||
logger.log('warn', `Scheduling persistence retry ${this.persistenceRetryCount}/3 in ${backoffDelay}ms`);
|
logger.log('warn', `Scheduling persistence retry ${this.persistenceRetryCount}/${this.MAX_PERSISTENCE_RETRIES} in ${backoffDelay}ms`);
|
||||||
} else {
|
} else {
|
||||||
logger.log('error', `Max persistence retries exceeded for run ${this.currentRunId}, dropping ${batchToProcess.length} items`);
|
logger.log('error', `Max persistence retries exceeded for run ${this.currentRunId}, dropping ${batchToProcess.length} items`);
|
||||||
this.persistenceRetryCount = 0;
|
this.persistenceRetryCount = 0;
|
||||||
|
|||||||
Reference in New Issue
Block a user