Merge branch 'develop' into bulk-scraping
This commit is contained in:
@@ -9,6 +9,8 @@ import { chromium } from 'playwright-extra';
|
||||
import stealthPlugin from 'puppeteer-extra-plugin-stealth';
|
||||
import { PlaywrightBlocker } from '@cliqz/adblocker-playwright';
|
||||
import fetch from 'cross-fetch';
|
||||
import { throttle } from 'lodash';
|
||||
import sharp from 'sharp';
|
||||
|
||||
import logger from '../../logger';
|
||||
import { InterpreterSettings, RemoteBrowserOptions } from "../../types";
|
||||
@@ -16,8 +18,30 @@ import { WorkflowGenerator } from "../../workflow-management/classes/Generator";
|
||||
import { WorkflowInterpreter } from "../../workflow-management/classes/Interpreter";
|
||||
import { getDecryptedProxyConfig } from '../../routes/proxy';
|
||||
import { getInjectableScript } from 'idcac-playwright';
|
||||
|
||||
chromium.use(stealthPlugin());
|
||||
|
||||
/**
 * Tunables for the periodic memory check performed by the remote browser.
 */
const MEMORY_CONFIG = {
  /** Delay between two consecutive memory checks, in milliseconds (1 minute). */
  gcInterval: 60 * 1000,
  /** Heap size the current usage is measured against, in bytes (2GB). */
  maxHeapSize: 2 * 1024 ** 3,
  /** Fraction of `maxHeapSize` above which a cleanup is triggered (85%). */
  heapUsageThreshold: 0.85,
};
|
||||
|
||||
/**
 * Settings used by the screencast pipeline: frame capture, image
 * optimisation and the pending-frame queue.
 */
const SCREENCAST_CONFIG: {
  /** Image format requested when the screencast is started. */
  format: 'jpeg' | 'png';
  /** Upper bound for the width of an optimised frame, in pixels. */
  maxWidth: number;
  /** Upper bound for the height of an optimised frame, in pixels. */
  maxHeight: number;
  /** Frame rate used to space out the emission of queued frames. */
  targetFPS: number;
  /** JPEG quality as a 0..1 fraction; it is scaled to 0..100 before use. */
  compressionQuality: number;
  /** Maximum number of frames kept waiting while another one is processed. */
  maxQueueSize: number;
} = {
  format: 'jpeg',
  maxWidth: 900,
  maxHeight: 400,
  targetFPS: 30,
  compressionQuality: 0.8,
  maxQueueSize: 2,
};
|
||||
|
||||
/**
|
||||
* This class represents a remote browser instance.
|
||||
@@ -78,6 +102,11 @@ export class RemoteBrowser {
|
||||
*/
|
||||
public interpreter: WorkflowInterpreter;
|
||||
|
||||
|
||||
private screenshotQueue: Buffer[] = [];
|
||||
private isProcessingScreenshot = false;
|
||||
private screencastInterval: NodeJS.Timeout | null = null
|
||||
|
||||
/**
|
||||
* Initializes new instances of the {@link Generator} and {@link WorkflowInterpreter} classes and
|
||||
* assigns the socket instance everywhere.
|
||||
@@ -90,6 +119,46 @@ export class RemoteBrowser {
|
||||
this.generator = new WorkflowGenerator(socket);
|
||||
}
|
||||
|
||||
/**
 * Starts a repeating timer that fires every `MEMORY_CONFIG.gcInterval`
 * milliseconds. On each tick it:
 *  1. compares the process heap usage with `MEMORY_CONFIG.maxHeapSize` and
 *     triggers {@link performMemoryCleanup} when the ratio exceeds
 *     `MEMORY_CONFIG.heapUsageThreshold`;
 *  2. trims the screenshot queue down to its most recent
 *     `SCREENCAST_CONFIG.maxQueueSize` entries if it has grown beyond that.
 *
 * NOTE(review): the timer handle is not retained, so this method offers no
 * way to stop the timer — confirm whether shutdown should clear it.
 * @returns void
 */
private initializeMemoryManagement(): void {
  setInterval(() => {
    const { heapUsed } = process.memoryUsage();
    const heapUsageRatio = heapUsed / MEMORY_CONFIG.maxHeapSize;

    if (heapUsageRatio > MEMORY_CONFIG.heapUsageThreshold) {
      logger.warn('High memory usage detected, triggering cleanup');
      // The cleanup is async and intentionally not awaited inside the timer
      // callback; attach a handler so a rejection is logged instead of
      // surfacing as an unhandled promise rejection.
      this.performMemoryCleanup().catch((error: unknown) => {
        logger.error('Memory cleanup failed:', error);
      });
    }

    // Clear screenshot queue if it's too large, keeping only the newest frames.
    if (this.screenshotQueue.length > SCREENCAST_CONFIG.maxQueueSize) {
      this.screenshotQueue = this.screenshotQueue.slice(-SCREENCAST_CONFIG.maxQueueSize);
    }
  }, MEMORY_CONFIG.gcInterval);
}
|
||||
|
||||
/**
 * Frees memory held by the screencast pipeline and recreates the CDP session.
 *
 * Steps:
 *  - empties the screenshot queue and resets the processing flag;
 *  - runs the garbage collector when `global.gc` is available;
 *  - if a CDP client exists: stops the screencast, detaches the old session,
 *    and, when a current page is set, opens a fresh session on it and
 *    restarts the screencast.
 *
 * Errors raised while resetting the session are logged, not rethrown.
 * @returns {Promise<void>}
 */
private async performMemoryCleanup(): Promise<void> {
  this.screenshotQueue = [];
  this.isProcessingScreenshot = false;

  if (global.gc) {
    global.gc();
  }

  // Reset CDP session if needed
  const staleClient = this.client;
  if (!staleClient) {
    return;
  }

  try {
    await this.stopScreencast();
    this.client = null;

    // Release the previous session explicitly; dropping the reference alone
    // leaves it attached to the target. A failure here must not prevent the
    // replacement session from being created.
    try {
      await staleClient.detach();
    } catch (detachError) {
      logger.warn('Failed to detach stale CDP session:', detachError);
    }

    if (this.currentPage) {
      this.client = await this.currentPage.context().newCDPSession(this.currentPage);
      await this.startScreencast();
    }
  } catch (error) {
    logger.error('Error resetting CDP session:', error);
  }
}
|
||||
|
||||
/**
|
||||
* Normalizes URLs to prevent navigation loops while maintaining consistent format
|
||||
*/
|
||||
@@ -157,7 +226,7 @@ export class RemoteBrowser {
|
||||
'Mozilla/5.0 (Windows NT 11.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.5938.62 Safari/537.36',
|
||||
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:118.0) Gecko/20100101 Firefox/118.0',
|
||||
];
|
||||
|
||||
|
||||
return userAgents[Math.floor(Math.random() * userAgents.length)];
|
||||
}
|
||||
|
||||
@@ -178,7 +247,7 @@ export class RemoteBrowser {
|
||||
"--disable-extensions",
|
||||
"--no-sandbox",
|
||||
"--disable-dev-shm-usage",
|
||||
],
|
||||
],
|
||||
}));
|
||||
const proxyConfig = await getDecryptedProxyConfig(userId);
|
||||
let proxyOptions: { server: string, username?: string, password?: string } = { server: '' };
|
||||
@@ -251,11 +320,11 @@ export class RemoteBrowser {
|
||||
this.client = await this.currentPage.context().newCDPSession(this.currentPage);
|
||||
await blocker.disableBlockingInPage(this.currentPage);
|
||||
console.log('Adblocker initialized');
|
||||
} catch (error: any) {
|
||||
} catch (error: any) {
|
||||
console.warn('Failed to initialize adblocker, continuing without it:', error.message);
|
||||
// Still need to set up the CDP session even if blocker fails
|
||||
this.client = await this.currentPage.context().newCDPSession(this.currentPage);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
@@ -319,7 +388,7 @@ export class RemoteBrowser {
|
||||
return;
|
||||
}
|
||||
this.client.on('Page.screencastFrame', ({ data: base64, sessionId }) => {
|
||||
this.emitScreenshot(base64)
|
||||
this.emitScreenshot(Buffer.from(base64, 'base64'))
|
||||
setTimeout(async () => {
|
||||
try {
|
||||
if (!this.client) {
|
||||
@@ -339,16 +408,49 @@ export class RemoteBrowser {
|
||||
* If an interpretation was running it will be stopped.
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
public switchOff = async (): Promise<void> => {
|
||||
await this.interpreter.stopInterpretation();
|
||||
if (this.browser) {
|
||||
await this.stopScreencast();
|
||||
await this.browser.close();
|
||||
} else {
|
||||
logger.log('error', 'Browser wasn\'t initialized');
|
||||
logger.log('error', 'Switching off the browser failed');
|
||||
public async switchOff(): Promise<void> {
|
||||
try {
|
||||
await this.interpreter.stopInterpretation();
|
||||
|
||||
if (this.screencastInterval) {
|
||||
clearInterval(this.screencastInterval);
|
||||
}
|
||||
|
||||
if (this.client) {
|
||||
await this.stopScreencast();
|
||||
}
|
||||
|
||||
if (this.browser) {
|
||||
await this.browser.close();
|
||||
}
|
||||
|
||||
this.screenshotQueue = [];
|
||||
//this.performanceMonitor.reset();
|
||||
|
||||
} catch (error) {
|
||||
logger.error('Error during browser shutdown:', error);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
 * Re-encodes a screenshot as a progressive JPEG and shrinks it to fit inside
 * `SCREENCAST_CONFIG.maxWidth` x `SCREENCAST_CONFIG.maxHeight` without ever
 * enlarging it.
 * @param screenshot the raw screenshot image data
 * @returns the optimized image, or the untouched input if optimization fails
 */
private async optimizeScreenshot(screenshot: Buffer): Promise<Buffer> {
  // The configured quality is a 0..1 fraction; the encoder expects 0..100.
  const jpegQuality = Math.round(SCREENCAST_CONFIG.compressionQuality * 100);
  const { maxWidth, maxHeight } = SCREENCAST_CONFIG;

  try {
    const pipeline = sharp(screenshot)
      .jpeg({ quality: jpegQuality, progressive: true })
      .resize({
        width: maxWidth,
        height: maxHeight,
        fit: 'inside',
        withoutEnlargement: true,
      });

    return await pipeline.toBuffer();
  } catch (error) {
    logger.error('Screenshot optimization failed:', error);
    // Fall back to the original image so the caller still has a frame to emit.
    return screenshot;
  }
}
|
||||
|
||||
/**
|
||||
* Makes and emits a single screenshot to the client side.
|
||||
@@ -358,7 +460,7 @@ export class RemoteBrowser {
|
||||
try {
|
||||
const screenshot = await this.currentPage?.screenshot();
|
||||
if (screenshot) {
|
||||
this.emitScreenshot(screenshot.toString('base64'));
|
||||
this.emitScreenshot(screenshot);
|
||||
}
|
||||
} catch (e) {
|
||||
const { message } = e as Error;
|
||||
@@ -490,37 +592,85 @@ export class RemoteBrowser {
|
||||
* Should be called only once after the browser is fully initialized.
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
private startScreencast = async (): Promise<void> => {
|
||||
private async startScreencast(): Promise<void> {
|
||||
if (!this.client) {
|
||||
logger.log('warn', 'client is not initialized');
|
||||
logger.warn('Client is not initialized');
|
||||
return;
|
||||
}
|
||||
await this.client.send('Page.startScreencast', { format: 'jpeg', quality: 75 });
|
||||
logger.log('info', `Browser started with screencasting a page.`);
|
||||
};
|
||||
|
||||
/**
|
||||
* Unsubscribes the current page from the screencast session.
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
private stopScreencast = async (): Promise<void> => {
|
||||
if (!this.client) {
|
||||
logger.log('error', 'client is not initialized');
|
||||
logger.log('error', 'Screencast stop failed');
|
||||
} else {
|
||||
await this.client.send('Page.stopScreencast');
|
||||
logger.log('info', `Browser stopped with screencasting.`);
|
||||
try {
|
||||
await this.client.send('Page.startScreencast', {
|
||||
format: SCREENCAST_CONFIG.format,
|
||||
});
|
||||
|
||||
// Set up screencast frame handler
|
||||
this.client.on('Page.screencastFrame', async ({ data, sessionId }) => {
|
||||
try {
|
||||
const buffer = Buffer.from(data, 'base64');
|
||||
await this.emitScreenshot(buffer);
|
||||
await this.client?.send('Page.screencastFrameAck', { sessionId });
|
||||
} catch (error) {
|
||||
logger.error('Screencast frame processing failed:', error);
|
||||
}
|
||||
});
|
||||
|
||||
logger.info('Screencast started successfully');
|
||||
} catch (error) {
|
||||
logger.error('Failed to start screencast:', error);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
 * Unsubscribes the current page from the screencast session and discards
 * any frames still waiting in the queue.
 * Logs an error and does nothing else when no CDP client is set; a failure
 * of the stop command is logged, not rethrown.
 * @returns {Promise<void>}
 */
private async stopScreencast(): Promise<void> {
  const cdpClient = this.client;

  if (cdpClient) {
    try {
      await cdpClient.send('Page.stopScreencast');

      // Only reset the pipeline state once the stop command has succeeded.
      this.screenshotQueue = [];
      this.isProcessingScreenshot = false;

      logger.info('Screencast stopped successfully');
    } catch (error) {
      logger.error('Failed to stop screencast:', error);
    }
    return;
  }

  logger.error('Client is not initialized');
}
|
||||
|
||||
|
||||
/**
|
||||
* Helper for emitting the screenshot of browser's active page through websocket.
|
||||
* @param payload the screenshot binary data
|
||||
* @returns void
|
||||
*/
|
||||
private emitScreenshot = (payload: any): void => {
|
||||
const dataWithMimeType = ('data:image/jpeg;base64,').concat(payload);
|
||||
this.socket.emit('screencast', dataWithMimeType);
|
||||
logger.log('debug', `Screenshot emitted`);
|
||||
/**
 * Optimizes a screenshot and emits it to the client through the websocket
 * as a JPEG data URL on the 'screencast' event.
 *
 * Only one screenshot is processed at a time. A screenshot arriving while
 * another is being processed is queued if the queue holds fewer than
 * `SCREENCAST_CONFIG.maxQueueSize` entries, and dropped otherwise. After
 * each emission the next queued screenshot, if any, is scheduled after
 * `1000 / SCREENCAST_CONFIG.targetFPS` milliseconds.
 * @param payload the screenshot binary data
 * @returns {Promise<void>}
 */
private emitScreenshot = async (payload: Buffer): Promise<void> => {
  if (this.isProcessingScreenshot) {
    const queueHasRoom = this.screenshotQueue.length < SCREENCAST_CONFIG.maxQueueSize;
    if (queueHasRoom) {
      this.screenshotQueue.push(payload);
    }
    return;
  }

  this.isProcessingScreenshot = true;

  try {
    const optimized = await this.optimizeScreenshot(payload);
    const encoded = optimized.toString('base64');

    this.socket.emit('screencast', `data:image/jpeg;base64,${encoded}`);
    logger.debug('Screenshot emitted');
  } catch (error) {
    logger.error('Screenshot emission failed:', error);
  } finally {
    this.isProcessingScreenshot = false;

    // shift() yields undefined on an empty queue, so nothing gets scheduled then.
    const pending = this.screenshotQueue.shift();
    if (pending) {
      const frameDelayMs = 1000 / SCREENCAST_CONFIG.targetFPS;
      setTimeout(() => this.emitScreenshot(pending), frameDelayMs);
    }
  }
};
|
||||
|
||||
}
|
||||
|
||||
@@ -384,7 +384,7 @@ router.get(
|
||||
httpOnly: false,
|
||||
maxAge: 60000,
|
||||
});
|
||||
res.redirect(process.env.PUBLIC_URL as string || "http://localhost:5173");
|
||||
res.redirect(`${process.env.PUBLIC_URL}/robots/${robotId}/integrate` as string || `http://localhost:5173/robots/${robotId}/integrate`);
|
||||
} catch (error: any) {
|
||||
res.status(500).json({ message: `Google OAuth error: ${error.message}` });
|
||||
}
|
||||
|
||||
@@ -6,14 +6,26 @@ import { InterpreterSettings } from "../../types";
|
||||
import { decrypt } from "../../utils/auth";
|
||||
|
||||
/**
|
||||
* Decrypts any encrypted inputs in the workflow.
|
||||
* Decrypts any encrypted inputs in the workflow. If checkLimit is true, it will also handle the limit validation for scrapeList action.
|
||||
* @param workflow The workflow to decrypt.
|
||||
* @param checkLimit If true, it will handle the limit validation for scrapeList action.
|
||||
*/
|
||||
function decryptWorkflow(workflow: WorkflowFile): WorkflowFile {
|
||||
const decryptedWorkflow = JSON.parse(JSON.stringify(workflow)) as WorkflowFile;
|
||||
function processWorkflow(workflow: WorkflowFile, checkLimit: boolean = false): WorkflowFile {
|
||||
const processedWorkflow = JSON.parse(JSON.stringify(workflow)) as WorkflowFile;
|
||||
|
||||
decryptedWorkflow.workflow.forEach((pair) => {
|
||||
processedWorkflow.workflow.forEach((pair) => {
|
||||
pair.what.forEach((action) => {
|
||||
// Handle limit validation for scrapeList action
|
||||
if (action.action === 'scrapeList' && checkLimit && Array.isArray(action.args) && action.args.length > 0) {
|
||||
const scrapeConfig = action.args[0];
|
||||
if (scrapeConfig && typeof scrapeConfig === 'object' && 'limit' in scrapeConfig) {
|
||||
if (typeof scrapeConfig.limit === 'number' && scrapeConfig.limit > 5) {
|
||||
scrapeConfig.limit = 5;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Handle decryption for type and press actions
|
||||
if ((action.action === 'type' || action.action === 'press') && Array.isArray(action.args) && action.args.length > 1) {
|
||||
try {
|
||||
const encryptedValue = action.args[1];
|
||||
@@ -33,7 +45,7 @@ function decryptWorkflow(workflow: WorkflowFile): WorkflowFile {
|
||||
});
|
||||
});
|
||||
|
||||
return decryptedWorkflow;
|
||||
return processedWorkflow;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -156,7 +168,7 @@ export class WorkflowInterpreter {
|
||||
const params = settings.params ? settings.params : null;
|
||||
delete settings.params;
|
||||
|
||||
const decryptedWorkflow = decryptWorkflow(workflow);
|
||||
const processedWorkflow = processWorkflow(workflow, true);
|
||||
|
||||
const options = {
|
||||
...settings,
|
||||
@@ -178,7 +190,7 @@ export class WorkflowInterpreter {
|
||||
}
|
||||
}
|
||||
|
||||
const interpreter = new Interpreter(decryptedWorkflow, options);
|
||||
const interpreter = new Interpreter(processedWorkflow, options);
|
||||
this.interpreter = interpreter;
|
||||
|
||||
interpreter.on('flag', async (page, resume) => {
|
||||
@@ -253,7 +265,7 @@ export class WorkflowInterpreter {
|
||||
const params = settings.params ? settings.params : null;
|
||||
delete settings.params;
|
||||
|
||||
const decryptedWorkflow = decryptWorkflow(workflow);
|
||||
const processedWorkflow = processWorkflow(workflow);
|
||||
|
||||
const options = {
|
||||
...settings,
|
||||
@@ -277,7 +289,7 @@ export class WorkflowInterpreter {
|
||||
}
|
||||
}
|
||||
|
||||
const interpreter = new Interpreter(decryptedWorkflow, options);
|
||||
const interpreter = new Interpreter(processedWorkflow, options);
|
||||
this.interpreter = interpreter;
|
||||
|
||||
interpreter.on('flag', async (page, resume) => {
|
||||
|
||||
@@ -175,7 +175,17 @@ export const getElementInformation = async (
|
||||
info.innerText = targetElement.textContent ?? '';
|
||||
} else if (targetElement.tagName === 'IMG') {
|
||||
info.imageUrl = (targetElement as HTMLImageElement).src;
|
||||
} else {
|
||||
} else if (targetElement?.tagName === 'SELECT') {
|
||||
const selectElement = targetElement as HTMLSelectElement;
|
||||
info.innerText = selectElement.options[selectElement.selectedIndex]?.text ?? '';
|
||||
info.attributes = {
|
||||
...info.attributes,
|
||||
selectedValue: selectElement.value,
|
||||
};
|
||||
} else if (targetElement?.tagName === 'INPUT' && (targetElement as HTMLInputElement).type === 'time' || (targetElement as HTMLInputElement).type === 'date') {
|
||||
info.innerText = (targetElement as HTMLInputElement).value;
|
||||
}
|
||||
else {
|
||||
info.hasOnlyText = targetElement.children.length === 0 &&
|
||||
(targetElement.textContent !== null &&
|
||||
targetElement.textContent.trim().length > 0);
|
||||
|
||||
Reference in New Issue
Block a user