From f3964ceffa4a1e26338e981ec513dbc2ab4434a1 Mon Sep 17 00:00:00 2001 From: karishmas6 Date: Mon, 7 Oct 2024 01:16:49 +0530 Subject: [PATCH] feat: move bullmq worker --- server/src/worker.ts | 61 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 server/src/worker.ts diff --git a/server/src/worker.ts b/server/src/worker.ts new file mode 100644 index 00000000..da923763 --- /dev/null +++ b/server/src/worker.ts @@ -0,0 +1,61 @@ +import fs from "fs"; +import { uuid } from "uuidv4"; +import { chromium } from "playwright"; +import { io, Socket } from "socket.io-client"; +import { Queue, Worker } from 'bullmq'; +import IORedis from 'ioredis'; +import { readFile, saveFile } from "../storage"; +import { createRemoteBrowserForRun, destroyRemoteBrowser } from '../../browser-management/controller'; +import logger from '../../logger'; +import { browserPool } from "../../server"; +import { googleSheetUpdateTasks, processGoogleSheetUpdates } from "../integrations/gsheet"; + +const connection = new IORedis({ + host: 'localhost', + port: 6379, + maxRetriesPerRequest: null, +}); + +connection.on('connect', () => { + console.log('Connected to Redis!'); +}); + +connection.on('error', (err) => { + console.error('Redis connection error:', err); +}); + +const workflowQueue = new Queue('workflow', { connection }); + +export const worker = new Worker('workflow', async job => { + const { fileName, runId } = job.data; + try { + const result = await handleRunRecording(fileName, runId); + return result; + } catch (error) { + logger.error('Error running workflow:', error); + throw error; + } +}, { connection }); + +worker.on('completed', async (job: any) => { + logger.log(`info`, `Job ${job.id} completed for ${job.data.fileName}_${job.data.runId}`); + + await worker.close(); + await workflowQueue.close(); + logger.log(`info`, `Worker and queue have been closed.`); +}); + +worker.on('failed', async (job: any, err) => { + logger.log(`error`, `Job ${job.id} failed for ${job.data.fileName}_${job.data.runId}:`, err); + + await worker.close(); + await workflowQueue.close(); + logger.log(`info`, `Worker and queue have been closed after failure.`); +}); + +async function jobCounts() { + const jobCounts = await workflowQueue.getJobCounts(); + console.log('Jobs:', jobCounts); +} + +jobCounts(); \ No newline at end of file