feat: add promise timeout, socket cleanup

This commit is contained in:
Rohit Rajan
2025-11-29 14:40:59 +05:30
parent 2a1d461357
commit 762654395b
5 changed files with 640 additions and 1383 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -31,22 +31,50 @@ export const initializeRemoteBrowserForRecording = (userId: string, mode: string
if (activeId) {
const remoteBrowser = browserPool.getRemoteBrowser(activeId);
remoteBrowser?.updateSocket(socket);
await remoteBrowser?.makeAndEmitScreenshot();
if (remoteBrowser?.isDOMStreamingActive) {
remoteBrowser?.makeAndEmitDOMSnapshot();
}
} else {
const browserSession = new RemoteBrowser(socket, userId, id);
browserSession.interpreter.subscribeToPausing();
await browserSession.initialize(userId);
await browserSession.registerEditorEvents();
if (mode === "dom") {
await browserSession.subscribeToDOM();
logger.info('DOM streaming started for scraping browser in recording mode');
} else {
await browserSession.subscribeToScreencast();
logger.info('Screenshot streaming started for local browser in recording mode');
}
browserPool.addRemoteBrowser(id, browserSession, userId, false, "recording");
try {
await browserSession.initialize(userId);
await browserSession.registerEditorEvents();
await browserSession.subscribeToDOM();
logger.info('DOM streaming started for remote browser in recording mode');
browserPool.addRemoteBrowser(id, browserSession, userId, false, "recording");
} catch (initError: any) {
logger.error(`Failed to initialize browser for recording: ${initError.message}`);
logger.info('Sending browser failure notification to frontend');
socket.emit('dom-mode-error', {
userId: userId,
error: 'Failed to start the browser, please try again in some time.'
});
socket.emit('error', {
userId: userId,
message: 'Failed to start the browser, please try again in some time.',
details: initError.message
});
await new Promise(resolve => setTimeout(resolve, 100));
try {
await browserSession.switchOff();
logger.debug('Cleaned up failed browser session');
} catch (cleanupError: any) {
logger.warn(`Failed to cleanup browser session: ${cleanupError.message}`);
}
logger.info('Browser initialization failed, user notified');
return id;
}
}
socket.emit('loaded');
});
@@ -69,7 +97,7 @@ export const createRemoteBrowserForRun = (userId: string): string => {
const id = uuid();
const slotReserved = browserPool.reserveBrowserSlot(id, userId, "run");
const slotReserved = browserPool.reserveBrowserSlotAtomic(id, userId, "run");
if (!slotReserved) {
logger.log('warn', `Cannot create browser for user ${userId}: no available slots`);
throw new Error('User has reached maximum browser limit');
@@ -94,45 +122,78 @@ export const createRemoteBrowserForRun = (userId: string): string => {
* @category BrowserManagement-Controller
*/
export const destroyRemoteBrowser = async (id: string, userId: string): Promise<boolean> => {
const DESTROY_TIMEOUT = 30000;
const destroyPromise = (async () => {
try {
const browserSession = browserPool.getRemoteBrowser(id);
if (!browserSession) {
logger.log('info', `Browser with id: ${id} not found, may have already been destroyed`);
return true;
}
logger.log('debug', `Switching off the browser with id: ${id}`);
try {
await browserSession.switchOff();
} catch (switchOffError) {
logger.log('warn', `Error switching off browser ${id}: ${switchOffError}`);
}
try {
const namespace = io.of(id);
const sockets = await namespace.fetchSockets();
for (const socket of sockets) {
socket.disconnect(true);
}
namespace.removeAllListeners();
await new Promise(resolve => setTimeout(resolve, 100));
const nsps = (io as any)._nsps;
if (nsps && nsps.has(`/${id}`)) {
const ns = nsps.get(`/${id}`);
if (ns && ns.sockets && ns.sockets.size === 0) {
nsps.delete(`/${id}`);
logger.log('debug', `Deleted empty namespace /${id} from io._nsps Map`);
} else {
logger.log('warn', `Namespace /${id} still has ${ns?.sockets?.size || 0} sockets, skipping manual deletion`);
}
}
logger.log('debug', `Cleaned up socket namespace for browser ${id}`);
} catch (namespaceCleanupError: any) {
logger.log('warn', `Error cleaning up socket namespace for browser ${id}: ${namespaceCleanupError.message}`);
}
return browserPool.deleteRemoteBrowser(id);
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.log('error', `Failed to destroy browser ${id}: ${errorMessage}`);
try {
return browserPool.deleteRemoteBrowser(id);
} catch (deleteError) {
logger.log('error', `Failed to delete browser ${id} from pool: ${deleteError}`);
return false;
}
}
})();
try {
const browserSession = browserPool.getRemoteBrowser(id);
if (!browserSession) {
logger.log('info', `Browser with id: ${id} not found, may have already been destroyed`);
return true;
}
logger.log('debug', `Switching off the browser with id: ${id}`);
try {
await browserSession.stopCurrentInterpretation();
} catch (stopError) {
logger.log('warn', `Error stopping interpretation for browser ${id}: ${stopError}`);
}
try {
await browserSession.switchOff();
} catch (switchOffError) {
logger.log('warn', `Error switching off browser ${id}: ${switchOffError}`);
}
const timeoutPromise = new Promise<boolean>((_, reject) =>
setTimeout(() => reject(new Error(`Browser destruction timed out after ${DESTROY_TIMEOUT}ms`)), DESTROY_TIMEOUT)
);
try {
const namespace = io.of(id);
namespace.removeAllListeners();
namespace.disconnectSockets(true);
logger.log('debug', `Cleaned up socket namespace for browser ${id}`);
} catch (namespaceCleanupError: any) {
logger.log('warn', `Error cleaning up socket namespace for browser ${id}: ${namespaceCleanupError.message}`);
}
return browserPool.deleteRemoteBrowser(id);
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.log('error', `Failed to destroy browser ${id}: ${errorMessage}`);
return await Promise.race([destroyPromise, timeoutPromise]);
} catch (timeoutError: any) {
logger.log('error', `Browser ${id} destruction timeout: ${timeoutError.message} - force removing from pool`);
try {
return browserPool.deleteRemoteBrowser(id);
} catch (deleteError) {
logger.log('error', `Failed to delete browser ${id} from pool: ${deleteError}`);
logger.log('error', `Failed to force delete browser ${id} after timeout: ${deleteError}`);
return false;
}
}
@@ -229,8 +290,8 @@ export const interpretWholeWorkflow = async (userId: string) => {
export const stopRunningInterpretation = async (userId: string) => {
const id = getActiveBrowserIdByState(userId, "recording");
if (id) {
const browser = browserPool.getRemoteBrowser(id);
await browser?.stopCurrentInterpretation();
const browserSession = browserPool.getRemoteBrowser(id);
await browserSession?.switchOff();
} else {
logger.log('error', 'Cannot stop interpretation: No active browser or generator.');
}
@@ -264,7 +325,31 @@ const initializeBrowserAsync = async (id: string, userId: string) => {
browserPool.failBrowserSlot(id);
});
const socket = await waitForConnection;
const connectWithRetry = async (maxRetries: number = 3): Promise<Socket | null> => {
let retryCount = 0;
while (retryCount < maxRetries) {
try {
const socket = await waitForConnection;
if (socket || retryCount === maxRetries - 1) {
return socket;
}
} catch (error: any) {
logger.log('warn', `Connection attempt ${retryCount + 1} failed for browser ${id}: ${error.message}`);
}
retryCount++;
if (retryCount < maxRetries) {
const delay = Math.pow(2, retryCount) * 1000;
logger.log('info', `Retrying connection for browser ${id} in ${delay}ms (attempt ${retryCount + 1}/${maxRetries})`);
await new Promise(resolve => setTimeout(resolve, delay));
}
}
return null;
};
const socket = await connectWithRetry(3);
try {
let browserSession: RemoteBrowser;
@@ -288,9 +373,17 @@ const initializeBrowserAsync = async (id: string, userId: string) => {
logger.log('debug', `Starting browser initialization for ${id}`);
try {
await browserSession.initialize(userId);
logger.log('debug', `Browser initialization completed for ${id}`);
const BROWSER_INIT_TIMEOUT = 45000;
logger.log('info', `Browser initialization starting with ${BROWSER_INIT_TIMEOUT/1000}s timeout`);
const initPromise = browserSession.initialize(userId);
const timeoutPromise = new Promise<never>((_, reject) => {
setTimeout(() => reject(new Error('Browser initialization timeout')), BROWSER_INIT_TIMEOUT);
});
await Promise.race([initPromise, timeoutPromise]);
} catch (initError: any) {
logger.log('error', `Browser initialization failed for ${id}: ${initError.message}`);
try {
await browserSession.switchOff();
logger.log('info', `Cleaned up failed browser initialization for ${id}`);

View File

@@ -7,9 +7,7 @@ import { Socket } from 'socket.io';
import logger from "../logger";
import { Coordinates, ScrollDeltas, KeyboardInput, DatePickerEventData } from '../types';
import { browserPool } from "../server";
import { WorkflowGenerator } from "../workflow-management/classes/Generator";
import { Page } from "playwright";
import { throttle } from "../../../src/helpers/inputHelpers";
import { CustomActions } from "../../../src/shared/types";
import { WhereWhatPair } from "maxun-core";
import { RemoteBrowser } from './classes/RemoteBrowser';
@@ -899,4 +897,37 @@ const registerInputHandlers = (socket: Socket, userId: string) => {
socket.on("dom:addpair", (data) => onDOMWorkflowPair(data, userId));
};
export default registerInputHandlers;
/**
* Removes all input handler socket listeners to prevent memory leaks
* Must be called when socket disconnects or browser session ends
* @param socket websocket with established connection
* @returns void
* @category BrowserManagement
*/
const removeInputHandlers = (socket: Socket) => {
try {
socket.removeAllListeners("input:mousedown");
socket.removeAllListeners("input:wheel");
socket.removeAllListeners("input:mousemove");
socket.removeAllListeners("input:keydown");
socket.removeAllListeners("input:keyup");
socket.removeAllListeners("input:url");
socket.removeAllListeners("input:refresh");
socket.removeAllListeners("input:back");
socket.removeAllListeners("input:forward");
socket.removeAllListeners("input:date");
socket.removeAllListeners("input:dropdown");
socket.removeAllListeners("input:time");
socket.removeAllListeners("input:datetime-local");
socket.removeAllListeners("action");
socket.removeAllListeners("dom:input");
socket.removeAllListeners("dom:click");
socket.removeAllListeners("dom:keypress");
socket.removeAllListeners("dom:addpair");
socket.removeAllListeners("removeAction");
} catch (error: any) {
console.warn(`Error removing input handlers: ${error.message}`);
}
};
export { registerInputHandlers, removeInputHandlers };

View File

@@ -1,6 +1,6 @@
import { Namespace, Socket } from 'socket.io';
import logger from "../logger";
import registerInputHandlers from '../browser-management/inputHandlers';
import { registerInputHandlers, removeInputHandlers } from '../browser-management/inputHandlers';
/**
* Opens a websocket canal for duplex data transfer and registers all handlers for this data for the recording session.
@@ -17,7 +17,11 @@ export const createSocketConnection = (
const onConnection = async (socket: Socket) => {
logger.log('info', "Client connected " + socket.id);
registerInputHandlers(socket, userId);
socket.on('disconnect', () => logger.log('info', "Client disconnected " + socket.id));
socket.on('disconnect', () => {
logger.log('info', "Client disconnected " + socket.id);
removeInputHandlers(socket);
logger.log('debug', "Input handlers cleaned up for socket " + socket.id);
});
callback(socket);
}

View File

@@ -71,6 +71,8 @@ export class WorkflowGenerator {
private poolId: string | null = null;
private pageCloseListeners: Map<Page, () => void> = new Map();
/**
* The public constructor of the WorkflowGenerator.
* Takes socket for communication as a parameter and registers some important events on it.
@@ -884,6 +886,29 @@ export class WorkflowGenerator {
}
};
/**
* Removes all socket listeners to prevent memory leaks
* Must be called before re-registering listeners or during cleanup
* @private
*/
private removeSocketListeners(): void {
try {
this.socket.removeAllListeners('setGetList');
this.socket.removeAllListeners('listSelector');
this.socket.removeAllListeners('setPaginationMode');
this.socket.removeAllListeners('dom-mode-enabled');
this.socket.removeAllListeners('screenshot-mode-enabled');
this.socket.removeAllListeners('save');
this.socket.removeAllListeners('new-recording');
this.socket.removeAllListeners('activeIndex');
this.socket.removeAllListeners('decision');
this.socket.removeAllListeners('updatePair');
logger.log('debug', 'Removed all Generator socket listeners');
} catch (error: any) {
logger.warn(`Error removing Generator socket listeners: ${error.message}`);
}
}
/**
* Removes an action with the given actionId from the workflow.
* Only removes the specific action from the what array, not the entire pair.
@@ -938,6 +963,39 @@ export class WorkflowGenerator {
this.initializeDOMListeners();
};
/**
* Cleanup method to release resources and prevent memory leaks
* Must be called when the generator is no longer needed
*/
public cleanup(): void {
try {
this.removeSocketListeners();
for (const [page, listener] of this.pageCloseListeners.entries()) {
try {
if (!page.isClosed()) {
page.removeListener('close', listener);
}
} catch (error: any) {
logger.warn(`Error removing page close listener: ${error.message}`);
}
}
this.pageCloseListeners.clear();
this.workflowRecord = { workflow: [] };
this.generatedData = {
lastUsedSelector: '',
lastIndex: null,
lastAction: '',
lastUsedSelectorTagName: '',
lastUsedSelectorInnerText: '',
};
logger.log('debug', 'Generator cleanup completed');
} catch (error: any) {
logger.error(`Error during Generator cleanup: ${error.message}`);
}
}
/**
* Returns the currently generated workflow without all the generated flag actions.
* @param workflow The workflow for removing the generated flag actions from.