fix: add partial data recovery
This commit is contained in:
@@ -8,7 +8,7 @@ dotenv.config();
|
|||||||
import { record, workflow, storage, auth, integration, proxy, webhook } from './routes';
|
import { record, workflow, storage, auth, integration, proxy, webhook } from './routes';
|
||||||
import { BrowserPool } from "./browser-management/classes/BrowserPool";
|
import { BrowserPool } from "./browser-management/classes/BrowserPool";
|
||||||
import logger from './logger';
|
import logger from './logger';
|
||||||
import { connectDB, syncDB } from './storage/db'
|
import sequelize, { connectDB, syncDB } from './storage/db'
|
||||||
import cookieParser from 'cookie-parser';
|
import cookieParser from 'cookie-parser';
|
||||||
import { SERVER_PORT } from "./constants/config";
|
import { SERVER_PORT } from "./constants/config";
|
||||||
import { readdirSync } from "fs"
|
import { readdirSync } from "fs"
|
||||||
@@ -22,6 +22,7 @@ import session from 'express-session';
|
|||||||
import { processQueuedRuns, recoverOrphanedRuns } from './routes/storage';
|
import { processQueuedRuns, recoverOrphanedRuns } from './routes/storage';
|
||||||
import { startWorkers } from './pgboss-worker';
|
import { startWorkers } from './pgboss-worker';
|
||||||
import { stopPgBossClient, startPgBossClient } from './storage/pgboss'
|
import { stopPgBossClient, startPgBossClient } from './storage/pgboss'
|
||||||
|
import Run from './models/Run';
|
||||||
|
|
||||||
const app = express();
|
const app = express();
|
||||||
app.use(cors({
|
app.use(cors({
|
||||||
@@ -38,7 +39,7 @@ const pool = new Pool({
|
|||||||
password: process.env.DB_PASSWORD,
|
password: process.env.DB_PASSWORD,
|
||||||
port: process.env.DB_PORT ? parseInt(process.env.DB_PORT, 10) : undefined,
|
port: process.env.DB_PORT ? parseInt(process.env.DB_PORT, 10) : undefined,
|
||||||
max: 10,
|
max: 10,
|
||||||
min: 0,
|
min: 0,
|
||||||
idleTimeoutMillis: 30000,
|
idleTimeoutMillis: 30000,
|
||||||
connectionTimeoutMillis: 10000,
|
connectionTimeoutMillis: 10000,
|
||||||
maxUses: 7500,
|
maxUses: 7500,
|
||||||
@@ -83,13 +84,22 @@ const server = http.createServer(app);
|
|||||||
/**
|
/**
|
||||||
* Globally exported singleton instance of socket.io for socket communication with the client.
|
* Globally exported singleton instance of socket.io for socket communication with the client.
|
||||||
*/
|
*/
|
||||||
export let io: Server;
|
export let io = new Server(server, {
|
||||||
|
cleanupEmptyChildNamespaces: true,
|
||||||
|
pingTimeout: 60000,
|
||||||
|
pingInterval: 25000,
|
||||||
|
maxHttpBufferSize: 1e8,
|
||||||
|
transports: ['websocket', 'polling'],
|
||||||
|
allowEIO3: true
|
||||||
|
});
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@link BrowserPool} globally exported singleton instance for managing browsers.
|
* {@link BrowserPool} globally exported singleton instance for managing browsers.
|
||||||
*/
|
*/
|
||||||
export const browserPool = new BrowserPool();
|
export const browserPool = new BrowserPool();
|
||||||
|
|
||||||
|
export const recentRecoveries = new Map<string, any[]>();
|
||||||
|
|
||||||
app.use(cookieParser())
|
app.use(cookieParser())
|
||||||
|
|
||||||
app.use('/webhook', webhook);
|
app.use('/webhook', webhook);
|
||||||
@@ -139,30 +149,38 @@ app.use((req, res, next) => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
if (require.main === module) {
|
if (require.main === module) {
|
||||||
setInterval(() => {
|
const serverIntervals: NodeJS.Timeout[] = [];
|
||||||
processQueuedRuns();
|
|
||||||
}, 5000);
|
const processQueuedRunsInterval = setInterval(async () => {
|
||||||
}
|
try {
|
||||||
|
await processQueuedRuns();
|
||||||
|
} catch (error: any) {
|
||||||
|
logger.log('error', `Error in processQueuedRuns interval: ${error.message}`);
|
||||||
|
}
|
||||||
|
}, 5000);
|
||||||
|
serverIntervals.push(processQueuedRunsInterval);
|
||||||
|
|
||||||
|
const browserPoolCleanupInterval = setInterval(() => {
|
||||||
|
browserPool.cleanupStaleBrowserSlots();
|
||||||
|
}, 60000);
|
||||||
|
serverIntervals.push(browserPoolCleanupInterval);
|
||||||
|
|
||||||
if (require.main === module) {
|
|
||||||
server.listen(SERVER_PORT, '0.0.0.0', async () => {
|
server.listen(SERVER_PORT, '0.0.0.0', async () => {
|
||||||
try {
|
try {
|
||||||
await connectDB();
|
await connectDB();
|
||||||
await syncDB();
|
await syncDB();
|
||||||
|
|
||||||
logger.log('info', 'Cleaning up stale browser slots...');
|
logger.log('info', 'Cleaning up stale browser slots...');
|
||||||
browserPool.cleanupStaleBrowserSlots();
|
browserPool.cleanupStaleBrowserSlots();
|
||||||
|
|
||||||
// Recover orphaned runs from potential crashes
|
|
||||||
await recoverOrphanedRuns();
|
await recoverOrphanedRuns();
|
||||||
// Start pgBoss client for job submission
|
|
||||||
await startPgBossClient();
|
await startPgBossClient();
|
||||||
|
|
||||||
// Start pgBoss workers AFTER recovery is complete
|
|
||||||
await startWorkers();
|
await startWorkers();
|
||||||
|
|
||||||
io = new Server(server);
|
io = new Server(server);
|
||||||
|
|
||||||
io.of('/queued-run').on('connection', (socket) => {
|
io.of('/queued-run').on('connection', (socket) => {
|
||||||
const userId = socket.handshake.query.userId as string;
|
const userId = socket.handshake.query.userId as string;
|
||||||
|
|
||||||
@@ -170,6 +188,15 @@ if (require.main === module) {
|
|||||||
socket.join(`user-${userId}`);
|
socket.join(`user-${userId}`);
|
||||||
logger.log('info', `Client joined queued-run namespace for user: ${userId}, socket: ${socket.id}`);
|
logger.log('info', `Client joined queued-run namespace for user: ${userId}, socket: ${socket.id}`);
|
||||||
|
|
||||||
|
if (recentRecoveries.has(userId)) {
|
||||||
|
const recoveries = recentRecoveries.get(userId)!;
|
||||||
|
recoveries.forEach(recoveryData => {
|
||||||
|
socket.emit('run-recovered', recoveryData);
|
||||||
|
logger.log('info', `Sent stored recovery notification for run: ${recoveryData.runId} to user: ${userId}`);
|
||||||
|
});
|
||||||
|
recentRecoveries.delete(userId);
|
||||||
|
}
|
||||||
|
|
||||||
socket.on('disconnect', () => {
|
socket.on('disconnect', () => {
|
||||||
logger.log('info', `Client disconnected from queued-run namespace: ${socket.id}`);
|
logger.log('info', `Client disconnected from queued-run namespace: ${socket.id}`);
|
||||||
});
|
});
|
||||||
@@ -178,8 +205,9 @@ if (require.main === module) {
|
|||||||
socket.disconnect();
|
socket.disconnect();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
if (!isProduction) {
|
if (!isProduction) {
|
||||||
|
// Development mode
|
||||||
if (process.platform === 'win32') {
|
if (process.platform === 'win32') {
|
||||||
workerProcess = fork(workerPath, [], {
|
workerProcess = fork(workerPath, [], {
|
||||||
execArgv: ['--inspect=5859'],
|
execArgv: ['--inspect=5859'],
|
||||||
@@ -207,7 +235,7 @@ if (require.main === module) {
|
|||||||
console.log(`Recording worker exited with code: ${code}`);
|
console.log(`Recording worker exited with code: ${code}`);
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
// Run in same process for non-Windows
|
// Run in same process for non-Windows development
|
||||||
try {
|
try {
|
||||||
await import('./schedule-worker');
|
await import('./schedule-worker');
|
||||||
await import('./pgboss-worker');
|
await import('./pgboss-worker');
|
||||||
@@ -216,48 +244,177 @@ if (require.main === module) {
|
|||||||
console.error('Failed to start workers in main process:', error);
|
console.error('Failed to start workers in main process:', error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// Production mode - run workers in same process for memory sharing
|
||||||
|
try {
|
||||||
|
await import('./schedule-worker');
|
||||||
|
await import('./pgboss-worker');
|
||||||
|
logger.log('info', 'Workers started in main process');
|
||||||
|
} catch (error: any) {
|
||||||
|
logger.log('error', `Failed to start workers: ${error.message}`);
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.log('info', `Server listening on port ${SERVER_PORT}`);
|
logger.log('info', `Server listening on port ${SERVER_PORT}`);
|
||||||
} catch (error: any) {
|
} catch (error: any) {
|
||||||
logger.log('error', `Failed to connect to the database: ${error.message}`);
|
logger.log('error', `Failed to connect to the database: ${error.message}`);
|
||||||
process.exit(1);
|
process.exit(1);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
|
||||||
|
|
||||||
process.on('unhandledRejection', (reason, promise) => {
|
|
||||||
logger.log('error', `Unhandled promise rejection at: ${promise}, reason: ${reason}`);
|
|
||||||
console.error('Unhandled promise rejection:', reason);
|
|
||||||
});
|
|
||||||
|
|
||||||
process.on('uncaughtException', (error) => {
|
|
||||||
logger.log('error', `Uncaught exception: ${error.message}`, { stack: error.stack });
|
|
||||||
console.error('Uncaught exception:', error);
|
|
||||||
|
|
||||||
if (process.env.NODE_ENV === 'production') {
|
|
||||||
setTimeout(() => {
|
|
||||||
process.exit(1);
|
|
||||||
}, 5000);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
if (require.main === module) {
|
|
||||||
process.on('SIGINT', async () => {
|
process.on('SIGINT', async () => {
|
||||||
console.log('Main app shutting down...');
|
console.log('Main app shutting down...');
|
||||||
|
let shutdownSuccessful = true;
|
||||||
|
|
||||||
|
await new Promise(resolve => setTimeout(resolve, 2000));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
console.log('Closing PostgreSQL connection pool...');
|
const runningBrowsers = browserPool.getAllBrowsers();
|
||||||
await pool.end();
|
|
||||||
console.log('PostgreSQL connection pool closed');
|
for (const [browserId, browser] of runningBrowsers) {
|
||||||
} catch (error) {
|
try {
|
||||||
console.error('Error closing PostgreSQL connection pool:', error);
|
if (browser && browser.interpreter) {
|
||||||
|
const hasData = (browser.interpreter.serializableDataByType?.scrapeSchema?.length > 0) ||
|
||||||
|
(browser.interpreter.serializableDataByType?.scrapeList?.length > 0) ||
|
||||||
|
(browser.interpreter.binaryData?.length > 0);
|
||||||
|
|
||||||
|
if (hasData) {
|
||||||
|
const run = await Run.findOne({ where: { browserId, status: 'running' } });
|
||||||
|
if (run) {
|
||||||
|
const limitedData = {
|
||||||
|
scrapeSchemaOutput: browser.interpreter.serializableDataByType?.scrapeSchema
|
||||||
|
? { "schema-tabular": browser.interpreter.serializableDataByType.scrapeSchema }
|
||||||
|
: {},
|
||||||
|
scrapeListOutput: browser.interpreter.serializableDataByType?.scrapeList || {},
|
||||||
|
binaryOutput: browser.interpreter.binaryData || []
|
||||||
|
};
|
||||||
|
|
||||||
|
const binaryOutputRecord = limitedData.binaryOutput.reduce((acc: Record<string, any>, item: any, index: number) => {
|
||||||
|
acc[`item-${index}`] = item;
|
||||||
|
return acc;
|
||||||
|
}, {});
|
||||||
|
|
||||||
|
await run.update({
|
||||||
|
status: 'failed',
|
||||||
|
finishedAt: new Date().toLocaleString(),
|
||||||
|
log: 'Process interrupted during execution - partial data preserved',
|
||||||
|
serializableOutput: {
|
||||||
|
scrapeSchema: Object.values(limitedData.scrapeSchemaOutput),
|
||||||
|
scrapeList: Object.values(limitedData.scrapeListOutput),
|
||||||
|
},
|
||||||
|
binaryOutput: binaryOutputRecord
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (browserError: any) {
|
||||||
|
shutdownSuccessful = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (error: any) {
|
||||||
|
shutdownSuccessful = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!isProduction && process.platform === 'win32') {
|
serverIntervals.forEach(clearInterval);
|
||||||
if (workerProcess) workerProcess.kill();
|
|
||||||
if (recordingWorkerProcess) recordingWorkerProcess.kill();
|
try {
|
||||||
|
const allBrowsers = browserPool.getAllBrowsers();
|
||||||
|
for (const [browserId, browser] of allBrowsers) {
|
||||||
|
try {
|
||||||
|
if (browser) {
|
||||||
|
await browser.switchOff();
|
||||||
|
}
|
||||||
|
} catch (browserCleanupError: any) {
|
||||||
|
console.error(`Error shutting down browser ${browserId}:`, browserCleanupError.message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (error: any) {
|
||||||
|
console.error('Error during browser cleanup:', error.message);
|
||||||
}
|
}
|
||||||
process.exit();
|
|
||||||
|
if (!isProduction) {
|
||||||
|
try {
|
||||||
|
if (workerProcess) {
|
||||||
|
workerProcess.kill('SIGTERM');
|
||||||
|
}
|
||||||
|
if (recordingWorkerProcess) {
|
||||||
|
recordingWorkerProcess.kill('SIGTERM');
|
||||||
|
}
|
||||||
|
} catch (workerError: any) {
|
||||||
|
console.error('Error terminating worker processes:', workerError.message);
|
||||||
|
}
|
||||||
|
|
||||||
|
await new Promise(resolve => setTimeout(resolve, 1000));
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
await new Promise<void>((resolve) => {
|
||||||
|
io.close(() => {
|
||||||
|
resolve();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
} catch (ioError: any) {
|
||||||
|
shutdownSuccessful = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
await new Promise<void>((resolve, reject) => {
|
||||||
|
server.close((err) => {
|
||||||
|
if (err) {
|
||||||
|
reject(err);
|
||||||
|
} else {
|
||||||
|
resolve();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
} catch (serverError: any) {
|
||||||
|
console.error('Error closing HTTP server:', serverError.message);
|
||||||
|
shutdownSuccessful = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
await pool.end();
|
||||||
|
} catch (poolError: any) {
|
||||||
|
console.error('Error closing PostgreSQL connection pool:', poolError.message);
|
||||||
|
shutdownSuccessful = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
await stopPgBossClient();
|
||||||
|
} catch (pgBossError: any) {
|
||||||
|
console.error('Error closing PgBoss client connection:', pgBossError.message);
|
||||||
|
shutdownSuccessful = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
await sequelize.close();
|
||||||
|
} catch (sequelizeError: any) {
|
||||||
|
console.error('Error closing Sequelize connection:', sequelizeError.message);
|
||||||
|
shutdownSuccessful = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log(`Shutdown ${shutdownSuccessful ? 'completed successfully' : 'completed with errors'}`);
|
||||||
|
process.exit(shutdownSuccessful ? 0 : 1);
|
||||||
});
|
});
|
||||||
}
|
|
||||||
|
process.on('unhandledRejection', (reason, promise) => {
|
||||||
|
console.error('Unhandled promise rejection:', reason);
|
||||||
|
|
||||||
|
if (process.env.NODE_ENV === 'production') {
|
||||||
|
setTimeout(() => {
|
||||||
|
process.exit(1);
|
||||||
|
}, 1000);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
process.on('uncaughtException', (error) => {
|
||||||
|
console.error('Uncaught exception:', error);
|
||||||
|
|
||||||
|
if (process.env.NODE_ENV === 'production') {
|
||||||
|
setTimeout(() => {
|
||||||
|
process.exit(1);
|
||||||
|
}, 5000);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user