feat: add abort checks
This commit is contained in:
@@ -64,6 +64,8 @@ export default class Interpreter extends EventEmitter {
|
|||||||
private concurrency: Concurrency;
|
private concurrency: Concurrency;
|
||||||
|
|
||||||
private stopper: Function | null = null;
|
private stopper: Function | null = null;
|
||||||
|
|
||||||
|
private isAborted: boolean = false;
|
||||||
|
|
||||||
private log: typeof log;
|
private log: typeof log;
|
||||||
|
|
||||||
@@ -114,6 +116,13 @@ export default class Interpreter extends EventEmitter {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the abort flag; execution stops at the next abort check rather than interrupting an operation already in progress
|
||||||
|
*/
|
||||||
|
public abort(): void {
|
||||||
|
this.isAborted = true;
|
||||||
|
}
|
||||||
|
|
||||||
private async applyAdBlocker(page: Page): Promise<void> {
|
private async applyAdBlocker(page: Page): Promise<void> {
|
||||||
if (this.blocker) {
|
if (this.blocker) {
|
||||||
try {
|
try {
|
||||||
@@ -372,6 +381,11 @@ export default class Interpreter extends EventEmitter {
|
|||||||
* @param steps Array of actions.
|
* @param steps Array of actions.
|
||||||
*/
|
*/
|
||||||
private async carryOutSteps(page: Page, steps: What[]): Promise<void> {
|
private async carryOutSteps(page: Page, steps: What[]): Promise<void> {
|
||||||
|
if (this.isAborted) {
|
||||||
|
this.log('Workflow aborted, stopping execution', Level.WARN);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Defines overloaded (or added) methods/actions usable in the workflow.
|
* Defines overloaded (or added) methods/actions usable in the workflow.
|
||||||
* If a method overloads any existing method of the Page class, it accepts the same set
|
* If a method overloads any existing method of the Page class, it accepts the same set
|
||||||
@@ -433,6 +447,11 @@ export default class Interpreter extends EventEmitter {
|
|||||||
},
|
},
|
||||||
|
|
||||||
scrapeSchema: async (schema: Record<string, { selector: string; tag: string, attribute: string; shadow: string}>) => {
|
scrapeSchema: async (schema: Record<string, { selector: string; tag: string, attribute: string; shadow: string}>) => {
|
||||||
|
if (this.isAborted) {
|
||||||
|
this.log('Workflow aborted, stopping scrapeSchema', Level.WARN);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (this.options.debugChannel?.setActionType) {
|
if (this.options.debugChannel?.setActionType) {
|
||||||
this.options.debugChannel.setActionType('scrapeSchema');
|
this.options.debugChannel.setActionType('scrapeSchema');
|
||||||
}
|
}
|
||||||
@@ -468,6 +487,11 @@ export default class Interpreter extends EventEmitter {
|
|||||||
},
|
},
|
||||||
|
|
||||||
scrapeList: async (config: { listSelector: string, fields: any, limit?: number, pagination: any }) => {
|
scrapeList: async (config: { listSelector: string, fields: any, limit?: number, pagination: any }) => {
|
||||||
|
if (this.isAborted) {
|
||||||
|
this.log('Workflow aborted, stopping scrapeList', Level.WARN);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (this.options.debugChannel?.setActionType) {
|
if (this.options.debugChannel?.setActionType) {
|
||||||
this.options.debugChannel.setActionType('scrapeList');
|
this.options.debugChannel.setActionType('scrapeList');
|
||||||
}
|
}
|
||||||
@@ -622,6 +646,11 @@ export default class Interpreter extends EventEmitter {
|
|||||||
limit?: number,
|
limit?: number,
|
||||||
pagination: any
|
pagination: any
|
||||||
}) {
|
}) {
|
||||||
|
if (this.isAborted) {
|
||||||
|
this.log('Workflow aborted, stopping pagination', Level.WARN);
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
let allResults: Record<string, any>[] = [];
|
let allResults: Record<string, any>[] = [];
|
||||||
let previousHeight = 0;
|
let previousHeight = 0;
|
||||||
let scrapedItems: Set<string> = new Set<string>();
|
let scrapedItems: Set<string> = new Set<string>();
|
||||||
@@ -635,6 +664,12 @@ export default class Interpreter extends EventEmitter {
|
|||||||
};
|
};
|
||||||
|
|
||||||
const scrapeCurrentPage = async () => {
|
const scrapeCurrentPage = async () => {
|
||||||
|
// Check abort flag before scraping current page
|
||||||
|
if (this.isAborted) {
|
||||||
|
debugLog("Workflow aborted, stopping scrapeCurrentPage");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
const results = await page.evaluate((cfg) => window.scrapeList(cfg), config);
|
const results = await page.evaluate((cfg) => window.scrapeList(cfg), config);
|
||||||
const newResults = results.filter(item => {
|
const newResults = results.filter(item => {
|
||||||
const uniqueKey = JSON.stringify(item);
|
const uniqueKey = JSON.stringify(item);
|
||||||
@@ -723,7 +758,12 @@ export default class Interpreter extends EventEmitter {
|
|||||||
let unchangedResultCounter = 0;
|
let unchangedResultCounter = 0;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
while (true) {
|
while (true) {
|
||||||
|
if (this.isAborted) {
|
||||||
|
this.log('Workflow aborted during pagination loop', Level.WARN);
|
||||||
|
return allResults;
|
||||||
|
}
|
||||||
|
|
||||||
switch (config.pagination.type) {
|
switch (config.pagination.type) {
|
||||||
case 'scrollDown': {
|
case 'scrollDown': {
|
||||||
let previousResultCount = allResults.length;
|
let previousResultCount = allResults.length;
|
||||||
@@ -969,6 +1009,11 @@ export default class Interpreter extends EventEmitter {
|
|||||||
// const MAX_NO_NEW_ITEMS = 2;
|
// const MAX_NO_NEW_ITEMS = 2;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
|
if (this.isAborted) {
|
||||||
|
this.log('Workflow aborted during pagination loop', Level.WARN);
|
||||||
|
return allResults;
|
||||||
|
}
|
||||||
|
|
||||||
// Find working button with retry mechanism
|
// Find working button with retry mechanism
|
||||||
const { button: loadMoreButton, workingSelector, updatedSelectors } = await findWorkingButton(availableSelectors);
|
const { button: loadMoreButton, workingSelector, updatedSelectors } = await findWorkingButton(availableSelectors);
|
||||||
|
|
||||||
@@ -1120,6 +1165,11 @@ export default class Interpreter extends EventEmitter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private async runLoop(p: Page, workflow: Workflow) {
|
private async runLoop(p: Page, workflow: Workflow) {
|
||||||
|
if (this.isAborted) {
|
||||||
|
this.log('Workflow aborted in runLoop', Level.WARN);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
let workflowCopy: Workflow = JSON.parse(JSON.stringify(workflow));
|
let workflowCopy: Workflow = JSON.parse(JSON.stringify(workflow));
|
||||||
|
|
||||||
workflowCopy = this.removeSpecialSelectors(workflowCopy);
|
workflowCopy = this.removeSpecialSelectors(workflowCopy);
|
||||||
@@ -1150,6 +1200,11 @@ export default class Interpreter extends EventEmitter {
|
|||||||
const MAX_LOOP_ITERATIONS = 1000; // Circuit breaker
|
const MAX_LOOP_ITERATIONS = 1000; // Circuit breaker
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
|
if (this.isAborted) {
|
||||||
|
this.log('Workflow aborted during step execution', Level.WARN);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Circuit breaker to prevent infinite loops
|
// Circuit breaker to prevent infinite loops
|
||||||
if (++loopIterations > MAX_LOOP_ITERATIONS) {
|
if (++loopIterations > MAX_LOOP_ITERATIONS) {
|
||||||
this.log('Maximum loop iterations reached, terminating to prevent infinite loop', Level.ERROR);
|
this.log('Maximum loop iterations reached, terminating to prevent infinite loop', Level.ERROR);
|
||||||
@@ -1232,6 +1287,11 @@ export default class Interpreter extends EventEmitter {
|
|||||||
}
|
}
|
||||||
lastAction = action;
|
lastAction = action;
|
||||||
|
|
||||||
|
if (this.isAborted) {
|
||||||
|
this.log('Workflow aborted before action execution', Level.WARN);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
console.log("Carrying out:", action.what);
|
console.log("Carrying out:", action.what);
|
||||||
await this.carryOutSteps(p, action.what);
|
await this.carryOutSteps(p, action.what);
|
||||||
|
|||||||
Reference in New Issue
Block a user