From de76e27a4a3512674720d628d46195399265086f Mon Sep 17 00:00:00 2001 From: karishmas6 Date: Thu, 12 Sep 2024 21:01:46 +0530 Subject: [PATCH] feat: handle recording --- .../workflow-management/scheduler/index.ts | 52 ++++++++++++++++++- 1 file changed, 50 insertions(+), 2 deletions(-) diff --git a/server/src/workflow-management/scheduler/index.ts b/server/src/workflow-management/scheduler/index.ts index 2f81e7a2..d2e7745a 100644 --- a/server/src/workflow-management/scheduler/index.ts +++ b/server/src/workflow-management/scheduler/index.ts @@ -4,10 +4,12 @@ import { deleteFile, readFile, readFiles, saveFile } from "../storage"; import { createRemoteBrowserForRun, destroyRemoteBrowser, getActiveBrowserId } from '../../browser-management/controller'; import { RemoteBrowser } from '../../browser-management/classes/RemoteBrowser'; import logger from '../../logger'; -import { browserPool, io } from "../../server"; +import { browserPool } from "../../server"; import fs from "fs"; import { uuid } from "uuidv4"; import { chromium } from "playwright"; +import { io, Socket } from "socket.io-client"; + const connection = new IORedis({ host: 'localhost', @@ -28,7 +30,7 @@ const workflowQueue = new Queue('workflow', { connection }); export const worker = new Worker('workflow', async job => { const { fileName, runId } = job.data; try { - const result = await runWorkflow(fileName, runId); + const result = await handleRunRecording(fileName, runId); return result; } catch (error) { console.error('Error running workflow:', error); @@ -56,6 +58,9 @@ worker.on('failed', async (job: any, err) => { console.log('Worker and queue have been closed after failure.'); }); +const existingJobs = workflowQueue.getRepeatableJobs(); +logger.log(`info`, `jobs ${existingJobs}`) + async function runWorkflow(fileName: string, runId: string) { if (!runId) { runId = uuid(); @@ -87,6 +92,11 @@ async function runWorkflow(fileName: string, runId: string) { logger.log('debug', `Scheduled run with name: ${fileName}_${runId}.json`); + return { + browserId, + runId + } + } catch (e) { const { message } = e as Error; logger.log('info', `Error while scheduling a run with name: ${fileName}_${runId}.json`); @@ -169,4 +179,42 @@ async function executeRun(fileName: string, runId: string) { return false; } } + + +async function handleRunRecording(fileName: string, runId: string) { + try { + const result = await runWorkflow(fileName, runId); + const { browserId, runId: newRunId } = result; + + // Type guard to ensure browserId and newRunId are defined + if (!browserId || !newRunId) { + throw new Error('browserId or runId is undefined'); + } + + // Initialize socket connection + const socket = io(`http://localhost:8080/${browserId}`, { + transports: ['websocket'], + rejectUnauthorized: false + }); + + socket.on('ready-for-run', () => readyForRunHandler(browserId, fileName, newRunId)); + + // Log or notify that the recording is running + logger.log('info', `Running recording: ${fileName}`); + + // Cleanup should only happen after the run is completed + socket.on('disconnect', () => { + cleanupSocketListeners(socket, browserId, newRunId); + }); + + } catch (error: any) { + console.error('Error running recording:', error); + } +} + +function cleanupSocketListeners(socket: Socket, browserId: string, runId: string) { + socket.off('ready-for-run', () => readyForRunHandler(browserId, '', runId)); + logger.log('info', `Cleaned up listeners for browserId: ${browserId}, runId: ${runId}`); +} + export { workflowQueue, runWorkflow }; \ No newline at end of file