diff --git a/server/src/server.ts b/server/src/server.ts index 0a107b88..5b729b72 100644 --- a/server/src/server.ts +++ b/server/src/server.ts @@ -1,6 +1,7 @@ import express from 'express'; import path from 'path'; import http from 'http'; +import { Server } from "socket.io"; import cors from 'cors'; import dotenv from 'dotenv'; dotenv.config(); @@ -10,7 +11,6 @@ import logger from './logger'; import { connectDB, syncDB } from './storage/db' import cookieParser from 'cookie-parser'; import { SERVER_PORT } from "./constants/config"; -import { Server } from "socket.io"; import { readdirSync } from "fs" import { fork } from 'child_process'; import { capture } from "./utils/analytics"; @@ -75,9 +75,8 @@ const server = http.createServer(app); /** * Globally exported singleton instance of socket.io for socket communication with the client. - * @type {Server} */ -export const io = new Server(server); +export let io: Server; /** * {@link BrowserPool} globally exported singleton instance for managing browsers. @@ -112,34 +111,6 @@ const recordingWorkerPath = path.resolve(__dirname, isProduction ? './pgboss-wor let workerProcess: any; let recordingWorkerProcess: any; -if (!isProduction) { - workerProcess = fork(workerPath, [], { - execArgv: ['--inspect=5859'], - }); - workerProcess.on('message', (message: any) => { - console.log(`Message from worker: ${message}`); - }); - workerProcess.on('error', (error: any) => { - console.error(`Error in worker: ${error}`); - }); - workerProcess.on('exit', (code: any) => { - console.log(`Worker exited with code: ${code}`); - }); - - recordingWorkerProcess = fork(recordingWorkerPath, [], { - execArgv: ['--inspect=5860'], - }); - recordingWorkerProcess.on('message', (message: any) => { - console.log(`Message from recording worker: ${message}`); - }); - recordingWorkerProcess.on('error', (error: any) => { - console.error(`Error in recording worker: ${error}`); - }); - recordingWorkerProcess.on('exit', (code: any) => { - console.log(`Recording worker exited with code: ${code}`); - }); -} - app.get('/', function (req, res) { capture( 'maxun-oss-server-run', { @@ -160,66 +131,113 @@ app.use((req, res, next) => { next(); }); -io.of('/queued-run').on('connection', (socket) => { - const userId = socket.handshake.query.userId as string; - - if (userId) { - socket.join(`user-${userId}`); - logger.log('info', `Client joined queued-run namespace for user: ${userId}, socket: ${socket.id}`); - - socket.on('disconnect', () => { - logger.log('info', `Client disconnected from queued-run namespace: ${socket.id}`); - }); - } else { - logger.log('warn', `Client connected to queued-run namespace without userId: ${socket.id}`); - socket.disconnect(); - } -}); +if (require.main === module) { + setInterval(() => { + processQueuedRuns(); + }, 5000); +} -setInterval(() => { - processQueuedRuns(); -}, 5000); +if (require.main === module) { + server.listen(SERVER_PORT, '0.0.0.0', async () => { + try { + await connectDB(); + await syncDB(); + + io = new Server(server); + + io.of('/queued-run').on('connection', (socket) => { + const userId = socket.handshake.query.userId as string; + if (userId) { + socket.join(`user-${userId}`); + logger.log('info', `Client joined queued-run namespace for user: ${userId}, socket: ${socket.id}`); -server.listen(SERVER_PORT, '0.0.0.0', async () => { - try { - await connectDB(); - await syncDB(); - logger.log('info', `Server listening on port ${SERVER_PORT}`); - } catch (error: any) { - logger.log('error', `Failed to connect to the database: ${error.message}`); - process.exit(1); - } -}); + socket.on('disconnect', () => { + logger.log('info', `Client disconnected from queued-run namespace: ${socket.id}`); + }); + } else { + logger.log('warn', `Client connected to queued-run namespace without userId: ${socket.id}`); + socket.disconnect(); + } + }); + + if (!isProduction) { + if (process.platform === 'win32') { + workerProcess = fork(workerPath, [], { + execArgv: ['--inspect=5859'], + }); + workerProcess.on('message', (message: any) => { + console.log(`Message from worker: ${message}`); + }); + workerProcess.on('error', (error: any) => { + console.error(`Error in worker: ${error}`); + }); + workerProcess.on('exit', (code: any) => { + console.log(`Worker exited with code: ${code}`); + }); -process.on('SIGINT', async () => { - console.log('Main app shutting down...'); - try { - await Run.update( - { - status: 'failed', - finishedAt: new Date().toLocaleString(), - log: 'Process interrupted during execution - worker shutdown' - }, - { - where: { status: 'running' } + recordingWorkerProcess = fork(recordingWorkerPath, [], { + execArgv: ['--inspect=5860'], + }); + recordingWorkerProcess.on('message', (message: any) => { + console.log(`Message from recording worker: ${message}`); + }); + recordingWorkerProcess.on('error', (error: any) => { + console.error(`Error in recording worker: ${error}`); + }); + recordingWorkerProcess.on('exit', (code: any) => { + console.log(`Recording worker exited with code: ${code}`); + }); + } else { + // Run in same process for non-Windows + try { + await import('./schedule-worker'); + await import('./pgboss-worker'); + console.log('Workers started in main process for memory sharing'); + } catch (error) { + console.error('Failed to start workers in main process:', error); + } + } } - ); - } catch (error: any) { - console.error('Error updating runs:', error); - } + + logger.log('info', `Server listening on port ${SERVER_PORT}`); + } catch (error: any) { + logger.log('error', `Failed to connect to the database: ${error.message}`); + process.exit(1); + } + }); +} - try { - console.log('Closing PostgreSQL connection pool...'); - await pool.end(); - console.log('PostgreSQL connection pool closed'); - } catch (error) { - console.error('Error closing PostgreSQL connection pool:', error); - } +if (require.main === module) { + process.on('SIGINT', async () => { + console.log('Main app shutting down...'); + try { + await Run.update( + { + status: 'failed', + finishedAt: new Date().toLocaleString(), + log: 'Process interrupted during execution - worker shutdown' + }, + { + where: { status: 'running' } + } + ); + } catch (error: any) { + console.error('Error updating runs:', error); + } - if (!isProduction) { - if (workerProcess) workerProcess.kill(); - if (recordingWorkerProcess) recordingWorkerProcess.kill(); - } - process.exit(); -}); \ No newline at end of file + try { + console.log('Closing PostgreSQL connection pool...'); + await pool.end(); + console.log('PostgreSQL connection pool closed'); + } catch (error) { + console.error('Error closing PostgreSQL connection pool:', error); + } + + if (!isProduction && process.platform === 'win32') { + if (workerProcess) workerProcess.kill(); + if (recordingWorkerProcess) recordingWorkerProcess.kill(); + } + process.exit(); + }); +} \ No newline at end of file