diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts new file mode 100644 index 00000000..51db3bcf --- /dev/null +++ b/server/src/pgboss-worker.ts @@ -0,0 +1,142 @@ +/** + * Recording worker using PgBoss for asynchronous browser recording operations + */ +import PgBoss, { Job } from 'pg-boss'; +import logger from './logger'; +import { + initializeRemoteBrowserForRecording, + destroyRemoteBrowser, + interpretWholeWorkflow, + stopRunningInterpretation +} from './browser-management/controller'; +import dotenv from 'dotenv'; + +// Load environment variables +dotenv.config(); + +// Define connection string +const pgBossConnectionString = 'postgres://postgres:admin1234@localhost:5432/maxun'; + +// Define interfaces for job data structures +interface InitializeBrowserData { + userId: string; +} + +interface DestroyBrowserData { + browserId: string; +} + +// Initialize pg-boss instance +const pgBoss = new PgBoss({connectionString: pgBossConnectionString, schema: 'public'}); + +// Start pg-boss +pgBoss.start() + .then(() => { + logger.log('info', 'Recording worker started successfully'); + + // Register all workers + registerWorkers(); + }) + .catch((error: Error) => { + logger.log('error', `Failed to start recording worker: ${error.message}`); + process.exit(1); + }); + +/** + * Extract data safely from a job (single job or job array) + */ +function extractJobData(job: Job | Job[]): T { + if (Array.isArray(job)) { + if (job.length === 0) { + throw new Error('Empty job array received'); + } + return job[0].data; + } + return job.data; +} + +/** + * Register all browser operation workers + */ +function registerWorkers(): void { + // Worker for initializing browser recording + pgBoss.work('initialize-browser-recording', async (job: Job | Job[]) => { + try { + const data = extractJobData(job); + const userId = data.userId; + + logger.log('info', `Starting browser initialization job for user: ${userId}`); + const browserId = initializeRemoteBrowserForRecording(userId); + logger.log('info', `Browser recording job completed with browserId: ${browserId}`); + return { browserId }; + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Browser recording job failed: ${errorMessage}`); + throw error; + } + }); + + // Worker for stopping a browser + pgBoss.work('destroy-browser', async (job: Job | Job[]) => { + try { + const data = extractJobData(job); + const browserId = data.browserId; + + logger.log('info', `Starting browser destruction job for browser: ${browserId}`); + const success = await destroyRemoteBrowser(browserId); + logger.log('info', `Browser destruction job completed with result: ${success}`); + return { success }; + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Destroy browser job failed: ${errorMessage}`); + throw error; + } + }); + + // Worker for interpreting workflow + pgBoss.work('interpret-workflow', async () => { + try { + logger.log('info', 'Starting workflow interpretation job'); + await interpretWholeWorkflow(); + logger.log('info', 'Workflow interpretation job completed'); + return { success: true }; + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Interpret workflow job failed: ${errorMessage}`); + throw error; + } + }); + + // Worker for stopping workflow interpretation + pgBoss.work('stop-interpretation', async () => { + try { + logger.log('info', 'Starting stop interpretation job'); + await stopRunningInterpretation(); + logger.log('info', 'Stop interpretation job completed'); + return { success: true }; + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Stop interpretation job failed: ${errorMessage}`); + throw error; + } + }); + + logger.log('info', 'All recording workers registered successfully'); +} + +// Handle shutdown +process.on('SIGINT', async () => { + logger.log('info', 'Recording worker shutting down...'); + + try { + await pgBoss.stop(); + logger.log('info', 'PgBoss stopped gracefully'); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Error stopping PgBoss: ${errorMessage}`); + } + + process.exit(0); +}); + +export { pgBoss }; \ No newline at end of file