diff --git a/server/src/routes/storage.ts b/server/src/routes/storage.ts index 3ce853b1..c7ad154e 100644 --- a/server/src/routes/storage.ts +++ b/server/src/routes/storage.ts @@ -1,7 +1,3 @@ -/** - * RESTful API endpoints handling the recording storage. -*/ - import { Router } from 'express'; import logger from "../logger"; import { deleteFile, readFile, readFiles, saveFile } from "../workflow-management/storage"; @@ -10,12 +6,12 @@ import { chromium } from "playwright"; import { browserPool } from "../server"; import fs from "fs"; import { uuid } from "uuidv4"; -// import { workflowQueue } from '../workflow-management/scheduler'; import moment from 'moment-timezone'; import cron from 'node-cron'; import { googleSheetUpdateTasks, processGoogleSheetUpdates } from '../workflow-management/integrations/gsheet'; import { getDecryptedProxyConfig } from './proxy'; import { requireSignIn } from '../middlewares/auth'; +import { workflowQueue } from '../worker'; export const router = Router(); @@ -280,16 +276,16 @@ router.put('/schedule/:fileName/', requireSignIn, async (req, res) => { const runId = uuid(); - // await workflowQueue.add( - // 'run workflow', - // { fileName, runId }, - // { - // repeat: { - // pattern: cronExpression, - // tz: timezone - // } - // } - // ); + await workflowQueue.add( + 'run workflow', + { fileName, runId }, + { + repeat: { + pattern: cronExpression, + tz: timezone + } + } + ); res.status(200).json({ message: 'success', diff --git a/server/src/server.ts b/server/src/server.ts index cfd67d09..645c2e9c 100644 --- a/server/src/server.ts +++ b/server/src/server.ts @@ -14,6 +14,7 @@ import csrf from 'csurf'; import { SERVER_PORT } from "./constants/config"; import { Server } from "socket.io"; import { readdirSync } from "fs" +import { fork } from 'child_process'; const csrfProtection = csrf({ cookie: true }) @@ -60,6 +61,17 @@ readdirSync(path.join(__dirname, 'api')).forEach((r) => { } }); +const workerProcess = fork(path.resolve(__dirname, './worker.ts')); +workerProcess.on('message', (message) => { + console.log(`Message from worker: ${message}`); +}); +workerProcess.on('error', (error) => { + console.error(`Error in worker: ${error}`); +}); +workerProcess.on('exit', (code) => { + console.log(`Worker exited with code: ${code}`); +}); + app.get('/', function (req, res) { return res.send('Maxun server started 🚀'); }); @@ -73,3 +85,9 @@ server.listen(SERVER_PORT, async () => { await syncDB(); logger.log('info', `Server listening on port ${SERVER_PORT}`); }); + +process.on('SIGINT', () => { + console.log('Main app shutting down...'); + workerProcess.kill(); + process.exit(); +}); diff --git a/server/src/worker.ts b/server/src/worker.ts new file mode 100644 index 00000000..3afb39c5 --- /dev/null +++ b/server/src/worker.ts @@ -0,0 +1,55 @@ +import { Queue, Worker } from 'bullmq'; +import IORedis from 'ioredis'; +import logger from './logger'; +import { handleRunRecording } from "./workflow-management/scheduler"; + +const connection = new IORedis({ + host: 'localhost', + port: 6379, + maxRetriesPerRequest: null, +}); + +connection.on('connect', () => { + console.log('Connected to Redis!'); +}); + +connection.on('error', (err) => { + console.error('Redis connection error:', err); +}); + +const workflowQueue = new Queue('workflow', { connection }); + +const worker = new Worker('workflow', async job => { + const { fileName, runId } = job.data; + try { + const result = await handleRunRecording(fileName, runId); + return result; + } catch (error) { + logger.error('Error running workflow:', error); + throw error; + } +}, { connection }); + +worker.on('completed', async (job: any) => { + logger.log(`info`, `Job ${job.id} completed for ${job.data.fileName}_${job.data.runId}`); +}); + +worker.on('failed', async (job: any, err) => { + logger.log(`error`, `Job ${job.id} failed for ${job.data.fileName}_${job.data.runId}:`, err); +}); + +console.log('Worker is running...'); + +async function jobCounts() { + const jobCounts = await workflowQueue.getJobCounts(); + console.log('Jobs:', jobCounts); +} + +jobCounts(); + +process.on('SIGINT', () => { + console.log('Worker shutting down...'); + process.exit(); +}); + +export { workflowQueue, worker }; \ No newline at end of file diff --git a/server/src/workflow-management/scheduler/index.ts b/server/src/workflow-management/scheduler/index.ts index 07d338c1..1fcef079 100644 --- a/server/src/workflow-management/scheduler/index.ts +++ b/server/src/workflow-management/scheduler/index.ts @@ -2,64 +2,12 @@ import fs from "fs"; import { uuid } from "uuidv4"; import { chromium } from "playwright"; import { io, Socket } from "socket.io-client"; -import { Queue, Worker } from 'bullmq'; -import IORedis from 'ioredis'; import { readFile, saveFile } from "../storage"; import { createRemoteBrowserForRun, destroyRemoteBrowser } from '../../browser-management/controller'; import logger from '../../logger'; import { browserPool } from "../../server"; import { googleSheetUpdateTasks, processGoogleSheetUpdates } from "../integrations/gsheet"; -const connection = new IORedis({ - host: 'localhost', - port: 6379, - maxRetriesPerRequest: null, -}); - -connection.on('connect', () => { - console.log('Connected to Redis!'); -}); - -connection.on('error', (err) => { - console.error('Redis connection error:', err); -}); - -const workflowQueue = new Queue('workflow', { connection }); - -export const worker = new Worker('workflow', async job => { - const { fileName, runId } = job.data; - try { - const result = await handleRunRecording(fileName, runId); - return result; - } catch (error) { - logger.error('Error running workflow:', error); - throw error; - } -}, { connection }); - -worker.on('completed', async (job: any) => { - logger.log(`info`, `Job ${job.id} completed for ${job.data.fileName}_${job.data.runId}`); - - await worker.close(); - await workflowQueue.close(); - logger.log(`info`, `Worker and queue have been closed.`); -}); - -worker.on('failed', async (job: any, err) => { - logger.log(`error`, `Job ${job.id} failed for ${job.data.fileName}_${job.data.runId}:`, err); - - await worker.close(); - await workflowQueue.close(); - logger.log(`info`, `Worker and queue have been closed after failure.`); -}); - -async function jobCounts() { - const jobCounts = await workflowQueue.getJobCounts(); - console.log('Jobs:', jobCounts); -} - -jobCounts(); - async function runWorkflow(fileName: string, runId: string) { if (!runId) { runId = uuid(); @@ -205,7 +153,7 @@ function resetRecordingState(browserId: string, fileName: string, runId: string) logger.log(`info`, `reset values for ${browserId}, ${fileName}, and ${runId}`); } -async function handleRunRecording(fileName: string, runId: string) { +export async function handleRunRecording(fileName: string, runId: string) { try { const result = await runWorkflow(fileName, runId); const { browserId, runId: newRunId } = result; @@ -237,4 +185,4 @@ function cleanupSocketListeners(socket: Socket, browserId: string, runId: string logger.log('info', `Cleaned up listeners for browserId: ${browserId}, runId: ${runId}`); } -export { workflowQueue, runWorkflow }; \ No newline at end of file +export { runWorkflow }; \ No newline at end of file