From 69a6be756f1804f3e6cd2c203833d35277243daf Mon Sep 17 00:00:00 2001 From: karishmas6 Date: Thu, 12 Sep 2024 17:41:49 +0530 Subject: [PATCH] feat: close worker & queue --- .../workflow-management/scheduler/index.ts | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/server/src/workflow-management/scheduler/index.ts b/server/src/workflow-management/scheduler/index.ts index 347f1921..0975b709 100644 --- a/server/src/workflow-management/scheduler/index.ts +++ b/server/src/workflow-management/scheduler/index.ts @@ -2,6 +2,7 @@ import { Queue, Worker } from 'bullmq'; import IORedis from 'ioredis'; import { deleteFile, readFile, readFiles, saveFile } from "../storage"; import { createRemoteBrowserForRun, destroyRemoteBrowser, getActiveBrowserId } from '../../browser-management/controller'; +import { RemoteBrowser } from '../../browser-management/classes/RemoteBrowser'; import logger from '../../logger'; import { browserPool } from "../../server"; import fs from "fs"; @@ -35,12 +36,24 @@ export const worker = new Worker('workflow', async job => { } }, { connection }); -worker.on('completed', (job: any) => { +// Listen for job completion and close worker/queue +worker.on('completed', async (job: any) => { console.log(`Job ${job.id} completed for ${job.data.fileName}_${job.data.runId}`); + + // Gracefully close the worker and queue + await worker.close(); + await workflowQueue.close(); + console.log('Worker and queue have been closed.'); }); -worker.on('failed', (job: any, err) => { +// Listen for job failure and close worker/queue +worker.on('failed', async (job: any, err) => { console.error(`Job ${job.id} failed for ${job.data.fileName}_${job.data.runId}:`, err); + + // Gracefully close the worker and queue + await worker.close(); + await workflowQueue.close(); + console.log('Worker and queue have been closed after failure.'); }); async function runWorkflow(fileName: string, runId: string) { @@ -54,9 +67,6 @@ async function runWorkflow(fileName: string, runId: string) { browser: chromium, launchOptions: { headless: true } }); - logger.log(`debug`, `Created browser with ID: ${browserId}`); - - const run_meta = { status: 'SCHEDULED', name: fileName, @@ -78,8 +88,6 @@ async function runWorkflow(fileName: string, runId: string) { logger.log('debug', `Scheduled run with name: ${fileName}_${runId}.json`); - logger.log('debug', `Active in run : ${getActiveBrowserId()}`); - // Phase 2: Running return await executeRun(fileName, runId); @@ -113,7 +121,6 @@ async function executeRun(fileName: string, runId: string) { // Interpret the run in active browser - logger.log('debug', `Active in exec : ${getActiveBrowserId()}`); const browser = browserPool.getRemoteBrowser(parsedRun.browserId); if (!browser) { throw new Error('Could not access browser');