From 65cb201474f16529dca2dce82fc9754ce2960023 Mon Sep 17 00:00:00 2001 From: Rohit Date: Mon, 10 Mar 2025 16:02:21 +0530 Subject: [PATCH] feat: start pgboss workers --- server/src/pgboss-worker.ts | 184 +++++++++++++++++------------------- 1 file changed, 88 insertions(+), 96 deletions(-) diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index 51db3bcf..5d2ea86d 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -9,15 +9,9 @@ import { interpretWholeWorkflow, stopRunningInterpretation } from './browser-management/controller'; -import dotenv from 'dotenv'; -// Load environment variables -dotenv.config(); +const pgBossConnectionString = `postgres://${process.env.DB_USER}:${process.env.DB_PASSWORD}@${process.env.DB_HOST}:${process.env.DB_PORT}/${process.env.DB_NAME}`; -// Define connection string -const pgBossConnectionString = 'postgres://postgres:admin1234@localhost:5432/maxun'; - -// Define interfaces for job data structures interface InitializeBrowserData { userId: string; } @@ -26,22 +20,8 @@ interface DestroyBrowserData { browserId: string; } -// Initialize pg-boss instance const pgBoss = new PgBoss({connectionString: pgBossConnectionString, schema: 'public'}); -// Start pg-boss -pgBoss.start() - .then(() => { - logger.log('info', 'Recording worker started successfully'); - - // Register all workers - registerWorkers(); - }) - .catch((error: Error) => { - logger.log('error', `Failed to start recording worker: ${error.message}`); - process.exit(1); - }); - /** * Extract data safely from a job (single job or job array) */ @@ -56,87 +36,99 @@ function extractJobData(job: Job | Job[]): T { } /** - * Register all browser operation workers + * Initialize PgBoss and register all workers */ -function registerWorkers(): void { - // Worker for initializing browser recording - pgBoss.work('initialize-browser-recording', async (job: Job | Job[]) => { - try { - const data = extractJobData(job); - const userId = data.userId; - - logger.log('info', `Starting browser initialization job for user: ${userId}`); - const browserId = initializeRemoteBrowserForRecording(userId); - logger.log('info', `Browser recording job completed with browserId: ${browserId}`); - return { browserId }; - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Browser recording job failed: ${errorMessage}`); - throw error; - } - }); - - // Worker for stopping a browser - pgBoss.work('destroy-browser', async (job: Job | Job[]) => { - try { - const data = extractJobData(job); - const browserId = data.browserId; - - logger.log('info', `Starting browser destruction job for browser: ${browserId}`); - const success = await destroyRemoteBrowser(browserId); - logger.log('info', `Browser destruction job completed with result: ${success}`); - return { success }; - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Destroy browser job failed: ${errorMessage}`); - throw error; - } - }); - - // Worker for interpreting workflow - pgBoss.work('interpret-workflow', async () => { - try { - logger.log('info', 'Starting workflow interpretation job'); - await interpretWholeWorkflow(); - logger.log('info', 'Workflow interpretation job completed'); - return { success: true }; - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Interpret workflow job failed: ${errorMessage}`); - throw error; - } - }); - - // Worker for stopping workflow interpretation - pgBoss.work('stop-interpretation', async () => { - try { - logger.log('info', 'Starting stop interpretation job'); - await stopRunningInterpretation(); - logger.log('info', 'Stop interpretation job completed'); - return { success: true }; - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Stop interpretation job failed: ${errorMessage}`); - throw error; - } - }); - - logger.log('info', 'All recording workers registered successfully'); -} - -// Handle shutdown -process.on('SIGINT', async () => { - logger.log('info', 'Recording worker shutting down...'); - +async function startWorkers() { try { - await pgBoss.stop(); - logger.log('info', 'PgBoss stopped gracefully'); + logger.log('info', 'Starting PgBoss worker...'); + await pgBoss.start(); + logger.log('info', 'PgBoss worker started successfully'); + + // Worker for initializing browser recording + await pgBoss.work('initialize-browser-recording', async (job: Job | Job[]) => { + try { + const data = extractJobData(job); + const userId = data.userId; + + logger.log('info', `Starting browser initialization job for user: ${userId}`); + const browserId = initializeRemoteBrowserForRecording(userId); + logger.log('info', `Browser recording job completed with browserId: ${browserId}`); + return { browserId }; + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Browser recording job failed: ${errorMessage}`); + throw error; + } + }); + + // Worker for stopping a browser + await pgBoss.work('destroy-browser', async (job: Job | Job[]) => { + try { + const data = extractJobData(job); + const browserId = data.browserId; + + logger.log('info', `Starting browser destruction job for browser: ${browserId}`); + const success = await destroyRemoteBrowser(browserId); + logger.log('info', `Browser destruction job completed with result: ${success}`); + return { success }; + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Destroy browser job failed: ${errorMessage}`); + throw error; + } + }); + + // Worker for interpreting workflow + await pgBoss.work('interpret-workflow', async () => { + try { + logger.log('info', 'Starting workflow interpretation job'); + await interpretWholeWorkflow(); + logger.log('info', 'Workflow interpretation job completed'); + return { success: true }; + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Interpret workflow job failed: ${errorMessage}`); + throw error; + } + }); + + // Worker for stopping workflow interpretation + await pgBoss.work('stop-interpretation', async () => { + try { + logger.log('info', 'Starting stop interpretation job'); + await stopRunningInterpretation(); + logger.log('info', 'Stop interpretation job completed'); + return { success: true }; + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Stop interpretation job failed: ${errorMessage}`); + throw error; + } + }); + + logger.log('info', 'All recording workers registered successfully'); } catch (error: unknown) { const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Error stopping PgBoss: ${errorMessage}`); + logger.log('error', `Failed to start PgBoss workers: ${errorMessage}`); + process.exit(1); } - +} + +// Start all workers +startWorkers(); + +// Handle graceful shutdown +process.on('SIGTERM', async () => { + logger.log('info', 'SIGTERM received, shutting down PgBoss...'); + await pgBoss.stop(); process.exit(0); }); +process.on('SIGINT', async () => { + logger.log('info', 'SIGINT received, shutting down PgBoss...'); + await pgBoss.stop(); + process.exit(0); +}); + +// For use in other files export { pgBoss }; \ No newline at end of file