diff --git a/server/src/server.ts b/server/src/server.ts index 4a33b67b..88dc74c5 100644 --- a/server/src/server.ts +++ b/server/src/server.ts @@ -8,7 +8,7 @@ dotenv.config(); import { record, workflow, storage, auth, integration, proxy, webhook } from './routes'; import { BrowserPool } from "./browser-management/classes/BrowserPool"; import logger from './logger'; -import { connectDB, syncDB } from './storage/db' +import sequelize, { connectDB, syncDB } from './storage/db' import cookieParser from 'cookie-parser'; import { SERVER_PORT } from "./constants/config"; import { readdirSync } from "fs" @@ -22,6 +22,7 @@ import session from 'express-session'; import { processQueuedRuns, recoverOrphanedRuns } from './routes/storage'; import { startWorkers } from './pgboss-worker'; import { stopPgBossClient, startPgBossClient } from './storage/pgboss' +import Run from './models/Run'; const app = express(); app.use(cors({ @@ -38,7 +39,7 @@ const pool = new Pool({ password: process.env.DB_PASSWORD, port: process.env.DB_PORT ? parseInt(process.env.DB_PORT, 10) : undefined, max: 10, - min: 0, + min: 0, idleTimeoutMillis: 30000, connectionTimeoutMillis: 10000, maxUses: 7500, @@ -83,13 +84,22 @@ const server = http.createServer(app); /** * Globally exported singleton instance of socket.io for socket communication with the client. */ -export let io: Server; +export let io = new Server(server, { + cleanupEmptyChildNamespaces: true, + pingTimeout: 60000, + pingInterval: 25000, + maxHttpBufferSize: 1e8, + transports: ['websocket', 'polling'], + allowEIO3: true +}); /** * {@link BrowserPool} globally exported singleton instance for managing browsers. */ export const browserPool = new BrowserPool(); +export const recentRecoveries = new Map(); + app.use(cookieParser()) app.use('/webhook', webhook); @@ -139,30 +149,38 @@ app.use((req, res, next) => { }); if (require.main === module) { - setInterval(() => { - processQueuedRuns(); - }, 5000); -} + const serverIntervals: NodeJS.Timeout[] = []; + + const processQueuedRunsInterval = setInterval(async () => { + try { + await processQueuedRuns(); + } catch (error: any) { + logger.log('error', `Error in processQueuedRuns interval: ${error.message}`); + } + }, 5000); + serverIntervals.push(processQueuedRunsInterval); + + const browserPoolCleanupInterval = setInterval(() => { + browserPool.cleanupStaleBrowserSlots(); + }, 60000); + serverIntervals.push(browserPoolCleanupInterval); -if (require.main === module) { server.listen(SERVER_PORT, '0.0.0.0', async () => { try { await connectDB(); await syncDB(); - + logger.log('info', 'Cleaning up stale browser slots...'); browserPool.cleanupStaleBrowserSlots(); - // Recover orphaned runs from potential crashes await recoverOrphanedRuns(); - // Start pgBoss client for job submission + await startPgBossClient(); - // Start pgBoss workers AFTER recovery is complete await startWorkers(); - + io = new Server(server); - + io.of('/queued-run').on('connection', (socket) => { const userId = socket.handshake.query.userId as string; @@ -170,6 +188,15 @@ if (require.main === module) { socket.join(`user-${userId}`); logger.log('info', `Client joined queued-run namespace for user: ${userId}, socket: ${socket.id}`); + if (recentRecoveries.has(userId)) { + const recoveries = recentRecoveries.get(userId)!; + recoveries.forEach(recoveryData => { + socket.emit('run-recovered', recoveryData); + logger.log('info', `Sent stored recovery notification for run: ${recoveryData.runId} to user: ${userId}`); + }); + recentRecoveries.delete(userId); + } + socket.on('disconnect', () => { logger.log('info', `Client disconnected from queued-run namespace: ${socket.id}`); }); @@ -178,8 +205,9 @@ if (require.main === module) { socket.disconnect(); } }); - + if (!isProduction) { + // Development mode if (process.platform === 'win32') { workerProcess = fork(workerPath, [], { execArgv: ['--inspect=5859'], @@ -207,7 +235,7 @@ if (require.main === module) { console.log(`Recording worker exited with code: ${code}`); }); } else { - // Run in same process for non-Windows + // Run in same process for non-Windows development try { await import('./schedule-worker'); await import('./pgboss-worker'); @@ -216,48 +244,177 @@ if (require.main === module) { console.error('Failed to start workers in main process:', error); } } + } else { + // Production mode - run workers in same process for memory sharing + try { + await import('./schedule-worker'); + await import('./pgboss-worker'); + logger.log('info', 'Workers started in main process'); + } catch (error: any) { + logger.log('error', `Failed to start workers: ${error.message}`); + process.exit(1); + } } - - logger.log('info', `Server listening on port ${SERVER_PORT}`); + + logger.log('info', `Server listening on port ${SERVER_PORT}`); } catch (error: any) { logger.log('error', `Failed to connect to the database: ${error.message}`); process.exit(1); } }); -} -process.on('unhandledRejection', (reason, promise) => { - logger.log('error', `Unhandled promise rejection at: ${promise}, reason: ${reason}`); - console.error('Unhandled promise rejection:', reason); -}); - -process.on('uncaughtException', (error) => { - logger.log('error', `Uncaught exception: ${error.message}`, { stack: error.stack }); - console.error('Uncaught exception:', error); - - if (process.env.NODE_ENV === 'production') { - setTimeout(() => { - process.exit(1); - }, 5000); - } -}); - -if (require.main === module) { process.on('SIGINT', async () => { console.log('Main app shutting down...'); + let shutdownSuccessful = true; + + await new Promise(resolve => setTimeout(resolve, 2000)); try { - console.log('Closing PostgreSQL connection pool...'); - await pool.end(); - console.log('PostgreSQL connection pool closed'); - } catch (error) { - console.error('Error closing PostgreSQL connection pool:', error); + const runningBrowsers = browserPool.getAllBrowsers(); + + for (const [browserId, browser] of runningBrowsers) { + try { + if (browser && browser.interpreter) { + const hasData = (browser.interpreter.serializableDataByType?.scrapeSchema?.length > 0) || + (browser.interpreter.serializableDataByType?.scrapeList?.length > 0) || + (browser.interpreter.binaryData?.length > 0); + + if (hasData) { + const run = await Run.findOne({ where: { browserId, status: 'running' } }); + if (run) { + const limitedData = { + scrapeSchemaOutput: browser.interpreter.serializableDataByType?.scrapeSchema + ? { "schema-tabular": browser.interpreter.serializableDataByType.scrapeSchema } + : {}, + scrapeListOutput: browser.interpreter.serializableDataByType?.scrapeList || {}, + binaryOutput: browser.interpreter.binaryData || [] + }; + + const binaryOutputRecord = limitedData.binaryOutput.reduce((acc: Record, item: any, index: number) => { + acc[`item-${index}`] = item; + return acc; + }, {}); + + await run.update({ + status: 'failed', + finishedAt: new Date().toLocaleString(), + log: 'Process interrupted during execution - partial data preserved', + serializableOutput: { + scrapeSchema: Object.values(limitedData.scrapeSchemaOutput), + scrapeList: Object.values(limitedData.scrapeListOutput), + }, + binaryOutput: binaryOutputRecord + }); + } + } + } + } catch (browserError: any) { + shutdownSuccessful = false; + } + } + } catch (error: any) { + shutdownSuccessful = false; } - if (!isProduction && process.platform === 'win32') { - if (workerProcess) workerProcess.kill(); - if (recordingWorkerProcess) recordingWorkerProcess.kill(); + serverIntervals.forEach(clearInterval); + + try { + const allBrowsers = browserPool.getAllBrowsers(); + for (const [browserId, browser] of allBrowsers) { + try { + if (browser) { + await browser.switchOff(); + } + } catch (browserCleanupError: any) { + console.error(`Error shutting down browser ${browserId}:`, browserCleanupError.message); + } + } + } catch (error: any) { + console.error('Error during browser cleanup:', error.message); } - process.exit(); + + if (!isProduction) { + try { + if (workerProcess) { + workerProcess.kill('SIGTERM'); + } + if (recordingWorkerProcess) { + recordingWorkerProcess.kill('SIGTERM'); + } + } catch (workerError: any) { + console.error('Error terminating worker processes:', workerError.message); + } + + await new Promise(resolve => setTimeout(resolve, 1000)); + } + + try { + await new Promise((resolve) => { + io.close(() => { + resolve(); + }); + }); + } catch (ioError: any) { + shutdownSuccessful = false; + } + + try { + await new Promise((resolve, reject) => { + server.close((err) => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); + } catch (serverError: any) { + console.error('Error closing HTTP server:', serverError.message); + shutdownSuccessful = false; + } + + try { + await pool.end(); + } catch (poolError: any) { + console.error('Error closing PostgreSQL connection pool:', poolError.message); + shutdownSuccessful = false; + } + + try { + await stopPgBossClient(); + } catch (pgBossError: any) { + console.error('Error closing PgBoss client connection:', pgBossError.message); + shutdownSuccessful = false; + } + + try { + await sequelize.close(); + } catch (sequelizeError: any) { + console.error('Error closing Sequelize connection:', sequelizeError.message); + shutdownSuccessful = false; + } + + console.log(`Shutdown ${shutdownSuccessful ? 'completed successfully' : 'completed with errors'}`); + process.exit(shutdownSuccessful ? 0 : 1); }); -} \ No newline at end of file + + process.on('unhandledRejection', (reason, promise) => { + console.error('Unhandled promise rejection:', reason); + + if (process.env.NODE_ENV === 'production') { + setTimeout(() => { + process.exit(1); + }, 1000); + } + }); + + process.on('uncaughtException', (error) => { + console.error('Uncaught exception:', error); + + if (process.env.NODE_ENV === 'production') { + setTimeout(() => { + process.exit(1); + }, 5000); + } + }); +}