Merge pull request #811 from getmaxun/optim-record
fix(maxun-core): robot browser recording crashes
This commit is contained in:
@@ -123,6 +123,13 @@ export default class Interpreter extends EventEmitter {
|
||||
this.isAborted = true;
|
||||
}
|
||||
|
||||
/**
 * Reports whether this interpreter has been flagged as aborted.
 *
 * @returns The current value of the `isAborted` flag.
 */
public getIsAborted(): boolean {
  const aborted = this.isAborted;
  return aborted;
}
|
||||
|
||||
private async applyAdBlocker(page: Page): Promise<void> {
|
||||
if (this.blocker) {
|
||||
try {
|
||||
@@ -610,6 +617,13 @@ export default class Interpreter extends EventEmitter {
|
||||
|
||||
if (methodName === 'waitForLoadState') {
|
||||
try {
|
||||
let args = step.args;
|
||||
|
||||
if (Array.isArray(args) && args.length === 1) {
|
||||
args = [args[0], { timeout: 30000 }];
|
||||
} else if (!Array.isArray(args)) {
|
||||
args = [args, { timeout: 30000 }];
|
||||
}
|
||||
await executeAction(invokee, methodName, step.args);
|
||||
} catch (error) {
|
||||
await executeAction(invokee, methodName, 'domcontentloaded');
|
||||
@@ -670,7 +684,19 @@ export default class Interpreter extends EventEmitter {
|
||||
return;
|
||||
}
|
||||
|
||||
const results = await page.evaluate((cfg) => window.scrapeList(cfg), config);
|
||||
const evaluationPromise = page.evaluate((cfg) => window.scrapeList(cfg), config);
|
||||
const timeoutPromise = new Promise<any[]>((_, reject) =>
|
||||
setTimeout(() => reject(new Error('Page evaluation timeout')), 10000)
|
||||
);
|
||||
|
||||
let results;
|
||||
try {
|
||||
results = await Promise.race([evaluationPromise, timeoutPromise]);
|
||||
} catch (error) {
|
||||
debugLog(`Page evaluation failed: ${error.message}`);
|
||||
return;
|
||||
}
|
||||
|
||||
const newResults = results.filter(item => {
|
||||
const uniqueKey = JSON.stringify(item);
|
||||
if (scrapedItems.has(uniqueKey)) return false;
|
||||
@@ -691,43 +717,94 @@ export default class Interpreter extends EventEmitter {
|
||||
return false;
|
||||
};
|
||||
|
||||
/**
 * Heuristically decides whether a selector string is an XPath expression
 * rather than a CSS selector.
 *
 * A selector is treated as XPath when, ignoring surrounding whitespace and any
 * leading opening parentheses, it starts with `/`, `./` or `..`, or when it
 * contains one of a few XPath-only fragments (attribute tests, `contains(@`,
 * `[count(`, or the ` and ` / ` or ` operators).
 *
 * NOTE(review): the ` and ` / ` or ` checks also match CSS selectors whose
 * quoted text happens to contain those words; kept as-is for compatibility.
 *
 * @param selector Raw selector string.
 * @returns `true` if the selector looks like XPath, `false` otherwise
 *          (including for an empty or whitespace-only string).
 */
const isXPathSelector = (selector: string): boolean => {
  const candidate = selector.trim();
  if (candidate === '') {
    return false;
  }

  // Grouped expressions such as `(//a)[1]` are valid XPath; look past the
  // opening parentheses before testing the path prefix.
  const unwrapped = candidate.replace(/^\(+/, '');
  const xpathPrefixes = ['/', './', '..'];
  if (xpathPrefixes.some(prefix => unwrapped.startsWith(prefix))) {
    return true;
  }

  // Fragments that only occur in XPath syntax.
  const xpathMarkers = ['contains(@', '[count(', '@class=', '@id=', ' and ', ' or '];
  return xpathMarkers.some(marker => candidate.includes(marker));
};
|
||||
|
||||
/**
 * Waits until an element matching `selector` is attached to the DOM and
 * returns a handle to it. Works for both CSS and XPath selectors.
 *
 * Any failure (timeout, invalid selector, detached page, ...) is reported as
 * `null` rather than thrown, so callers can simply try the next selector.
 *
 * @param selector CSS or XPath selector; the kind is detected with `isXPathSelector`.
 * @param options  Optional settings; only `timeout` (milliseconds) is read.
 *                 A missing or falsy timeout falls back to 10000 ms.
 * @returns The first matching element handle, or `null` if none was found.
 */
const waitForSelectorUniversal = async (selector: string, options: any = {}): Promise<ElementHandle | null> => {
  try {
    // `||` rather than `??`: a timeout of 0 also falls back to the default,
    // so the wait is always bounded.
    const timeout = options.timeout || 10000;

    if (isXPathSelector(selector)) {
      // Locators are strict: without `.first()` an XPath that matches several
      // elements would throw, whereas the CSS branch below returns the first
      // match. `.first()` makes both branches behave the same way.
      const locator = page.locator(`xpath=${selector}`).first();
      await locator.waitFor({ state: 'attached', timeout });
      return await locator.elementHandle();
    }

    return await page.waitForSelector(selector, { state: 'attached', timeout });
  } catch (error) {
    // Best-effort lookup: a missing element is an expected outcome here.
    return null;
  }
};
|
||||
|
||||
// Enhanced button finder with retry mechanism
|
||||
const findWorkingButton = async (selectors: string[]): Promise<{
|
||||
button: ElementHandle | null,
|
||||
const findWorkingButton = async (selectors: string[]): Promise<{
|
||||
button: ElementHandle | null,
|
||||
workingSelector: string | null,
|
||||
updatedSelectors: string[]
|
||||
}> => {
|
||||
let updatedSelectors = [...selectors];
|
||||
|
||||
const startTime = Date.now();
|
||||
const MAX_BUTTON_SEARCH_TIME = 15000;
|
||||
let updatedSelectors = [...selectors];
|
||||
|
||||
for (let i = 0; i < selectors.length; i++) {
|
||||
if (Date.now() - startTime > MAX_BUTTON_SEARCH_TIME) {
|
||||
debugLog(`Button search timeout reached (${MAX_BUTTON_SEARCH_TIME}ms), aborting`);
|
||||
break;
|
||||
}
|
||||
const selector = selectors[i];
|
||||
let retryCount = 0;
|
||||
let selectorSuccess = false;
|
||||
|
||||
while (retryCount < MAX_RETRIES && !selectorSuccess) {
|
||||
try {
|
||||
const button = await page.waitForSelector(selector, {
|
||||
state: 'attached',
|
||||
timeout: 10000
|
||||
});
|
||||
|
||||
const button = await waitForSelectorUniversal(selector, { timeout: 2000 });
|
||||
|
||||
if (button) {
|
||||
debugLog('Found working selector:', selector);
|
||||
return {
|
||||
button,
|
||||
return {
|
||||
button,
|
||||
workingSelector: selector,
|
||||
updatedSelectors
|
||||
updatedSelectors
|
||||
};
|
||||
} else {
|
||||
retryCount++;
|
||||
debugLog(`Selector "${selector}" not found: attempt ${retryCount}/${MAX_RETRIES}`);
|
||||
|
||||
if (retryCount < MAX_RETRIES) {
|
||||
await page.waitForTimeout(RETRY_DELAY);
|
||||
} else {
|
||||
debugLog(`Removing failed selector "${selector}" after ${MAX_RETRIES} attempts`);
|
||||
updatedSelectors = updatedSelectors.filter(s => s !== selector);
|
||||
selectorSuccess = true;
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
retryCount++;
|
||||
debugLog(`Selector "${selector}" failed: attempt ${retryCount}/${MAX_RETRIES}`);
|
||||
|
||||
debugLog(`Selector "${selector}" error: attempt ${retryCount}/${MAX_RETRIES} - ${error.message}`);
|
||||
|
||||
if (retryCount < MAX_RETRIES) {
|
||||
await page.waitForTimeout(RETRY_DELAY);
|
||||
} else {
|
||||
debugLog(`Removing failed selector "${selector}" after ${MAX_RETRIES} attempts`);
|
||||
updatedSelectors = updatedSelectors.filter(s => s !== selector);
|
||||
selectorSuccess = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1347,9 +1424,35 @@ export default class Interpreter extends EventEmitter {
|
||||
}
|
||||
|
||||
private async ensureScriptsLoaded(page: Page) {
|
||||
const isScriptLoaded = await page.evaluate(() => typeof window.scrape === 'function' && typeof window.scrapeSchema === 'function' && typeof window.scrapeList === 'function' && typeof window.scrapeListAuto === 'function' && typeof window.scrollDown === 'function' && typeof window.scrollUp === 'function');
|
||||
if (!isScriptLoaded) {
|
||||
await page.addInitScript({ path: path.join(__dirname, 'browserSide', 'scraper.js') });
|
||||
try {
|
||||
const evaluationPromise = page.evaluate(() =>
|
||||
typeof window.scrape === 'function' &&
|
||||
typeof window.scrapeSchema === 'function' &&
|
||||
typeof window.scrapeList === 'function' &&
|
||||
typeof window.scrapeListAuto === 'function' &&
|
||||
typeof window.scrollDown === 'function' &&
|
||||
typeof window.scrollUp === 'function'
|
||||
);
|
||||
|
||||
const timeoutPromise = new Promise<boolean>((_, reject) =>
|
||||
setTimeout(() => reject(new Error('Script check timeout')), 3000)
|
||||
);
|
||||
|
||||
const isScriptLoaded = await Promise.race([
|
||||
evaluationPromise,
|
||||
timeoutPromise
|
||||
]);
|
||||
|
||||
if (!isScriptLoaded) {
|
||||
await page.addInitScript({ path: path.join(__dirname, 'browserSide', 'scraper.js') });
|
||||
}
|
||||
} catch (error) {
|
||||
this.log(`Script check failed, adding script anyway: ${error.message}`, Level.WARN);
|
||||
try {
|
||||
await page.addInitScript({ path: path.join(__dirname, 'browserSide', 'scraper.js') });
|
||||
} catch (scriptError) {
|
||||
this.log(`Failed to add script: ${scriptError.message}`, Level.ERROR);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -710,8 +710,8 @@ async function executeRun(id: string, userId: string) {
|
||||
retries: 5,
|
||||
};
|
||||
|
||||
processAirtableUpdates();
|
||||
processGoogleSheetUpdates();
|
||||
processAirtableUpdates().catch(err => logger.log('error', `Airtable update error: ${err.message}`));
|
||||
processGoogleSheetUpdates().catch(err => logger.log('error', `Google Sheets update error: ${err.message}`));
|
||||
} catch (err: any) {
|
||||
logger.log('error', `Failed to update Google Sheet for run: ${plainRun.runId}: ${err.message}`);
|
||||
}
|
||||
|
||||
@@ -77,7 +77,11 @@ export const createRemoteBrowserForRun = (userId: string): string => {
|
||||
|
||||
logger.log('info', `createRemoteBrowserForRun: Reserved slot ${id} for user ${userId}`);
|
||||
|
||||
initializeBrowserAsync(id, userId);
|
||||
initializeBrowserAsync(id, userId)
|
||||
.catch((error: any) => {
|
||||
logger.log('error', `Unhandled error in initializeBrowserAsync for browser ${id}: ${error.message}`);
|
||||
browserPool.failBrowserSlot(id);
|
||||
});
|
||||
|
||||
return id;
|
||||
};
|
||||
@@ -110,7 +114,16 @@ export const destroyRemoteBrowser = async (id: string, userId: string): Promise<
|
||||
} catch (switchOffError) {
|
||||
logger.log('warn', `Error switching off browser ${id}: ${switchOffError}`);
|
||||
}
|
||||
|
||||
|
||||
try {
|
||||
const namespace = io.of(id);
|
||||
namespace.removeAllListeners();
|
||||
namespace.disconnectSockets(true);
|
||||
logger.log('debug', `Cleaned up socket namespace for browser ${id}`);
|
||||
} catch (namespaceCleanupError: any) {
|
||||
logger.log('warn', `Error cleaning up socket namespace for browser ${id}: ${namespaceCleanupError.message}`);
|
||||
}
|
||||
|
||||
return browserPool.deleteRemoteBrowser(id);
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
@@ -273,11 +286,27 @@ const initializeBrowserAsync = async (id: string, userId: string) => {
|
||||
}
|
||||
|
||||
logger.log('debug', `Starting browser initialization for ${id}`);
|
||||
await browserSession.initialize(userId);
|
||||
logger.log('debug', `Browser initialization completed for ${id}`);
|
||||
|
||||
|
||||
try {
|
||||
await browserSession.initialize(userId);
|
||||
logger.log('debug', `Browser initialization completed for ${id}`);
|
||||
} catch (initError: any) {
|
||||
try {
|
||||
await browserSession.switchOff();
|
||||
logger.log('info', `Cleaned up failed browser initialization for ${id}`);
|
||||
} catch (cleanupError: any) {
|
||||
logger.log('error', `Failed to cleanup browser ${id}: ${cleanupError.message}`);
|
||||
}
|
||||
throw initError;
|
||||
}
|
||||
|
||||
const upgraded = browserPool.upgradeBrowserSlot(id, browserSession);
|
||||
if (!upgraded) {
|
||||
try {
|
||||
await browserSession.switchOff();
|
||||
} catch (cleanupError: any) {
|
||||
logger.log('error', `Failed to cleanup browser after slot upgrade failure: ${cleanupError.message}`);
|
||||
}
|
||||
throw new Error('Failed to upgrade reserved browser slot');
|
||||
}
|
||||
|
||||
|
||||
@@ -102,8 +102,8 @@ async function triggerIntegrationUpdates(runId: string, robotMetaId: string): Pr
|
||||
retries: 5,
|
||||
};
|
||||
|
||||
processAirtableUpdates();
|
||||
processGoogleSheetUpdates();
|
||||
processAirtableUpdates().catch(err => logger.log('error', `Airtable update error: ${err.message}`));
|
||||
processGoogleSheetUpdates().catch(err => logger.log('error', `Google Sheets update error: ${err.message}`));
|
||||
} catch (err: any) {
|
||||
logger.log('error', `Failed to update integrations for run: ${runId}: ${err.message}`);
|
||||
}
|
||||
|
||||
@@ -37,6 +37,12 @@ const pool = new Pool({
|
||||
database: process.env.DB_NAME,
|
||||
password: process.env.DB_PASSWORD,
|
||||
port: process.env.DB_PORT ? parseInt(process.env.DB_PORT, 10) : undefined,
|
||||
max: 50,
|
||||
min: 5,
|
||||
idleTimeoutMillis: 30000,
|
||||
connectionTimeoutMillis: 10000,
|
||||
maxUses: 7500,
|
||||
allowExitOnIdle: true
|
||||
});
|
||||
|
||||
const PgSession = connectPgSimple(session);
|
||||
@@ -215,6 +221,22 @@ if (require.main === module) {
|
||||
});
|
||||
}
|
||||
|
||||
// Log promise rejections that no handler caught.
// The promise object itself is not logged: interpolating it into a string
// always yields "[object Promise]". For Error reasons the stack trace is
// logged (falling back to the message); other reasons are stringified.
process.on('unhandledRejection', (reason) => {
  const detail = reason instanceof Error
    ? (reason.stack || reason.message)
    : String(reason);

  logger.log('error', `Unhandled promise rejection: ${detail}`);
  console.error('Unhandled promise rejection:', reason);
});
|
||||
|
||||
// Log uncaught exceptions. When NODE_ENV is 'production' the process exits
// with code 1 five seconds after logging; otherwise it keeps running.
process.on('uncaughtException', (error) => {
  const { message, stack } = error;
  logger.log('error', `Uncaught exception: ${message}`, { stack });
  console.error('Uncaught exception:', error);

  if (process.env.NODE_ENV !== 'production') {
    return;
  }

  setTimeout(() => {
    process.exit(1);
  }, 5000);
});
|
||||
|
||||
if (require.main === module) {
|
||||
process.on('SIGINT', async () => {
|
||||
console.log('Main app shutting down...');
|
||||
|
||||
@@ -118,6 +118,23 @@ export class WorkflowInterpreter {
|
||||
*/
|
||||
private currentRunId: string | null = null;
|
||||
|
||||
/**
|
||||
* Batched persistence system for performance optimization
|
||||
*/
|
||||
private persistenceBuffer: Array<{
|
||||
actionType: string;
|
||||
data: any;
|
||||
listIndex?: number;
|
||||
timestamp: number;
|
||||
creditValidated: boolean;
|
||||
}> = [];
|
||||
|
||||
private persistenceTimer: NodeJS.Timeout | null = null;
|
||||
private readonly BATCH_SIZE = 5;
|
||||
private readonly BATCH_TIMEOUT = 3000;
|
||||
private persistenceInProgress = false;
|
||||
private persistenceRetryCount = 0;
|
||||
|
||||
/**
|
||||
* An array of id's of the pairs from the workflow that are about to be paused.
|
||||
* As "breakpoints".
|
||||
@@ -303,13 +320,39 @@ export class WorkflowInterpreter {
|
||||
|
||||
await this.interpreter.stop();
|
||||
this.socket.emit('log', '----- The interpretation has been stopped -----', false);
|
||||
this.clearState();
|
||||
await this.clearState();
|
||||
} else {
|
||||
logger.log('error', 'Cannot stop: No active interpretation.');
|
||||
}
|
||||
};
|
||||
|
||||
private clearState = () => {
|
||||
public clearState = async (): Promise<void> => {
|
||||
if (this.persistenceBuffer.length > 0) {
|
||||
try {
|
||||
await this.flushPersistenceBuffer();
|
||||
logger.log('debug', 'Successfully flushed final persistence buffer during cleanup');
|
||||
} catch (error: any) {
|
||||
logger.log('error', `Failed to flush final persistence buffer: ${error.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
if (this.persistenceTimer) {
|
||||
clearTimeout(this.persistenceTimer);
|
||||
this.persistenceTimer = null;
|
||||
}
|
||||
|
||||
if (this.interpreter) {
|
||||
try {
|
||||
if (!this.interpreter.getIsAborted()) {
|
||||
this.interpreter.abort();
|
||||
}
|
||||
await this.interpreter.stop();
|
||||
logger.log('debug', 'mx-cloud interpreter properly stopped during cleanup');
|
||||
} catch (error: any) {
|
||||
logger.log('warn', `Error stopping mx-cloud interpreter during cleanup: ${error.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
this.debugMessages = [];
|
||||
this.interpretationIsPaused = false;
|
||||
this.activeId = null;
|
||||
@@ -324,6 +367,9 @@ export class WorkflowInterpreter {
|
||||
this.binaryData = [];
|
||||
this.currentScrapeListIndex = 0;
|
||||
this.currentRunId = null;
|
||||
this.persistenceBuffer = [];
|
||||
this.persistenceInProgress = false;
|
||||
this.persistenceRetryCount = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -336,61 +382,22 @@ export class WorkflowInterpreter {
|
||||
};
|
||||
|
||||
/**
|
||||
* Persists data to database in real-time during interpretation
|
||||
* Persists extracted data to database with intelligent batching for performance
|
||||
* Falls back to immediate persistence for critical operations
|
||||
* @private
|
||||
*/
|
||||
private persistDataToDatabase = async (actionType: string, data: any, listIndex?: number): Promise<void> => {
|
||||
if (!this.currentRunId) {
|
||||
logger.log('debug', 'No run ID available for real-time persistence');
|
||||
logger.log('debug', 'No run ID available for persistence');
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const run = await Run.findOne({ where: { runId: this.currentRunId } });
|
||||
|
||||
if (!run) {
|
||||
logger.log('warn', `Run not found for real-time persistence: ${this.currentRunId}`);
|
||||
return;
|
||||
}
|
||||
|
||||
const currentSerializableOutput = run.serializableOutput ?
|
||||
JSON.parse(JSON.stringify(run.serializableOutput)) :
|
||||
{ scrapeSchema: [], scrapeList: [] };
|
||||
this.addToPersistenceBatch(actionType, data, listIndex, true);
|
||||
|
||||
if (actionType === 'scrapeSchema') {
|
||||
const newSchemaData = Array.isArray(data) ? data : [data];
|
||||
const updatedOutput = {
|
||||
...currentSerializableOutput,
|
||||
scrapeSchema: newSchemaData
|
||||
};
|
||||
|
||||
await run.update({
|
||||
serializableOutput: updatedOutput
|
||||
});
|
||||
|
||||
logger.log('debug', `Persisted scrapeSchema data for run ${this.currentRunId}: ${newSchemaData.length} items`);
|
||||
|
||||
} else if (actionType === 'scrapeList' && typeof listIndex === 'number') {
|
||||
if (!Array.isArray(currentSerializableOutput.scrapeList)) {
|
||||
currentSerializableOutput.scrapeList = [];
|
||||
}
|
||||
|
||||
const updatedList = [...currentSerializableOutput.scrapeList];
|
||||
updatedList[listIndex] = data;
|
||||
|
||||
const updatedOutput = {
|
||||
...currentSerializableOutput,
|
||||
scrapeList: updatedList
|
||||
};
|
||||
|
||||
await run.update({
|
||||
serializableOutput: updatedOutput
|
||||
});
|
||||
|
||||
logger.log('debug', `Persisted scrapeList data for run ${this.currentRunId} at index ${listIndex}: ${Array.isArray(data) ? data.length : 'N/A'} items`);
|
||||
}
|
||||
} catch (error: any) {
|
||||
logger.log('error', `Failed to persist data in real-time for run ${this.currentRunId}: ${error.message}`);
|
||||
if (actionType === 'scrapeSchema' || this.persistenceBuffer.length >= this.BATCH_SIZE) {
|
||||
await this.flushPersistenceBuffer();
|
||||
} else {
|
||||
this.scheduleBatchFlush();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -548,7 +555,6 @@ export class WorkflowInterpreter {
|
||||
}
|
||||
|
||||
logger.log('debug', `Interpretation finished`);
|
||||
this.clearState();
|
||||
return result;
|
||||
}
|
||||
|
||||
@@ -569,4 +575,121 @@ export class WorkflowInterpreter {
|
||||
this.socket = socket;
|
||||
this.subscribeToPausing();
|
||||
};
|
||||
|
||||
/**
 * Queues one extraction result in the persistence buffer and logs the new
 * buffer size. Nothing is written to the database here.
 *
 * @param actionType      Kind of action that produced the data.
 * @param data            Payload to persist.
 * @param listIndex       Optional index associated with the payload.
 * @param creditValidated Flag stored alongside the entry; defaults to `false`.
 * @private
 */
private addToPersistenceBatch(actionType: string, data: any, listIndex?: number, creditValidated: boolean = false): void {
  const entry = {
    actionType,
    data,
    listIndex,
    timestamp: Date.now(),
    creditValidated
  };
  this.persistenceBuffer.push(entry);

  const queued = this.persistenceBuffer.length;
  logger.log('debug', `Added ${actionType} to persistence buffer (${queued} items)`);
}
|
||||
|
||||
/**
 * Arms a one-shot timer that flushes the persistence buffer after
 * `BATCH_TIMEOUT` milliseconds. Does nothing if a timer is already armed or a
 * flush is currently in progress.
 * @private
 */
private scheduleBatchFlush(): void {
  if (this.persistenceTimer || this.persistenceInProgress) {
    return;
  }

  this.persistenceTimer = setTimeout(async () => {
    // The timer has fired, so its handle is stale. Drop it before flushing:
    // if the flush returns early, a leftover handle would make every later
    // call to this method think a flush is still scheduled.
    this.persistenceTimer = null;
    await this.flushPersistenceBuffer();
  }, this.BATCH_TIMEOUT);
}
|
||||
|
||||
/**
 * Writes every buffered item to the run's `serializableOutput` inside a single
 * database transaction.
 *
 * Returns immediately when the buffer is empty, when another flush is in
 * progress, or when no run id is set. Otherwise the buffer is drained and its
 * items are applied in order: a `scrapeSchema` item replaces the stored
 * schema output, a `scrapeList` item with a numeric `listIndex` is stored at
 * that index. Other items are ignored.
 *
 * On failure the drained items are put back at the front of the buffer and a
 * retry is scheduled with exponential backoff (capped at 30 s). After three
 * consecutive failed attempts the batch is dropped and the retry counter is
 * reset. Errors are logged, never thrown.
 * @private
 */
private async flushPersistenceBuffer(): Promise<void> {
  // Snapshot the run id once. `currentRunId` can be reset to null elsewhere in
  // the class while this method is suspended at an await; re-reading it later
  // would make the lookup below search for a null run id and lose the batch.
  const runId = this.currentRunId;

  if (this.persistenceBuffer.length === 0 || this.persistenceInProgress || !runId) {
    return;
  }

  if (this.persistenceTimer) {
    clearTimeout(this.persistenceTimer);
    this.persistenceTimer = null;
  }

  this.persistenceInProgress = true;
  const batchToProcess = [...this.persistenceBuffer];
  this.persistenceBuffer = [];

  try {
    const sequelize = require('../../storage/db').default;
    await sequelize.transaction(async (transaction: any) => {
      const { Run } = require('../../models');
      const run = await Run.findOne({
        where: { runId },
        transaction
      });

      if (!run) {
        logger.log('warn', `Run not found for batched persistence: ${runId}`);
        return;
      }

      // Work on a deep copy so the model's current value is not mutated in place.
      const currentSerializableOutput = run.serializableOutput ?
        JSON.parse(JSON.stringify(run.serializableOutput)) :
        { scrapeSchema: [], scrapeList: [] };

      let hasUpdates = false;

      for (const item of batchToProcess) {
        if (item.actionType === 'scrapeSchema') {
          const newSchemaData = Array.isArray(item.data) ? item.data : [item.data];
          currentSerializableOutput.scrapeSchema = newSchemaData;
          hasUpdates = true;
        } else if (item.actionType === 'scrapeList' && typeof item.listIndex === 'number') {
          if (!Array.isArray(currentSerializableOutput.scrapeList)) {
            currentSerializableOutput.scrapeList = [];
          }
          currentSerializableOutput.scrapeList[item.listIndex] = item.data;
          hasUpdates = true;
        }
      }

      if (hasUpdates) {
        await run.update({
          serializableOutput: currentSerializableOutput
        }, { transaction });

        logger.log('debug', `Batched persistence: Updated run ${runId} with ${batchToProcess.length} items`);
      }
    });

    this.persistenceRetryCount = 0;
  } catch (error: any) {
    logger.log('error', `Failed to flush persistence buffer for run ${runId}: ${error.message}`);

    if (this.persistenceRetryCount < 3) {
      // Put the failed batch back in front so ordering is preserved on retry.
      this.persistenceBuffer.unshift(...batchToProcess);
      this.persistenceRetryCount++;

      const backoffDelay = Math.min(5000 * Math.pow(2, this.persistenceRetryCount), 30000);
      setTimeout(async () => {
        await this.flushPersistenceBuffer();
      }, backoffDelay);

      logger.log('warn', `Scheduling persistence retry ${this.persistenceRetryCount}/3 in ${backoffDelay}ms`);
    } else {
      logger.log('error', `Max persistence retries exceeded for run ${runId}, dropping ${batchToProcess.length} items`);
      this.persistenceRetryCount = 0;
    }
  } finally {
    this.persistenceInProgress = false;
  }
}
|
||||
}
|
||||
|
||||
@@ -454,33 +454,36 @@ function isValidUrl(str: string): boolean {
|
||||
}
|
||||
|
||||
export const processAirtableUpdates = async () => {
|
||||
while (true) {
|
||||
const maxProcessingTime = 60000;
|
||||
const startTime = Date.now();
|
||||
|
||||
while (Date.now() - startTime < maxProcessingTime) {
|
||||
let hasPendingTasks = false;
|
||||
|
||||
|
||||
for (const runId in airtableUpdateTasks) {
|
||||
const task = airtableUpdateTasks[runId];
|
||||
|
||||
|
||||
if (task.status === 'pending') {
|
||||
hasPendingTasks = true;
|
||||
console.log(`Processing Airtable update for run: ${runId}`);
|
||||
|
||||
|
||||
try {
|
||||
await updateAirtable(task.robotId, task.runId);
|
||||
console.log(`Successfully updated Airtable for runId: ${runId}`);
|
||||
airtableUpdateTasks[runId].status = 'completed';
|
||||
delete airtableUpdateTasks[runId];
|
||||
delete airtableUpdateTasks[runId];
|
||||
} catch (error: any) {
|
||||
console.error(`Failed to update Airtable for run ${task.runId}:`, error);
|
||||
|
||||
|
||||
if (task.retries < MAX_RETRIES) {
|
||||
airtableUpdateTasks[runId].retries += 1;
|
||||
console.log(`Retrying task for runId: ${runId}, attempt: ${task.retries + 1}`);
|
||||
} else {
|
||||
airtableUpdateTasks[runId].status = 'failed';
|
||||
console.log(`Max retries reached for runId: ${runId}. Marking task as failed.`);
|
||||
logger.log('error', `Permanent failure for run ${runId}: ${error.message}`);
|
||||
console.log(`Max retries reached for runId: ${runId}. Removing task.`);
|
||||
delete airtableUpdateTasks[runId];
|
||||
}
|
||||
}
|
||||
} else if (task.status === 'completed' || task.status === 'failed') {
|
||||
delete airtableUpdateTasks[runId];
|
||||
}
|
||||
}
|
||||
|
||||
@@ -488,8 +491,10 @@ export const processAirtableUpdates = async () => {
|
||||
console.log('No pending Airtable update tasks, exiting processor');
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
console.log('Waiting for 5 seconds before checking again...');
|
||||
await new Promise(resolve => setTimeout(resolve, 5000));
|
||||
}
|
||||
|
||||
console.log('Airtable processing completed or timed out');
|
||||
};
|
||||
@@ -286,8 +286,12 @@ export async function writeDataToSheet(
|
||||
}
|
||||
|
||||
export const processGoogleSheetUpdates = async () => {
|
||||
while (true) {
|
||||
const maxProcessingTime = 60000;
|
||||
const startTime = Date.now();
|
||||
|
||||
while (Date.now() - startTime < maxProcessingTime) {
|
||||
let hasPendingTasks = false;
|
||||
|
||||
for (const runId in googleSheetUpdateTasks) {
|
||||
const task = googleSheetUpdateTasks[runId];
|
||||
console.log(`Processing task for runId: ${runId}, status: ${task.status}`);
|
||||
@@ -297,7 +301,6 @@ export const processGoogleSheetUpdates = async () => {
|
||||
try {
|
||||
await updateGoogleSheet(task.robotId, task.runId);
|
||||
console.log(`Successfully updated Google Sheet for runId: ${runId}`);
|
||||
googleSheetUpdateTasks[runId].status = 'completed';
|
||||
delete googleSheetUpdateTasks[runId];
|
||||
} catch (error: any) {
|
||||
console.error(`Failed to update Google Sheets for run ${task.runId}:`, error);
|
||||
@@ -305,10 +308,12 @@ export const processGoogleSheetUpdates = async () => {
|
||||
googleSheetUpdateTasks[runId].retries += 1;
|
||||
console.log(`Retrying task for runId: ${runId}, attempt: ${task.retries}`);
|
||||
} else {
|
||||
googleSheetUpdateTasks[runId].status = 'failed';
|
||||
console.log(`Max retries reached for runId: ${runId}. Marking task as failed.`);
|
||||
console.log(`Max retries reached for runId: ${runId}. Removing task.`);
|
||||
delete googleSheetUpdateTasks[runId];
|
||||
}
|
||||
}
|
||||
} else if (task.status === 'completed' || task.status === 'failed') {
|
||||
delete googleSheetUpdateTasks[runId];
|
||||
}
|
||||
}
|
||||
|
||||
@@ -320,4 +325,6 @@ export const processGoogleSheetUpdates = async () => {
|
||||
console.log('Waiting for 5 seconds before checking again...');
|
||||
await new Promise(resolve => setTimeout(resolve, 5000));
|
||||
}
|
||||
|
||||
console.log('Google Sheets processing completed or timed out');
|
||||
};
|
||||
@@ -277,8 +277,8 @@ async function executeRun(id: string, userId: string) {
|
||||
retries: 5,
|
||||
};
|
||||
|
||||
processAirtableUpdates();
|
||||
processGoogleSheetUpdates();
|
||||
processAirtableUpdates().catch(err => logger.log('error', `Airtable update error: ${err.message}`));
|
||||
processGoogleSheetUpdates().catch(err => logger.log('error', `Google Sheets update error: ${err.message}`));
|
||||
} catch (err: any) {
|
||||
logger.log('error', `Failed to update Google Sheet for run: ${plainRun.runId}: ${err.message}`);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user