feat: start pgboss workers

This commit is contained in:
Rohit
2025-03-10 16:02:21 +05:30
parent cc12f19aa0
commit 65cb201474

View File

@@ -9,15 +9,9 @@ import {
interpretWholeWorkflow, interpretWholeWorkflow,
stopRunningInterpretation stopRunningInterpretation
} from './browser-management/controller'; } from './browser-management/controller';
import dotenv from 'dotenv';
// Load environment variables const pgBossConnectionString = `postgres://${process.env.DB_USER}:${process.env.DB_PASSWORD}@${process.env.DB_HOST}:${process.env.DB_PORT}/${process.env.DB_NAME}`;
dotenv.config();
// Define connection string
const pgBossConnectionString = 'postgres://postgres:admin1234@localhost:5432/maxun';
// Define interfaces for job data structures
interface InitializeBrowserData { interface InitializeBrowserData {
userId: string; userId: string;
} }
@@ -26,22 +20,8 @@ interface DestroyBrowserData {
browserId: string; browserId: string;
} }
// Initialize pg-boss instance
const pgBoss = new PgBoss({connectionString: pgBossConnectionString, schema: 'public'}); const pgBoss = new PgBoss({connectionString: pgBossConnectionString, schema: 'public'});
// Start pg-boss
pgBoss.start()
.then(() => {
logger.log('info', 'Recording worker started successfully');
// Register all workers
registerWorkers();
})
.catch((error: Error) => {
logger.log('error', `Failed to start recording worker: ${error.message}`);
process.exit(1);
});
/** /**
* Extract data safely from a job (single job or job array) * Extract data safely from a job (single job or job array)
*/ */
@@ -56,11 +36,16 @@ function extractJobData<T>(job: Job<T> | Job<T>[]): T {
} }
/** /**
* Register all browser operation workers * Initialize PgBoss and register all workers
*/ */
function registerWorkers(): void { async function startWorkers() {
try {
logger.log('info', 'Starting PgBoss worker...');
await pgBoss.start();
logger.log('info', 'PgBoss worker started successfully');
// Worker for initializing browser recording // Worker for initializing browser recording
pgBoss.work('initialize-browser-recording', async (job: Job<InitializeBrowserData> | Job<InitializeBrowserData>[]) => { await pgBoss.work('initialize-browser-recording', async (job: Job<InitializeBrowserData> | Job<InitializeBrowserData>[]) => {
try { try {
const data = extractJobData(job); const data = extractJobData(job);
const userId = data.userId; const userId = data.userId;
@@ -77,7 +62,7 @@ function registerWorkers(): void {
}); });
// Worker for stopping a browser // Worker for stopping a browser
pgBoss.work('destroy-browser', async (job: Job<DestroyBrowserData> | Job<DestroyBrowserData>[]) => { await pgBoss.work('destroy-browser', async (job: Job<DestroyBrowserData> | Job<DestroyBrowserData>[]) => {
try { try {
const data = extractJobData(job); const data = extractJobData(job);
const browserId = data.browserId; const browserId = data.browserId;
@@ -94,7 +79,7 @@ function registerWorkers(): void {
}); });
// Worker for interpreting workflow // Worker for interpreting workflow
pgBoss.work('interpret-workflow', async () => { await pgBoss.work('interpret-workflow', async () => {
try { try {
logger.log('info', 'Starting workflow interpretation job'); logger.log('info', 'Starting workflow interpretation job');
await interpretWholeWorkflow(); await interpretWholeWorkflow();
@@ -108,7 +93,7 @@ function registerWorkers(): void {
}); });
// Worker for stopping workflow interpretation // Worker for stopping workflow interpretation
pgBoss.work('stop-interpretation', async () => { await pgBoss.work('stop-interpretation', async () => {
try { try {
logger.log('info', 'Starting stop interpretation job'); logger.log('info', 'Starting stop interpretation job');
await stopRunningInterpretation(); await stopRunningInterpretation();
@@ -122,21 +107,28 @@ function registerWorkers(): void {
}); });
logger.log('info', 'All recording workers registered successfully'); logger.log('info', 'All recording workers registered successfully');
}
// Handle shutdown
process.on('SIGINT', async () => {
logger.log('info', 'Recording worker shutting down...');
try {
await pgBoss.stop();
logger.log('info', 'PgBoss stopped gracefully');
} catch (error: unknown) { } catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error); const errorMessage = error instanceof Error ? error.message : String(error);
logger.log('error', `Error stopping PgBoss: ${errorMessage}`); logger.log('error', `Failed to start PgBoss workers: ${errorMessage}`);
process.exit(1);
}
} }
// Start all workers
startWorkers();
// Handle graceful shutdown
process.on('SIGTERM', async () => {
logger.log('info', 'SIGTERM received, shutting down PgBoss...');
await pgBoss.stop();
process.exit(0); process.exit(0);
}); });
process.on('SIGINT', async () => {
logger.log('info', 'SIGINT received, shutting down PgBoss...');
await pgBoss.stop();
process.exit(0);
});
// For use in other files
export { pgBoss }; export { pgBoss };