diff --git a/maxun-core/src/interpret.ts b/maxun-core/src/interpret.ts index ae224b9e..2937c40b 100644 --- a/maxun-core/src/interpret.ts +++ b/maxun-core/src/interpret.ts @@ -123,6 +123,13 @@ export default class Interpreter extends EventEmitter { this.isAborted = true; } + /** + * Returns the current abort status + */ + public getIsAborted(): boolean { + return this.isAborted; + } + private async applyAdBlocker(page: Page): Promise { if (this.blocker) { try { @@ -610,6 +617,13 @@ export default class Interpreter extends EventEmitter { if (methodName === 'waitForLoadState') { try { + let args = step.args; + + if (Array.isArray(args) && args.length === 1) { + args = [args[0], { timeout: 30000 }]; + } else if (!Array.isArray(args)) { + args = [args, { timeout: 30000 }]; + } - await executeAction(invokee, methodName, step.args); + await executeAction(invokee, methodName, args); } catch (error) { await executeAction(invokee, methodName, 'domcontentloaded'); @@ -670,7 +684,19 @@ export default class Interpreter extends EventEmitter { return; } - const results = await page.evaluate((cfg) => window.scrapeList(cfg), config); + const evaluationPromise = page.evaluate((cfg) => window.scrapeList(cfg), config); + const timeoutPromise = new Promise((_, reject) => + setTimeout(() => reject(new Error('Page evaluation timeout')), 10000) + ); + + let results; + try { + results = await Promise.race([evaluationPromise, timeoutPromise]); + } catch (error) { + debugLog(`Page evaluation failed: ${error.message}`); + return; + } + const newResults = results.filter(item => { const uniqueKey = JSON.stringify(item); if (scrapedItems.has(uniqueKey)) return false; @@ -691,43 +717,94 @@ export default class Interpreter extends EventEmitter { return false; }; + // Helper function to detect if a selector is XPath + const isXPathSelector = (selector: string): boolean => { + return selector.startsWith('//') || + selector.startsWith('/') || + selector.startsWith('./') || + selector.includes('contains(@') || + selector.includes('[count(') || + 
selector.includes('@class=') || + selector.includes('@id=') || + selector.includes(' and ') || + selector.includes(' or '); + }; + + // Helper function to wait for selector (CSS or XPath) + const waitForSelectorUniversal = async (selector: string, options: any = {}): Promise => { + try { + if (isXPathSelector(selector)) { + // Use XPath locator + const locator = page.locator(`xpath=${selector}`); + await locator.waitFor({ + state: 'attached', + timeout: options.timeout || 10000 + }); + return await locator.elementHandle(); + } else { + // Use CSS selector + return await page.waitForSelector(selector, { + state: 'attached', + timeout: options.timeout || 10000 + }); + } + } catch (error) { + return null; + } + }; + // Enhanced button finder with retry mechanism - const findWorkingButton = async (selectors: string[]): Promise<{ - button: ElementHandle | null, + const findWorkingButton = async (selectors: string[]): Promise<{ + button: ElementHandle | null, workingSelector: string | null, updatedSelectors: string[] }> => { - let updatedSelectors = [...selectors]; - + const startTime = Date.now(); + const MAX_BUTTON_SEARCH_TIME = 15000; + let updatedSelectors = [...selectors]; + for (let i = 0; i < selectors.length; i++) { + if (Date.now() - startTime > MAX_BUTTON_SEARCH_TIME) { + debugLog(`Button search timeout reached (${MAX_BUTTON_SEARCH_TIME}ms), aborting`); + break; + } const selector = selectors[i]; let retryCount = 0; let selectorSuccess = false; while (retryCount < MAX_RETRIES && !selectorSuccess) { try { - const button = await page.waitForSelector(selector, { - state: 'attached', - timeout: 10000 - }); - + const button = await waitForSelectorUniversal(selector, { timeout: 2000 }); + if (button) { debugLog('Found working selector:', selector); - return { - button, + return { + button, workingSelector: selector, - updatedSelectors + updatedSelectors }; + } else { + retryCount++; + debugLog(`Selector "${selector}" not found: attempt ${retryCount}/${MAX_RETRIES}`); 
+ + if (retryCount < MAX_RETRIES) { + await page.waitForTimeout(RETRY_DELAY); + } else { + debugLog(`Removing failed selector "${selector}" after ${MAX_RETRIES} attempts`); + updatedSelectors = updatedSelectors.filter(s => s !== selector); + selectorSuccess = true; + } } } catch (error) { retryCount++; - debugLog(`Selector "${selector}" failed: attempt ${retryCount}/${MAX_RETRIES}`); - + debugLog(`Selector "${selector}" error: attempt ${retryCount}/${MAX_RETRIES} - ${error.message}`); + if (retryCount < MAX_RETRIES) { await page.waitForTimeout(RETRY_DELAY); } else { debugLog(`Removing failed selector "${selector}" after ${MAX_RETRIES} attempts`); updatedSelectors = updatedSelectors.filter(s => s !== selector); + selectorSuccess = true; } } } @@ -1347,9 +1424,35 @@ export default class Interpreter extends EventEmitter { } private async ensureScriptsLoaded(page: Page) { - const isScriptLoaded = await page.evaluate(() => typeof window.scrape === 'function' && typeof window.scrapeSchema === 'function' && typeof window.scrapeList === 'function' && typeof window.scrapeListAuto === 'function' && typeof window.scrollDown === 'function' && typeof window.scrollUp === 'function'); - if (!isScriptLoaded) { - await page.addInitScript({ path: path.join(__dirname, 'browserSide', 'scraper.js') }); + try { + const evaluationPromise = page.evaluate(() => + typeof window.scrape === 'function' && + typeof window.scrapeSchema === 'function' && + typeof window.scrapeList === 'function' && + typeof window.scrapeListAuto === 'function' && + typeof window.scrollDown === 'function' && + typeof window.scrollUp === 'function' + ); + + const timeoutPromise = new Promise((_, reject) => + setTimeout(() => reject(new Error('Script check timeout')), 3000) + ); + + const isScriptLoaded = await Promise.race([ + evaluationPromise, + timeoutPromise + ]); + + if (!isScriptLoaded) { + await page.addInitScript({ path: path.join(__dirname, 'browserSide', 'scraper.js') }); + } + } catch (error) { + 
this.log(`Script check failed, adding script anyway: ${error.message}`, Level.WARN); + try { + await page.addInitScript({ path: path.join(__dirname, 'browserSide', 'scraper.js') }); + } catch (scriptError) { + this.log(`Failed to add script: ${scriptError.message}`, Level.ERROR); + } } } diff --git a/server/src/api/record.ts b/server/src/api/record.ts index cdaf89f9..5d9a68cd 100644 --- a/server/src/api/record.ts +++ b/server/src/api/record.ts @@ -710,8 +710,8 @@ async function executeRun(id: string, userId: string) { retries: 5, }; - processAirtableUpdates(); - processGoogleSheetUpdates(); + processAirtableUpdates().catch(err => logger.log('error', `Airtable update error: ${err.message}`)); + processGoogleSheetUpdates().catch(err => logger.log('error', `Google Sheets update error: ${err.message}`)); } catch (err: any) { logger.log('error', `Failed to update Google Sheet for run: ${plainRun.runId}: ${err.message}`); } diff --git a/server/src/browser-management/controller.ts b/server/src/browser-management/controller.ts index 1c6ecb5c..a6db615e 100644 --- a/server/src/browser-management/controller.ts +++ b/server/src/browser-management/controller.ts @@ -77,7 +77,11 @@ export const createRemoteBrowserForRun = (userId: string): string => { logger.log('info', `createRemoteBrowserForRun: Reserved slot ${id} for user ${userId}`); - initializeBrowserAsync(id, userId); + initializeBrowserAsync(id, userId) + .catch((error: any) => { + logger.log('error', `Unhandled error in initializeBrowserAsync for browser ${id}: ${error.message}`); + browserPool.failBrowserSlot(id); + }); return id; }; @@ -110,7 +114,16 @@ export const destroyRemoteBrowser = async (id: string, userId: string): Promise< } catch (switchOffError) { logger.log('warn', `Error switching off browser ${id}: ${switchOffError}`); } - + + try { + const namespace = io.of(id); + namespace.removeAllListeners(); + namespace.disconnectSockets(true); + logger.log('debug', `Cleaned up socket namespace for browser ${id}`); 
+ } catch (namespaceCleanupError: any) { + logger.log('warn', `Error cleaning up socket namespace for browser ${id}: ${namespaceCleanupError.message}`); + } + return browserPool.deleteRemoteBrowser(id); } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error); @@ -273,11 +286,27 @@ const initializeBrowserAsync = async (id: string, userId: string) => { } logger.log('debug', `Starting browser initialization for ${id}`); - await browserSession.initialize(userId); - logger.log('debug', `Browser initialization completed for ${id}`); - + + try { + await browserSession.initialize(userId); + logger.log('debug', `Browser initialization completed for ${id}`); + } catch (initError: any) { + try { + await browserSession.switchOff(); + logger.log('info', `Cleaned up failed browser initialization for ${id}`); + } catch (cleanupError: any) { + logger.log('error', `Failed to cleanup browser ${id}: ${cleanupError.message}`); + } + throw initError; + } + const upgraded = browserPool.upgradeBrowserSlot(id, browserSession); if (!upgraded) { + try { + await browserSession.switchOff(); + } catch (cleanupError: any) { + logger.log('error', `Failed to cleanup browser after slot upgrade failure: ${cleanupError.message}`); + } throw new Error('Failed to upgrade reserved browser slot'); } diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index ccc93131..b501f6c9 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -102,8 +102,8 @@ async function triggerIntegrationUpdates(runId: string, robotMetaId: string): Pr retries: 5, }; - processAirtableUpdates(); - processGoogleSheetUpdates(); + processAirtableUpdates().catch(err => logger.log('error', `Airtable update error: ${err.message}`)); + processGoogleSheetUpdates().catch(err => logger.log('error', `Google Sheets update error: ${err.message}`)); } catch (err: any) { logger.log('error', `Failed to update integrations for run: ${runId}: ${err.message}`); } diff 
--git a/server/src/server.ts b/server/src/server.ts index c49b367b..5aa8efed 100644 --- a/server/src/server.ts +++ b/server/src/server.ts @@ -37,6 +37,12 @@ const pool = new Pool({ database: process.env.DB_NAME, password: process.env.DB_PASSWORD, port: process.env.DB_PORT ? parseInt(process.env.DB_PORT, 10) : undefined, + max: 50, + min: 5, + idleTimeoutMillis: 30000, + connectionTimeoutMillis: 10000, + maxUses: 7500, + allowExitOnIdle: true }); const PgSession = connectPgSimple(session); @@ -215,6 +221,22 @@ if (require.main === module) { }); } +process.on('unhandledRejection', (reason, promise) => { + logger.log('error', `Unhandled promise rejection at: ${promise}, reason: ${reason}`); + console.error('Unhandled promise rejection:', reason); +}); + +process.on('uncaughtException', (error) => { + logger.log('error', `Uncaught exception: ${error.message}`, { stack: error.stack }); + console.error('Uncaught exception:', error); + + if (process.env.NODE_ENV === 'production') { + setTimeout(() => { + process.exit(1); + }, 5000); + } +}); + if (require.main === module) { process.on('SIGINT', async () => { console.log('Main app shutting down...'); diff --git a/server/src/workflow-management/classes/Interpreter.ts b/server/src/workflow-management/classes/Interpreter.ts index 0ed19f19..8c1f7e5e 100644 --- a/server/src/workflow-management/classes/Interpreter.ts +++ b/server/src/workflow-management/classes/Interpreter.ts @@ -118,6 +118,23 @@ export class WorkflowInterpreter { */ private currentRunId: string | null = null; + /** + * Batched persistence system for performance optimization + */ + private persistenceBuffer: Array<{ + actionType: string; + data: any; + listIndex?: number; + timestamp: number; + creditValidated: boolean; + }> = []; + + private persistenceTimer: NodeJS.Timeout | null = null; + private readonly BATCH_SIZE = 5; + private readonly BATCH_TIMEOUT = 3000; + private persistenceInProgress = false; + private persistenceRetryCount = 0; + /** * An array of 
id's of the pairs from the workflow that are about to be paused. * As "breakpoints". @@ -303,13 +320,39 @@ export class WorkflowInterpreter { await this.interpreter.stop(); this.socket.emit('log', '----- The interpretation has been stopped -----', false); - this.clearState(); + await this.clearState(); } else { logger.log('error', 'Cannot stop: No active interpretation.'); } }; - private clearState = () => { + public clearState = async (): Promise => { + if (this.persistenceBuffer.length > 0) { + try { + await this.flushPersistenceBuffer(); + logger.log('debug', 'Successfully flushed final persistence buffer during cleanup'); + } catch (error: any) { + logger.log('error', `Failed to flush final persistence buffer: ${error.message}`); + } + } + + if (this.persistenceTimer) { + clearTimeout(this.persistenceTimer); + this.persistenceTimer = null; + } + + if (this.interpreter) { + try { + if (!this.interpreter.getIsAborted()) { + this.interpreter.abort(); + } + await this.interpreter.stop(); + logger.log('debug', 'mx-cloud interpreter properly stopped during cleanup'); + } catch (error: any) { + logger.log('warn', `Error stopping mx-cloud interpreter during cleanup: ${error.message}`); + } + } + this.debugMessages = []; this.interpretationIsPaused = false; this.activeId = null; @@ -324,6 +367,9 @@ export class WorkflowInterpreter { this.binaryData = []; this.currentScrapeListIndex = 0; this.currentRunId = null; + this.persistenceBuffer = []; + this.persistenceInProgress = false; + this.persistenceRetryCount = 0; } /** @@ -336,61 +382,22 @@ export class WorkflowInterpreter { }; /** - * Persists data to database in real-time during interpretation + * Persists extracted data to database with intelligent batching for performance + * Falls back to immediate persistence for critical operations * @private */ private persistDataToDatabase = async (actionType: string, data: any, listIndex?: number): Promise => { if (!this.currentRunId) { - logger.log('debug', 'No run ID 
available for real-time persistence'); + logger.log('debug', 'No run ID available for persistence'); return; } - try { - const run = await Run.findOne({ where: { runId: this.currentRunId } }); - - if (!run) { - logger.log('warn', `Run not found for real-time persistence: ${this.currentRunId}`); - return; - } - - const currentSerializableOutput = run.serializableOutput ? - JSON.parse(JSON.stringify(run.serializableOutput)) : - { scrapeSchema: [], scrapeList: [] }; + this.addToPersistenceBatch(actionType, data, listIndex, true); - if (actionType === 'scrapeSchema') { - const newSchemaData = Array.isArray(data) ? data : [data]; - const updatedOutput = { - ...currentSerializableOutput, - scrapeSchema: newSchemaData - }; - - await run.update({ - serializableOutput: updatedOutput - }); - - logger.log('debug', `Persisted scrapeSchema data for run ${this.currentRunId}: ${newSchemaData.length} items`); - - } else if (actionType === 'scrapeList' && typeof listIndex === 'number') { - if (!Array.isArray(currentSerializableOutput.scrapeList)) { - currentSerializableOutput.scrapeList = []; - } - - const updatedList = [...currentSerializableOutput.scrapeList]; - updatedList[listIndex] = data; - - const updatedOutput = { - ...currentSerializableOutput, - scrapeList: updatedList - }; - - await run.update({ - serializableOutput: updatedOutput - }); - - logger.log('debug', `Persisted scrapeList data for run ${this.currentRunId} at index ${listIndex}: ${Array.isArray(data) ? 
data.length : 'N/A'} items`); - } - } catch (error: any) { - logger.log('error', `Failed to persist data in real-time for run ${this.currentRunId}: ${error.message}`); + if (actionType === 'scrapeSchema' || this.persistenceBuffer.length >= this.BATCH_SIZE) { + await this.flushPersistenceBuffer(); + } else { + this.scheduleBatchFlush(); } }; @@ -548,7 +555,6 @@ export class WorkflowInterpreter { } logger.log('debug', `Interpretation finished`); - this.clearState(); return result; } @@ -569,4 +575,121 @@ export class WorkflowInterpreter { this.socket = socket; this.subscribeToPausing(); }; + + /** + * Adds data to persistence buffer for batched processing + * @private + */ + private addToPersistenceBatch(actionType: string, data: any, listIndex?: number, creditValidated: boolean = false): void { + this.persistenceBuffer.push({ + actionType, + data, + listIndex, + timestamp: Date.now(), + creditValidated + }); + + logger.log('debug', `Added ${actionType} to persistence buffer (${this.persistenceBuffer.length} items)`); + } + + /** + * Schedules a batched flush if not already scheduled + * @private + */ + private scheduleBatchFlush(): void { + if (!this.persistenceTimer && !this.persistenceInProgress) { + this.persistenceTimer = setTimeout(async () => { + await this.flushPersistenceBuffer(); + }, this.BATCH_TIMEOUT); + } + } + + /** + * Flushes persistence buffer to database in a single transaction + * @private + */ + private async flushPersistenceBuffer(): Promise { + if (this.persistenceBuffer.length === 0 || this.persistenceInProgress || !this.currentRunId) { + return; + } + + if (this.persistenceTimer) { + clearTimeout(this.persistenceTimer); + this.persistenceTimer = null; + } + + this.persistenceInProgress = true; + const batchToProcess = [...this.persistenceBuffer]; + this.persistenceBuffer = []; + + try { + const sequelize = require('../../storage/db').default; + await sequelize.transaction(async (transaction: any) => { + const { Run } = 
require('../../models'); + const run = await Run.findOne({ + where: { runId: this.currentRunId! }, + transaction + }); + + if (!run) { + logger.log('warn', `Run not found for batched persistence: ${this.currentRunId}`); + return; + } + + const currentSerializableOutput = run.serializableOutput ? + JSON.parse(JSON.stringify(run.serializableOutput)) : + { scrapeSchema: [], scrapeList: [] }; + + let hasUpdates = false; + + for (const item of batchToProcess) { + if (item.actionType === 'scrapeSchema') { + const newSchemaData = Array.isArray(item.data) ? item.data : [item.data]; + currentSerializableOutput.scrapeSchema = newSchemaData; + hasUpdates = true; + } else if (item.actionType === 'scrapeList' && typeof item.listIndex === 'number') { + if (!Array.isArray(currentSerializableOutput.scrapeList)) { + currentSerializableOutput.scrapeList = []; + } + currentSerializableOutput.scrapeList[item.listIndex] = item.data; + hasUpdates = true; + } + } + + if (hasUpdates) { + await run.update({ + serializableOutput: currentSerializableOutput + }, { transaction }); + + logger.log('debug', `Batched persistence: Updated run ${this.currentRunId} with ${batchToProcess.length} items`); + } + }); + + this.persistenceRetryCount = 0; + + } catch (error: any) { + logger.log('error', `Failed to flush persistence buffer for run ${this.currentRunId}: ${error.message}`); + + if (!this.persistenceRetryCount) { + this.persistenceRetryCount = 0; + } + + if (this.persistenceRetryCount < 3) { + this.persistenceBuffer.unshift(...batchToProcess); + this.persistenceRetryCount++; + + const backoffDelay = Math.min(5000 * Math.pow(2, this.persistenceRetryCount), 30000); + setTimeout(async () => { + await this.flushPersistenceBuffer(); + }, backoffDelay); + + logger.log('warn', `Scheduling persistence retry ${this.persistenceRetryCount}/3 in ${backoffDelay}ms`); + } else { + logger.log('error', `Max persistence retries exceeded for run ${this.currentRunId}, dropping ${batchToProcess.length} items`); + 
this.persistenceRetryCount = 0; + } + } finally { + this.persistenceInProgress = false; + } + }; } diff --git a/server/src/workflow-management/integrations/airtable.ts b/server/src/workflow-management/integrations/airtable.ts index 5f72c836..e1f27264 100644 --- a/server/src/workflow-management/integrations/airtable.ts +++ b/server/src/workflow-management/integrations/airtable.ts @@ -454,33 +454,36 @@ function isValidUrl(str: string): boolean { } export const processAirtableUpdates = async () => { - while (true) { + const maxProcessingTime = 60000; + const startTime = Date.now(); + + while (Date.now() - startTime < maxProcessingTime) { let hasPendingTasks = false; - + for (const runId in airtableUpdateTasks) { const task = airtableUpdateTasks[runId]; - + if (task.status === 'pending') { hasPendingTasks = true; console.log(`Processing Airtable update for run: ${runId}`); - + try { await updateAirtable(task.robotId, task.runId); console.log(`Successfully updated Airtable for runId: ${runId}`); - airtableUpdateTasks[runId].status = 'completed'; - delete airtableUpdateTasks[runId]; + delete airtableUpdateTasks[runId]; } catch (error: any) { console.error(`Failed to update Airtable for run ${task.runId}:`, error); - + if (task.retries < MAX_RETRIES) { airtableUpdateTasks[runId].retries += 1; console.log(`Retrying task for runId: ${runId}, attempt: ${task.retries + 1}`); } else { - airtableUpdateTasks[runId].status = 'failed'; - console.log(`Max retries reached for runId: ${runId}. Marking task as failed.`); - logger.log('error', `Permanent failure for run ${runId}: ${error.message}`); + console.log(`Max retries reached for runId: ${runId}. 
Removing task.`); + delete airtableUpdateTasks[runId]; } } + } else if (task.status === 'completed' || task.status === 'failed') { + delete airtableUpdateTasks[runId]; } } @@ -488,8 +491,10 @@ export const processAirtableUpdates = async () => { console.log('No pending Airtable update tasks, exiting processor'); break; } - + console.log('Waiting for 5 seconds before checking again...'); await new Promise(resolve => setTimeout(resolve, 5000)); } + + console.log('Airtable processing completed or timed out'); }; \ No newline at end of file diff --git a/server/src/workflow-management/integrations/gsheet.ts b/server/src/workflow-management/integrations/gsheet.ts index 2a29bdcc..fcf9b95c 100644 --- a/server/src/workflow-management/integrations/gsheet.ts +++ b/server/src/workflow-management/integrations/gsheet.ts @@ -286,8 +286,12 @@ export async function writeDataToSheet( } export const processGoogleSheetUpdates = async () => { - while (true) { + const maxProcessingTime = 60000; + const startTime = Date.now(); + + while (Date.now() - startTime < maxProcessingTime) { let hasPendingTasks = false; + for (const runId in googleSheetUpdateTasks) { const task = googleSheetUpdateTasks[runId]; console.log(`Processing task for runId: ${runId}, status: ${task.status}`); @@ -297,7 +301,6 @@ export const processGoogleSheetUpdates = async () => { try { await updateGoogleSheet(task.robotId, task.runId); console.log(`Successfully updated Google Sheet for runId: ${runId}`); - googleSheetUpdateTasks[runId].status = 'completed'; delete googleSheetUpdateTasks[runId]; } catch (error: any) { console.error(`Failed to update Google Sheets for run ${task.runId}:`, error); @@ -305,10 +308,12 @@ export const processGoogleSheetUpdates = async () => { googleSheetUpdateTasks[runId].retries += 1; console.log(`Retrying task for runId: ${runId}, attempt: ${task.retries}`); } else { - googleSheetUpdateTasks[runId].status = 'failed'; - console.log(`Max retries reached for runId: ${runId}. 
Marking task as failed.`); + console.log(`Max retries reached for runId: ${runId}. Removing task.`); + delete googleSheetUpdateTasks[runId]; } } + } else if (task.status === 'completed' || task.status === 'failed') { + delete googleSheetUpdateTasks[runId]; } } @@ -320,4 +325,6 @@ export const processGoogleSheetUpdates = async () => { console.log('Waiting for 5 seconds before checking again...'); await new Promise(resolve => setTimeout(resolve, 5000)); } + + console.log('Google Sheets processing completed or timed out'); }; \ No newline at end of file diff --git a/server/src/workflow-management/scheduler/index.ts b/server/src/workflow-management/scheduler/index.ts index 195e1888..ce272689 100644 --- a/server/src/workflow-management/scheduler/index.ts +++ b/server/src/workflow-management/scheduler/index.ts @@ -277,8 +277,8 @@ async function executeRun(id: string, userId: string) { retries: 5, }; - processAirtableUpdates(); - processGoogleSheetUpdates(); + processAirtableUpdates().catch(err => logger.log('error', `Airtable update error: ${err.message}`)); + processGoogleSheetUpdates().catch(err => logger.log('error', `Google Sheets update error: ${err.message}`)); } catch (err: any) { logger.log('error', `Failed to update Google Sheet for run: ${plainRun.runId}: ${err.message}`); }