fix(core): clean socket handlers causing memory leaks

This commit is contained in:
amhsirak
2025-11-28 15:03:43 +05:30
parent ad8df66ecd
commit bf1a6f1aa7

View File

@@ -460,8 +460,9 @@ export default class Interpreter extends EventEmitter {
for (const link of links) { for (const link of links) {
// eslint-disable-next-line // eslint-disable-next-line
this.concurrency.addJob(async () => { this.concurrency.addJob(async () => {
let newPage = null;
try { try {
const newPage = await context.newPage(); newPage = await context.newPage();
await newPage.goto(link); await newPage.goto(link);
await newPage.waitForLoadState('networkidle'); await newPage.waitForLoadState('networkidle');
await this.runLoop(newPage, this.initializedWorkflow!); await this.runLoop(newPage, this.initializedWorkflow!);
@@ -470,6 +471,14 @@ export default class Interpreter extends EventEmitter {
// but newPage(), goto() and waitForLoadState() don't (and will kill // but newPage(), goto() and waitForLoadState() don't (and will kill
// the interpreter by throwing). // the interpreter by throwing).
this.log(<Error>e, Level.ERROR); this.log(<Error>e, Level.ERROR);
} finally {
if (newPage && !newPage.isClosed()) {
try {
await newPage.close();
} catch (closeError) {
this.log('Failed to close enqueued page', Level.WARN);
}
}
} }
}); });
} }
@@ -1463,41 +1472,57 @@ export default class Interpreter extends EventEmitter {
* User-requested concurrency should be entirely managed by the concurrency manager, * User-requested concurrency should be entirely managed by the concurrency manager,
* e.g. via `enqueueLinks`. * e.g. via `enqueueLinks`.
*/ */
p.on('popup', (popup) => { const popupHandler = (popup) => {
this.concurrency.addJob(() => this.runLoop(popup, workflowCopy)); this.concurrency.addJob(() => this.runLoop(popup, workflowCopy));
}); };
p.on('popup', popupHandler);
/* eslint no-constant-condition: ["warn", { "checkLoops": false }] */ /* eslint no-constant-condition: ["warn", { "checkLoops": false }] */
let loopIterations = 0; let loopIterations = 0;
const MAX_LOOP_ITERATIONS = 1000; // Circuit breaker const MAX_LOOP_ITERATIONS = 1000; // Circuit breaker
// Cleanup function to remove popup listener
const cleanup = () => {
try {
if (!p.isClosed()) {
p.removeListener('popup', popupHandler);
}
} catch (cleanupError) {
}
};
while (true) { while (true) {
if (this.isAborted) { if (this.isAborted) {
this.log('Workflow aborted during step execution', Level.WARN); this.log('Workflow aborted during step execution', Level.WARN);
cleanup();
return; return;
} }
// Circuit breaker to prevent infinite loops // Circuit breaker to prevent infinite loops
if (++loopIterations > MAX_LOOP_ITERATIONS) { if (++loopIterations > MAX_LOOP_ITERATIONS) {
this.log('Maximum loop iterations reached, terminating to prevent infinite loop', Level.ERROR); this.log('Maximum loop iterations reached, terminating to prevent infinite loop', Level.ERROR);
cleanup();
return; return;
} }
// Checks whether the page was closed from outside, // Checks whether the page was closed from outside,
// or the workflow execution has been stopped via `interpreter.stop()` // or the workflow execution has been stopped via `interpreter.stop()`
if (p.isClosed() || !this.stopper) { if (p.isClosed() || !this.stopper) {
cleanup();
return; return;
} }
try { try {
await p.waitForLoadState(); await p.waitForLoadState();
} catch (e) { } catch (e) {
cleanup();
await p.close(); await p.close();
return; return;
} }
if (workflowCopy.length === 0) { if (workflowCopy.length === 0) {
this.log('All actions completed. Workflow finished.', Level.LOG); this.log('All actions completed. Workflow finished.', Level.LOG);
cleanup();
return; return;
} }
@@ -1589,6 +1614,7 @@ export default class Interpreter extends EventEmitter {
} }
} else { } else {
//await this.disableAdBlocker(p); //await this.disableAdBlocker(p);
cleanup();
return; return;
} }
} }
@@ -1681,4 +1707,44 @@ export default class Interpreter extends EventEmitter {
throw new Error('Cannot stop, there is no running workflow!'); throw new Error('Cannot stop, there is no running workflow!');
} }
} }
/**
* Cleanup method to release resources and prevent memory leaks
* Call this when the interpreter is no longer needed
*/
public async cleanup(): Promise<void> {
try {
// Stop any running workflows first
if (this.stopper) {
try {
await this.stop();
} catch (error: any) {
this.log(`Error stopping workflow during cleanup: ${error.message}`, Level.WARN);
}
}
// Clear ad-blocker resources
if (this.blocker) {
try {
this.blocker = null;
this.log('Ad-blocker resources cleared', Level.DEBUG);
} catch (error: any) {
this.log(`Error cleaning up ad-blocker: ${error.message}`, Level.WARN);
}
}
// Clear accumulated data to free memory
this.cumulativeResults = [];
this.namedResults = {};
this.serializableDataByType = { scrapeList: {}, scrapeSchema: {} };
// Reset state
this.isAborted = false;
this.initializedWorkflow = null;
this.log('Interpreter cleanup completed', Level.DEBUG);
} catch (error: any) {
this.log(`Error during interpreter cleanup: ${error.message}`, Level.ERROR);
throw error;
}
}
} }