Merge pull request #774 from getmaxun/extract-reliable
feat(maxun-core): extraction and platform stability
@@ -64,6 +64,8 @@ export default class Interpreter extends EventEmitter {
private concurrency: Concurrency;

private stopper: Function | null = null;

private isAborted: boolean = false;

private log: typeof log;
@@ -114,6 +116,13 @@ export default class Interpreter extends EventEmitter {
})
}

/**
* Sets the abort flag to immediately stop all operations
*/
public abort(): void {
this.isAborted = true;
}

private async applyAdBlocker(page: Page): Promise<void> {
if (this.blocker) {
try {
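Note: the new abort() is the core of this PR's immediate-stop behavior. It implements cooperative cancellation: rather than killing the browser, it flips a flag that every long-running loop polls between units of work. A minimal standalone sketch of the pattern (the class and step names here are illustrative, not Maxun's actual API):

    // Cooperative cancellation: loops poll a flag instead of being killed.
    class Worker {
      private isAborted = false;

      // Called externally (e.g. from a user-triggered abort request).
      public abort(): void {
        this.isAborted = true;
      }

      // A long-running task checks the flag between steps and exits cleanly,
      // leaving any partial results intact.
      public async run(steps: Array<() => Promise<void>>): Promise<void> {
        for (const step of steps) {
          if (this.isAborted) return;
          await step();
        }
      }
    }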
@@ -372,6 +381,11 @@ export default class Interpreter extends EventEmitter {
* @param steps Array of actions.
*/
private async carryOutSteps(page: Page, steps: What[]): Promise<void> {
if (this.isAborted) {
this.log('Workflow aborted, stopping execution', Level.WARN);
return;
}

/**
* Defines overloaded (or added) methods/actions usable in the workflow.
* If a method overloads any existing method of the Page class, it accepts the same set
@@ -433,6 +447,11 @@ export default class Interpreter extends EventEmitter {
},

scrapeSchema: async (schema: Record<string, { selector: string; tag: string, attribute: string; shadow: string}>) => {
if (this.isAborted) {
this.log('Workflow aborted, stopping scrapeSchema', Level.WARN);
return;
}

if (this.options.debugChannel?.setActionType) {
this.options.debugChannel.setActionType('scrapeSchema');
}
@@ -468,6 +487,11 @@ export default class Interpreter extends EventEmitter {
},

scrapeList: async (config: { listSelector: string, fields: any, limit?: number, pagination: any }) => {
if (this.isAborted) {
this.log('Workflow aborted, stopping scrapeList', Level.WARN);
return;
}

if (this.options.debugChannel?.setActionType) {
this.options.debugChannel.setActionType('scrapeList');
}
@@ -622,6 +646,11 @@ export default class Interpreter extends EventEmitter {
limit?: number,
pagination: any
}) {
if (this.isAborted) {
this.log('Workflow aborted, stopping pagination', Level.WARN);
return [];
}

let allResults: Record<string, any>[] = [];
let previousHeight = 0;
let scrapedItems: Set<string> = new Set<string>();
@@ -635,6 +664,12 @@ export default class Interpreter extends EventEmitter {
};

const scrapeCurrentPage = async () => {
// Check abort flag before scraping current page
if (this.isAborted) {
debugLog("Workflow aborted, stopping scrapeCurrentPage");
return;
}

const results = await page.evaluate((cfg) => window.scrapeList(cfg), config);
const newResults = results.filter(item => {
const uniqueKey = JSON.stringify(item);
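Note: scrapeCurrentPage deduplicates items across pagination passes by keying each scraped record on its JSON serialization. A reduced sketch of that filter (assuming records are plain JSON-serializable objects):

    // Keep only records not already collected on a previous page.
    const scrapedItems = new Set<string>();

    function filterNew(results: Record<string, unknown>[]): Record<string, unknown>[] {
      return results.filter(item => {
        const uniqueKey = JSON.stringify(item); // sensitive to key order
        if (scrapedItems.has(uniqueKey)) return false;
        scrapedItems.add(uniqueKey);
        return true;
      });
    }

One caveat of this design: JSON.stringify is key-order sensitive, so two objects with identical fields in a different order would count as distinct items.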
@@ -723,7 +758,12 @@ export default class Interpreter extends EventEmitter {
let unchangedResultCounter = 0;

try {
while (true) {
while (true) {
if (this.isAborted) {
this.log('Workflow aborted during pagination loop', Level.WARN);
return allResults;
}

switch (config.pagination.type) {
case 'scrollDown': {
let previousResultCount = allResults.length;
@@ -981,6 +1021,11 @@ export default class Interpreter extends EventEmitter {
// const MAX_NO_NEW_ITEMS = 2;

while (true) {
if (this.isAborted) {
this.log('Workflow aborted during pagination loop', Level.WARN);
return allResults;
}

// Find working button with retry mechanism
const { button: loadMoreButton, workingSelector, updatedSelectors } = await findWorkingButton(availableSelectors);
@@ -1144,6 +1189,11 @@ export default class Interpreter extends EventEmitter {
}

private async runLoop(p: Page, workflow: Workflow) {
if (this.isAborted) {
this.log('Workflow aborted in runLoop', Level.WARN);
return;
}

let workflowCopy: Workflow = JSON.parse(JSON.stringify(workflow));

workflowCopy = this.removeSpecialSelectors(workflowCopy);
@@ -1174,6 +1224,11 @@ export default class Interpreter extends EventEmitter {
const MAX_LOOP_ITERATIONS = 1000; // Circuit breaker

while (true) {
if (this.isAborted) {
this.log('Workflow aborted during step execution', Level.WARN);
return;
}

// Circuit breaker to prevent infinite loops
if (++loopIterations > MAX_LOOP_ITERATIONS) {
this.log('Maximum loop iterations reached, terminating to prevent infinite loop', Level.ERROR);
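Note: the MAX_LOOP_ITERATIONS guard is a plain circuit breaker: a hard ceiling on loop iterations so a workflow whose exit condition never fires cannot spin forever. The shape in isolation:

    const MAX_LOOP_ITERATIONS = 1000; // circuit breaker
    let loopIterations = 0;

    while (true) {
      if (++loopIterations > MAX_LOOP_ITERATIONS) {
        console.error('Maximum loop iterations reached, terminating to prevent infinite loop');
        break;
      }
      // ... one unit of workflow processing; break on normal completion ...
    }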
@@ -1256,6 +1311,11 @@ export default class Interpreter extends EventEmitter {
}
lastAction = action;

if (this.isAborted) {
this.log('Workflow aborted before action execution', Level.WARN);
return;
}

try {
console.log("Carrying out:", action.what);
await this.carryOutSteps(p, action.what);
@@ -597,65 +597,53 @@ async function executeRun(id: string, userId: string) {
}

const workflow = AddGeneratedFlags(recording.recording);

browser.interpreter.setRunId(id);

const interpretationInfo = await browser.interpreter.InterpretRecording(
workflow, currentPage, (newPage: Page) => currentPage = newPage, plainRun.interpreterSettings
);

const binaryOutputService = new BinaryOutputService('maxun-run-screenshots');
const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput);

const categorizedOutput = {
scrapeSchema: interpretationInfo.scrapeSchemaOutput || {},
scrapeList: interpretationInfo.scrapeListOutput || {},
};

await destroyRemoteBrowser(plainRun.browserId, userId);

const updatedRun = await run.update({
...run,
status: 'success',
finishedAt: new Date().toLocaleString(),
browserId: plainRun.browserId,
log: interpretationInfo.log.join('\n'),
serializableOutput: {
scrapeSchema: Object.values(categorizedOutput.scrapeSchema),
scrapeList: Object.values(categorizedOutput.scrapeList),
},
binaryOutput: uploadedBinaryOutput,
});

let totalSchemaItemsExtracted = 0;
let totalListItemsExtracted = 0;
let extractedScreenshotsCount = 0;

if (categorizedOutput.scrapeSchema) {
Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => {
if (Array.isArray(schemaResult)) {
totalSchemaItemsExtracted += schemaResult.length;
} else if (schemaResult && typeof schemaResult === 'object') {
totalSchemaItemsExtracted += 1;
}
});
}

if (categorizedOutput.scrapeList) {
Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => {
if (Array.isArray(listResult)) {
totalListItemsExtracted += listResult.length;
}
});
}

if (uploadedBinaryOutput) {
extractedScreenshotsCount = Object.keys(uploadedBinaryOutput).length;
}

const totalRowsExtracted = totalSchemaItemsExtracted + totalListItemsExtracted;

console.log(`Extracted Schema Items Count: ${totalSchemaItemsExtracted}`);
console.log(`Extracted List Items Count: ${totalListItemsExtracted}`);
console.log(`Extracted Screenshots Count: ${extractedScreenshotsCount}`);
console.log(`Total Rows Extracted: ${totalRowsExtracted}`);
let totalSchemaItemsExtracted = 0;
let totalListItemsExtracted = 0;
let extractedScreenshotsCount = 0;

const finalRun = await Run.findOne({ where: { runId: id } });
if (finalRun) {
if (finalRun.serializableOutput) {
if (finalRun.serializableOutput.scrapeSchema) {
Object.values(finalRun.serializableOutput.scrapeSchema).forEach((schemaResult: any) => {
if (Array.isArray(schemaResult)) {
totalSchemaItemsExtracted += schemaResult.length;
} else if (schemaResult && typeof schemaResult === 'object') {
totalSchemaItemsExtracted += 1;
}
});
}

if (finalRun.serializableOutput.scrapeList) {
Object.values(finalRun.serializableOutput.scrapeList).forEach((listResult: any) => {
if (Array.isArray(listResult)) {
totalListItemsExtracted += listResult.length;
}
});
}
}

if (finalRun.binaryOutput) {
extractedScreenshotsCount = Object.keys(finalRun.binaryOutput).length;
}
}

const totalRowsExtracted = totalSchemaItemsExtracted + totalListItemsExtracted;

capture('maxun-oss-run-created-api',{
runId: id,
@@ -668,7 +656,6 @@ async function executeRun(id: string, userId: string) {
}
)

// Trigger webhooks for run completion
const webhookPayload = {
robot_id: plainRun.robotMetaId,
run_id: plainRun.runId,
@@ -677,8 +664,8 @@ async function executeRun(id: string, userId: string) {
started_at: plainRun.startedAt,
finished_at: new Date().toLocaleString(),
extracted_data: {
captured_texts: Object.values(categorizedOutput.scrapeSchema).flat() || [],
captured_lists: categorizedOutput.scrapeList,
captured_texts: finalRun?.serializableOutput?.scrapeSchema ? Object.values(finalRun.serializableOutput.scrapeSchema).flat() : [],
captured_lists: finalRun?.serializableOutput?.scrapeList || {},
total_rows: totalRowsExtracted,
captured_texts_count: totalSchemaItemsExtracted,
captured_lists_count: totalListItemsExtracted,
@@ -36,6 +36,14 @@ interface BrowserPoolInfo {
* Can be "reserved", "initializing", "ready" or "failed".
*/
status?: "reserved" | "initializing" | "ready" | "failed",
/**
* Timestamp when the browser slot was created/reserved
*/
createdAt?: number,
/**
* Timestamp when the browser was last accessed
*/
lastAccessed?: number,
}

/**
@@ -66,6 +74,12 @@ export class BrowserPool {
*/
private userToBrowserMap: Map<string, string[]> = new Map();

/**
* Locks for atomic operations to prevent race conditions
* Key format: "userId-state", Value: timestamp when lock was acquired
*/
private reservationLocks: Map<string, number> = new Map();

/**
* Adds a remote browser instance to the pool for a specific user.
* If the user already has two browsers, the oldest browser will be closed and replaced.
@@ -570,7 +584,7 @@
};

/**
* Reserves a browser slot immediately without creating the actual browser.
* Reserves a browser slot atomically to prevent race conditions.
* This ensures slot counting is accurate for rapid successive requests.
*
* @param id browser ID to reserve
@@ -578,31 +592,65 @@
* @param state browser state ("recording" or "run")
* @returns true if slot was reserved, false if user has reached limit
*/
public reserveBrowserSlot = (id: string, userId: string, state: BrowserState = "run"): boolean => {
// Check if user has available slots first
if (!this.hasAvailableBrowserSlots(userId, state)) {
logger.log('debug', `Cannot reserve slot for user ${userId}: no available slots`);
public reserveBrowserSlotAtomic = (id: string, userId: string, state: BrowserState = "run"): boolean => {
const lockKey = `${userId}-${state}`;

if (this.reservationLocks.has(lockKey)) {
logger.log('debug', `Reservation already in progress for user ${userId} state ${state}`);
return false;
}

try {
this.reservationLocks.set(lockKey, Date.now());

if (!this.hasAvailableBrowserSlots(userId, state)) {
logger.log('debug', `Cannot reserve slot for user ${userId}: no available slots`);
return false;
}

// Reserve the slot with null browser
this.pool[id] = {
browser: null,
active: false,
userId,
state,
status: "reserved"
};
if (this.pool[id]) {
logger.log('debug', `Browser slot ${id} already exists`);
return false;
}

// Update the user-to-browser mapping
let userBrowserIds = this.userToBrowserMap.get(userId) || [];
if (!userBrowserIds.includes(id)) {
userBrowserIds.push(id);
this.userToBrowserMap.set(userId, userBrowserIds);
const now = Date.now();

this.pool[id] = {
browser: null,
active: false,
userId,
state,
status: "reserved",
createdAt: now,
lastAccessed: now
};

const userBrowserIds = this.userToBrowserMap.get(userId) || [];
if (!userBrowserIds.includes(id)) {
userBrowserIds.push(id);
this.userToBrowserMap.set(userId, userBrowserIds);
}

logger.log('info', `Atomically reserved browser slot ${id} for user ${userId} in state ${state}`);
return true;

} catch (error: any) {
logger.log('error', `Error during atomic slot reservation: ${error.message}`);
if (this.pool[id] && this.pool[id].status === "reserved") {
this.deleteRemoteBrowser(id);
}
return false;
} finally {
this.reservationLocks.delete(lockKey);
}
};

logger.log('info', `Reserved browser slot ${id} for user ${userId} in state ${state}`);
return true;
/**
* Legacy method - kept for backwards compatibility but now uses atomic version
* @deprecated Use reserveBrowserSlotAtomic instead
*/
public reserveBrowserSlot = (id: string, userId: string, state: BrowserState = "run"): boolean => {
return this.reserveBrowserSlotAtomic(id, userId, state);
};
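Note: because Node runs this code on a single thread, a synchronous check-then-set guarded by a per-user lock key is enough to serialize rapid successive reservations, and the try/finally guarantees the lock is released even on an early return or a thrown error. The same shape condensed (simplified types, illustrative names):

    const locks = new Map<string, number>();
    const pool: Record<string, { status: string } | undefined> = {};

    function reserveSlot(id: string, userId: string, state: string): boolean {
      const lockKey = `${userId}-${state}`;
      if (locks.has(lockKey)) return false; // a reservation is already in flight

      try {
        locks.set(lockKey, Date.now());
        if (pool[id]) return false;        // slot ID already taken
        pool[id] = { status: 'reserved' }; // claim the slot synchronously
        return true;
      } finally {
        locks.delete(lockKey);             // always released
      }
    }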
/**
@@ -630,17 +678,89 @@ export class BrowserPool {
};

/**
* Marks a reserved slot as failed and removes it.
* Marks a reserved slot as failed and removes it with proper cleanup.
*
* @param id browser ID to mark as failed
*/
public failBrowserSlot = (id: string): void => {
if (this.pool[id]) {
logger.log('info', `Marking browser slot ${id} as failed`);

// Attempt to cleanup browser resources before deletion
const browserInfo = this.pool[id];
if (browserInfo.browser) {
try {
// Try to close browser if it exists
browserInfo.browser.switchOff?.().catch((error: any) => {
logger.log('warn', `Error closing failed browser ${id}: ${error.message}`);
});
} catch (error: any) {
logger.log('warn', `Error during browser cleanup for ${id}: ${error.message}`);
}
}

this.deleteRemoteBrowser(id);
}
};

/**
* Cleanup stale browser slots that have been in reserved/initializing state too long
* This prevents resource leaks from failed initializations
*/
public cleanupStaleBrowserSlots = (): void => {
const now = Date.now();
const staleThreshold = 5 * 60 * 1000; // 5 minutes

const staleSlots: string[] = [];

for (const [id, info] of Object.entries(this.pool)) {
const isStale = info.status === "reserved" || info.status === "initializing";
const createdAt = info.createdAt || 0;
const age = now - createdAt;

if (isStale && info.browser === null && age > staleThreshold) {
staleSlots.push(id);
}
}

staleSlots.forEach(id => {
const info = this.pool[id];
logger.log('warn', `Cleaning up stale browser slot ${id} with status ${info.status}, age: ${Math.round((now - (info.createdAt || 0)) / 1000)}s`);
this.failBrowserSlot(id);
});

if (staleSlots.length > 0) {
logger.log('info', `Cleaned up ${staleSlots.length} stale browser slots`);
}

this.cleanupStaleReservationLocks();
};

/**
* Cleans up reservation locks that are older than 1 minute
* This prevents locks from being held indefinitely due to crashes
*/
private cleanupStaleReservationLocks = (): void => {
const now = Date.now();
const lockTimeout = 60 * 1000; // 1 minute

const staleLocks: string[] = [];

for (const [lockKey, timestamp] of this.reservationLocks.entries()) {
if (now - timestamp > lockTimeout) {
staleLocks.push(lockKey);
}
}

staleLocks.forEach(lockKey => {
this.reservationLocks.delete(lockKey);
});

if (staleLocks.length > 0) {
logger.log('warn', `Cleaned up ${staleLocks.length} stale reservation locks`);
}
};
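Note: both sweeps are timestamp-driven reclamation passes: slots stuck in reserved/initializing for over 5 minutes and locks older than 1 minute are treated as dead. The PR only calls cleanupStaleBrowserSlots once at server startup; a periodic sweep would be the natural extension (hypothetical usage, not part of this diff):

    // Hypothetical: sweep on an interval in addition to the startup call.
    const SWEEP_INTERVAL_MS = 60 * 1000;
    setInterval(() => {
      browserPool.cleanupStaleBrowserSlots(); // also clears stale reservation locks
    }, SWEEP_INTERVAL_MS);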
/**
* Gets the current status of a browser slot.
*
@@ -653,4 +773,22 @@ export class BrowserPool {
}
return this.pool[id].status || null;
};

/**
* Returns all browser instances in the pool.
* Used for cleanup operations like graceful shutdown.
*
* @returns Map of browser IDs to browser instances
*/
public getAllBrowsers = (): Map<string, RemoteBrowser> => {
const browsers = new Map<string, RemoteBrowser>();

for (const [id, info] of Object.entries(this.pool)) {
if (info.browser) {
browsers.set(id, info.browser);
}
}

return browsers;
};
}
@@ -25,6 +25,7 @@ interface RunAttributes {
runByAPI?: boolean;
serializableOutput: Record<string, any[]>;
binaryOutput: Record<string, string>;
retryCount?: number;
}

interface RunCreationAttributes extends Optional<RunAttributes, 'id'> { }
@@ -46,6 +47,7 @@ class Run extends Model<RunAttributes, RunCreationAttributes> implements RunAttr
public runByAPI!: boolean;
public serializableOutput!: Record<string, any[]>;
public binaryOutput!: Record<string, any>;
public retryCount!: number;
}

Run.init(
@@ -120,6 +122,11 @@ Run.init(
allowNull: true,
defaultValue: {},
},
retryCount: {
type: DataTypes.INTEGER,
allowNull: true,
defaultValue: 0,
},
},
{
sequelize,
@@ -14,11 +14,9 @@ import Run from './models/Run';
import Robot from './models/Robot';
import { browserPool } from './server';
import { Page } from 'playwright';
import { BinaryOutputService } from './storage/mino';
import { capture } from './utils/analytics';
import { googleSheetUpdateTasks, processGoogleSheetUpdates } from './workflow-management/integrations/gsheet';
import { airtableUpdateTasks, processAirtableUpdates } from './workflow-management/integrations/airtable';
import { RemoteBrowser } from './browser-management/classes/RemoteBrowser';
import { io as serverIo } from "./server";
import { sendWebhook } from './routes/webhook';
@@ -85,107 +83,6 @@ function AddGeneratedFlags(workflow: WorkflowFile) {
return copy;
};

/**
* Helper function to extract and process scraped data from browser interpreter
*/
async function extractAndProcessScrapedData(
browser: RemoteBrowser,
run: any
): Promise<{
categorizedOutput: any;
uploadedBinaryOutput: any;
totalDataPointsExtracted: number;
totalSchemaItemsExtracted: number;
totalListItemsExtracted: number;
extractedScreenshotsCount: number;
}> {
let categorizedOutput: {
scrapeSchema: Record<string, any>;
scrapeList: Record<string, any>;
} = {
scrapeSchema: {},
scrapeList: {}
};

if ((browser?.interpreter?.serializableDataByType?.scrapeSchema ?? []).length > 0) {
browser?.interpreter?.serializableDataByType?.scrapeSchema?.forEach((schemaItem: any, index: any) => {
categorizedOutput.scrapeSchema[`schema-${index}`] = schemaItem;
});
}

if ((browser?.interpreter?.serializableDataByType?.scrapeList ?? []).length > 0) {
browser?.interpreter?.serializableDataByType?.scrapeList?.forEach((listItem: any, index: any) => {
categorizedOutput.scrapeList[`list-${index}`] = listItem;
});
}

const binaryOutput = browser?.interpreter?.binaryData?.reduce(
(reducedObject: Record<string, any>, item: any, index: number): Record<string, any> => {
return {
[`item-${index}`]: item,
...reducedObject,
};
},
{}
) || {};

let totalDataPointsExtracted = 0;
let totalSchemaItemsExtracted = 0;
let totalListItemsExtracted = 0;
let extractedScreenshotsCount = 0;

if (categorizedOutput.scrapeSchema) {
Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => {
if (Array.isArray(schemaResult)) {
schemaResult.forEach(obj => {
if (obj && typeof obj === 'object') {
totalDataPointsExtracted += Object.keys(obj).length;
}
});
totalSchemaItemsExtracted += schemaResult.length;
} else if (schemaResult && typeof schemaResult === 'object') {
totalDataPointsExtracted += Object.keys(schemaResult).length;
totalSchemaItemsExtracted += 1;
}
});
}

if (categorizedOutput.scrapeList) {
Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => {
if (Array.isArray(listResult)) {
listResult.forEach(obj => {
if (obj && typeof obj === 'object') {
totalDataPointsExtracted += Object.keys(obj).length;
}
});
totalListItemsExtracted += listResult.length;
}
});
}

if (binaryOutput) {
extractedScreenshotsCount = Object.keys(binaryOutput).length;
totalDataPointsExtracted += extractedScreenshotsCount;
}

const binaryOutputService = new BinaryOutputService('maxun-run-screenshots');
const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(
run,
binaryOutput
);

return {
categorizedOutput: {
scrapeSchema: categorizedOutput.scrapeSchema || {},
scrapeList: categorizedOutput.scrapeList || {}
},
uploadedBinaryOutput,
totalDataPointsExtracted,
totalSchemaItemsExtracted,
totalListItemsExtracted,
extractedScreenshotsCount
};
}

// Helper function to handle integration updates
async function triggerIntegrationUpdates(runId: string, robotMetaId: string): Promise<void> {
@@ -234,6 +131,11 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
return { success: true };
}

if (run.status === 'queued') {
logger.log('info', `Run ${data.runId} has status 'queued', skipping stale execution job - processQueuedRuns will handle it`);
return { success: true };
}

const plainRun = run.toJSON();
const browserId = data.browserId || plainRun.browserId;
@@ -309,6 +211,9 @@

// Execute the workflow
const workflow = AddGeneratedFlags(recording.recording);

browser.interpreter.setRunId(data.runId);

const interpretationInfo = await browser.interpreter.InterpretRecording(
workflow,
currentPage,
@@ -326,79 +231,49 @@

logger.log('info', `Workflow execution completed for run ${data.runId}`);

const binaryOutputService = new BinaryOutputService('maxun-run-screenshots');
const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput);

const categorizedOutput = {
scrapeSchema: interpretationInfo.scrapeSchemaOutput || {},
scrapeList: interpretationInfo.scrapeListOutput || {}
};

if (await isRunAborted()) {
logger.log('info', `Run ${data.runId} was aborted while processing results, not updating status`);
return { success: true };
}

await run.update({
...run,
status: 'success',
finishedAt: new Date().toLocaleString(),
browserId: plainRun.browserId,
log: interpretationInfo.log.join('\n'),
serializableOutput: {
scrapeSchema: Object.values(categorizedOutput.scrapeSchema),
scrapeList: Object.values(categorizedOutput.scrapeList),
},
binaryOutput: uploadedBinaryOutput,
log: interpretationInfo.log.join('\n')
});

// Track extraction metrics
let totalDataPointsExtracted = 0;
let totalSchemaItemsExtracted = 0;
let totalListItemsExtracted = 0;
let extractedScreenshotsCount = 0;

if (categorizedOutput.scrapeSchema) {
Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => {
if (Array.isArray(schemaResult)) {
schemaResult.forEach(obj => {
if (obj && typeof obj === 'object') {
totalDataPointsExtracted += Object.keys(obj).length;
const updatedRun = await Run.findOne({ where: { runId: data.runId } });
if (updatedRun) {
if (updatedRun.serializableOutput) {
if (updatedRun.serializableOutput.scrapeSchema) {
Object.values(updatedRun.serializableOutput.scrapeSchema).forEach((schemaResult: any) => {
if (Array.isArray(schemaResult)) {
totalSchemaItemsExtracted += schemaResult.length;
} else if (schemaResult && typeof schemaResult === 'object') {
totalSchemaItemsExtracted += 1;
}
});
totalSchemaItemsExtracted += schemaResult.length;
} else if (schemaResult && typeof schemaResult === 'object') {
totalDataPointsExtracted += Object.keys(schemaResult).length;
totalSchemaItemsExtracted += 1;
}
});
}

if (categorizedOutput.scrapeList) {
Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => {
if (Array.isArray(listResult)) {
listResult.forEach(obj => {
if (obj && typeof obj === 'object') {
totalDataPointsExtracted += Object.keys(obj).length;

if (updatedRun.serializableOutput.scrapeList) {
Object.values(updatedRun.serializableOutput.scrapeList).forEach((listResult: any) => {
if (Array.isArray(listResult)) {
totalListItemsExtracted += listResult.length;
}
});
totalListItemsExtracted += listResult.length;
}
});
}

if (uploadedBinaryOutput) {
extractedScreenshotsCount = Object.keys(uploadedBinaryOutput).length;
totalDataPointsExtracted += extractedScreenshotsCount;
}

if (updatedRun.binaryOutput) {
extractedScreenshotsCount = Object.keys(updatedRun.binaryOutput).length;
}
}

const totalRowsExtracted = totalSchemaItemsExtracted + totalListItemsExtracted;

console.log(`Extracted Schema Items Count: ${totalSchemaItemsExtracted}`);
console.log(`Extracted List Items Count: ${totalListItemsExtracted}`);
console.log(`Extracted Screenshots Count: ${extractedScreenshotsCount}`);
console.log(`Total Rows Extracted: ${totalRowsExtracted}`);
console.log(`Total Data Points Extracted: ${totalDataPointsExtracted}`);

// Capture metrics
capture(
@@ -415,7 +290,6 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
}
);

// Trigger webhooks for run completion
const webhookPayload = {
robot_id: plainRun.robotMetaId,
run_id: data.runId,
@@ -424,13 +298,12 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
started_at: plainRun.startedAt,
finished_at: new Date().toLocaleString(),
extracted_data: {
captured_texts: Object.values(categorizedOutput.scrapeSchema).flat() || [],
captured_lists: categorizedOutput.scrapeList,
captured_texts: updatedRun?.serializableOutput?.scrapeSchema ? Object.values(updatedRun.serializableOutput.scrapeSchema).flat() : [],
captured_lists: updatedRun?.serializableOutput?.scrapeList || {},
total_rows: totalRowsExtracted,
captured_texts_count: totalSchemaItemsExtracted,
captured_lists_count: totalListItemsExtracted,
screenshots_count: extractedScreenshotsCount,
total_data_points_extracted: totalDataPointsExtracted,
},
metadata: {
browser_id: plainRun.browserId,
@@ -475,30 +348,18 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
};

try {
if (browser && browser.interpreter) {
const hasSchemaData = (browser.interpreter.serializableDataByType?.scrapeSchema ?? []).length > 0;
const hasListData = (browser.interpreter.serializableDataByType?.scrapeList ?? []).length > 0;
const hasBinaryData = (browser.interpreter.binaryData ?? []).length > 0;
const hasData = (run.serializableOutput &&
((run.serializableOutput.scrapeSchema && run.serializableOutput.scrapeSchema.length > 0) ||
(run.serializableOutput.scrapeList && run.serializableOutput.scrapeList.length > 0))) ||
(run.binaryOutput && Object.keys(run.binaryOutput).length > 0);

if (hasSchemaData || hasListData || hasBinaryData) {
logger.log('info', `Extracting partial data from failed run ${data.runId}`);

partialData = await extractAndProcessScrapedData(browser, run);

partialUpdateData.serializableOutput = {
scrapeSchema: Object.values(partialData.categorizedOutput.scrapeSchema),
scrapeList: Object.values(partialData.categorizedOutput.scrapeList),
};
partialUpdateData.binaryOutput = partialData.uploadedBinaryOutput;

partialDataExtracted = true;
logger.log('info', `Partial data extracted for failed run ${data.runId}: ${partialData.totalDataPointsExtracted} data points`);

await triggerIntegrationUpdates(plainRun.runId, plainRun.robotMetaId);
}
if (hasData) {
logger.log('info', `Partial data found in failed run ${data.runId}, triggering integration updates`);
await triggerIntegrationUpdates(plainRun.runId, plainRun.robotMetaId);
partialDataExtracted = true;
}
} catch (partialDataError: any) {
logger.log('warn', `Failed to extract partial data for run ${data.runId}: ${partialDataError.message}`);
} catch (dataCheckError: any) {
logger.log('warn', `Failed to check for partial data in run ${data.runId}: ${dataCheckError.message}`);
}

await run.update(partialUpdateData);
@@ -652,7 +513,9 @@ async function processRunExecution(job: Job<ExecuteRunData>) {

async function abortRun(runId: string, userId: string): Promise<boolean> {
try {
const run = await Run.findOne({ where: { runId: runId } });
const run = await Run.findOne({
where: { runId: runId }
});

if (!run) {
logger.log('warn', `Run ${runId} not found or does not belong to user ${userId}`);
@@ -702,24 +565,18 @@ async function abortRun(runId: string, userId: string): Promise<boolean> {
return true;
}

let currentLog = 'Run aborted by user';
const extractedData = await extractAndProcessScrapedData(browser, run);

console.log(`Total Data Points Extracted in aborted run: ${extractedData.totalDataPointsExtracted}`);

await run.update({
status: 'aborted',
finishedAt: new Date().toLocaleString(),
browserId: plainRun.browserId,
log: currentLog,
serializableOutput: {
scrapeSchema: Object.values(extractedData.categorizedOutput.scrapeSchema),
scrapeList: Object.values(extractedData.categorizedOutput.scrapeList),
},
binaryOutput: extractedData.uploadedBinaryOutput,
log: 'Run aborted by user'
});

if (extractedData.totalDataPointsExtracted > 0) {
const hasData = (run.serializableOutput &&
((run.serializableOutput.scrapeSchema && run.serializableOutput.scrapeSchema.length > 0) ||
(run.serializableOutput.scrapeList && run.serializableOutput.scrapeList.length > 0))) ||
(run.binaryOutput && Object.keys(run.binaryOutput).length > 0);

if (hasData) {
await triggerIntegrationUpdates(runId, plainRun.robotMetaId);
}
@@ -751,9 +608,52 @@ async function abortRun(runId: string, userId: string): Promise<boolean> {
}
}

// Track registered queues globally for individual queue registration
const registeredUserQueues = new Map();
const registeredAbortQueues = new Map();

async function registerWorkerForQueue(queueName: string) {
if (!registeredUserQueues.has(queueName)) {
await pgBoss.work(queueName, async (job: Job<ExecuteRunData> | Job<ExecuteRunData>[]) => {
try {
const singleJob = Array.isArray(job) ? job[0] : job;
return await processRunExecution(singleJob);
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.log('error', `Run execution job failed in ${queueName}: ${errorMessage}`);
throw error;
}
});

registeredUserQueues.set(queueName, true);
logger.log('info', `Registered worker for queue: ${queueName}`);
}
}

async function registerAbortWorkerForQueue(queueName: string) {
if (!registeredAbortQueues.has(queueName)) {
await pgBoss.work(queueName, async (job: Job<AbortRunData> | Job<AbortRunData>[]) => {
try {
const data = extractJobData(job);
const { userId, runId } = data;

logger.log('info', `Processing abort request for run ${runId} by user ${userId}`);
const success = await abortRun(runId, userId);
return { success };
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.log('error', `Abort run job failed in ${queueName}: ${errorMessage}`);
throw error;
}
});

registeredAbortQueues.set(queueName, true);
logger.log('info', `Registered abort worker for queue: ${queueName}`);
}
}
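Note: the two register functions make per-user queue subscription idempotent: a module-level Map records which queues already have a pg-boss worker attached, so every route that calls them can do so unconditionally without stacking duplicate handlers. Reduced to its essentials (illustrative handler type):

    const registered = new Map<string, boolean>();

    async function registerOnce(
      queueName: string,
      handler: (job: unknown) => Promise<unknown>
    ): Promise<void> {
      if (registered.has(queueName)) return; // already subscribed in this process
      await pgBoss.work(queueName, handler); // subscribe exactly once
      registered.set(queueName, true);
    }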
async function registerRunExecutionWorker() {
try {
const registeredUserQueues = new Map();

// Worker for executing runs (Legacy)
await pgBoss.work('execute-run', async (job: Job<ExecuteRunData> | Job<ExecuteRunData>[]) => {
@@ -951,9 +851,6 @@ async function startWorkers() {
}
}

// Start all workers
startWorkers();

pgBoss.on('error', (error) => {
logger.log('error', `PgBoss error: ${error.message}`);
});
@@ -972,4 +869,4 @@ process.on('SIGINT', async () => {
});

// For use in other files
export { pgBoss };
export { pgBoss, registerWorkerForQueue, registerAbortWorkerForQueue, startWorkers };
@@ -17,7 +17,7 @@ import { capture } from "../utils/analytics";
import { encrypt, decrypt } from '../utils/auth';
import { WorkflowFile } from 'maxun-core';
import { cancelScheduledWorkflow, scheduleWorkflow } from '../schedule-worker';
import { pgBoss } from '../pgboss-worker';
import { pgBoss, registerWorkerForQueue, registerAbortWorkerForQueue } from '../pgboss-worker';
chromium.use(stealthPlugin());

export const router = Router();
@@ -573,6 +573,7 @@ router.put('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) =>
try {
const userQueueName = `execute-run-user-${req.user.id}`;
await pgBoss.createQueue(userQueueName);
await registerWorkerForQueue(userQueueName);

const jobId = await pgBoss.send(userQueueName, {
userId: req.user.id,
@@ -690,6 +691,7 @@ router.post('/runs/run/:id', requireSignIn, async (req: AuthenticatedRequest, re

// Queue the execution job
await pgBoss.createQueue(userQueueName);
await registerWorkerForQueue(userQueueName);

const jobId = await pgBoss.send(userQueueName, {
userId: req.user.id,
@@ -949,8 +951,20 @@ router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest,
});
}

// Immediately stop interpreter like cloud version
try {
const browser = browserPool.getRemoteBrowser(run.browserId);
if (browser && browser.interpreter) {
logger.log('info', `Immediately stopping interpreter for run ${req.params.id}`);
await browser.interpreter.stopInterpretation();
}
} catch (immediateStopError: any) {
logger.log('warn', `Failed to immediately stop interpreter: ${immediateStopError.message}`);
}

const userQueueName = `abort-run-user-${req.user.id}`;
await pgBoss.createQueue(userQueueName);
await registerAbortWorkerForQueue(userQueueName);

const jobId = await pgBoss.send(userQueueName, {
userId: req.user.id,
@@ -961,7 +975,7 @@ router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest,

return res.send({
success: true,
message: 'Abort signal sent',
message: 'Run stopped immediately, cleanup queued',
jobId,
isQueued: false
});
@@ -1018,6 +1032,7 @@ async function processQueuedRuns() {

const userQueueName = `execute-run-user-${userId}`;
await pgBoss.createQueue(userQueueName);
await registerWorkerForQueue(userQueueName);

const jobId = await pgBoss.send(userQueueName, {
userId: userId,
@@ -1041,4 +1056,81 @@ async function processQueuedRuns() {
}
}

/**
* Recovers orphaned runs that were left in "running" status due to instance crashes
* This function runs on server startup to ensure data reliability
*/
export async function recoverOrphanedRuns() {
try {
logger.log('info', 'Starting recovery of orphaned runs...');

const orphanedRuns = await Run.findAll({
where: {
status: ['running', 'scheduled']
},
order: [['startedAt', 'ASC']]
});

if (orphanedRuns.length === 0) {
logger.log('info', 'No orphaned runs found');
return;
}

logger.log('info', `Found ${orphanedRuns.length} orphaned runs to recover (including scheduled runs)`);

for (const run of orphanedRuns) {
try {
const runData = run.toJSON();
logger.log('info', `Recovering orphaned run: ${runData.runId}`);

const browser = browserPool.getRemoteBrowser(runData.browserId);

if (!browser) {
const retryCount = runData.retryCount || 0;

if (retryCount < 3) {
await run.update({
status: 'queued',
retryCount: retryCount + 1,
serializableOutput: {},
binaryOutput: {},
browserId: undefined,
log: runData.log ? `${runData.log}\n[RETRY ${retryCount + 1}/3] Re-queuing due to server crash` : `[RETRY ${retryCount + 1}/3] Re-queuing due to server crash`
});

logger.log('info', `Re-queued crashed run ${runData.runId} (retry ${retryCount + 1}/3)`);
} else {
const crashRecoveryMessage = `Max retries exceeded (3/3) - Run failed after multiple server crashes.`;

await run.update({
status: 'failed',
finishedAt: new Date().toLocaleString(),
log: runData.log ? `${runData.log}\n${crashRecoveryMessage}` : crashRecoveryMessage
});

logger.log('warn', `Max retries reached for run ${runData.runId}, marked as permanently failed`);
}

if (runData.browserId) {
try {
browserPool.deleteRemoteBrowser(runData.browserId);
logger.log('info', `Cleaned up stale browser reference: ${runData.browserId}`);
} catch (cleanupError: any) {
logger.log('warn', `Failed to cleanup browser reference ${runData.browserId}: ${cleanupError.message}`);
}
}
} else {
logger.log('info', `Run ${runData.runId} browser still active, not orphaned`);
}
} catch (runError: any) {
logger.log('error', `Failed to recover run ${run.runId}: ${runError.message}`);
}
}

logger.log('info', `Orphaned run recovery completed. Processed ${orphanedRuns.length} runs.`);
} catch (error: any) {
logger.log('error', `Failed to recover orphaned runs: ${error.message}`);
}
}

export { processQueuedRuns };
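Note: recovery is a bounded-retry decision over the new retryCount column: an orphaned run (status 'running' or 'scheduled' with no live browser) is re-queued with its outputs reset, up to three times, then marked permanently failed. The core decision in isolation:

    // What to do with an orphaned run after a server crash.
    function recoveryDecision(run: { retryCount?: number }) {
      const retryCount = run.retryCount || 0;
      if (retryCount < 3) {
        // wipe partial outputs and try again
        return { status: 'queued', retryCount: retryCount + 1 };
      }
      return { status: 'failed' }; // give up after three crashes
    }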
@@ -20,7 +20,8 @@ import connectPgSimple from 'connect-pg-simple';
import pg from 'pg';
import session from 'express-session';
import Run from './models/Run';
import { processQueuedRuns } from './routes/storage';
import { processQueuedRuns, recoverOrphanedRuns } from './routes/storage';
import { startWorkers } from './pgboss-worker';

const app = express();
app.use(cors({
@@ -143,6 +144,12 @@ if (require.main === module) {
await connectDB();
await syncDB();

logger.log('info', 'Cleaning up stale browser slots...');
browserPool.cleanupStaleBrowserSlots();

await recoverOrphanedRuns();
await startWorkers();

io = new Server(server);

io.of('/queued-run').on('connection', (socket) => {
@@ -211,20 +218,6 @@
if (require.main === module) {
process.on('SIGINT', async () => {
console.log('Main app shutting down...');
try {
await Run.update(
{
status: 'failed',
finishedAt: new Date().toLocaleString(),
log: 'Process interrupted during execution - worker shutdown'
},
{
where: { status: 'running' }
}
);
} catch (error: any) {
console.error('Error updating runs:', error);
}

try {
console.log('Closing PostgreSQL connection pool...');
@@ -4,6 +4,7 @@ import { Socket } from "socket.io";
import { Page } from "playwright";
import { InterpreterSettings } from "../../types";
import { decrypt } from "../../utils/auth";
import Run from "../../models/Run";

/**
* Decrypts any encrypted inputs in the workflow. If checkLimit is true, it will also handle the limit validation for scrapeList action.
@@ -112,6 +113,11 @@ export class WorkflowInterpreter {
*/
private currentScrapeListIndex: number = 0;

/**
* Current run ID for real-time persistence
*/
private currentRunId: string | null = null;

/**
* An array of id's of the pairs from the workflow that are about to be paused.
* As "breakpoints".
@@ -128,10 +134,12 @@
/**
* A public constructor taking a socket instance for communication with the client.
* @param socket Socket.io socket instance enabling communication with the client (frontend) side.
* @param runId Optional run ID for real-time data persistence
* @constructor
*/
constructor(socket: Socket) {
constructor(socket: Socket, runId?: string) {
this.socket = socket;
this.currentRunId = runId || null;
}

/**
@@ -202,8 +210,14 @@
this.currentActionType = type;
}
},
serializableCallback: (data: any) => {
serializableCallback: async (data: any) => {
if (this.currentActionType === 'scrapeSchema') {
const cumulativeScrapeSchemaData = Array.isArray(data) && data.length > 0 ? data : [data];

if (cumulativeScrapeSchemaData.length > 0) {
await this.persistDataToDatabase('scrapeSchema', cumulativeScrapeSchemaData);
}

if (Array.isArray(data) && data.length > 0) {
this.socket.emit('serializableCallback', {
type: 'captureText',
@@ -216,13 +230,24 @@
});
}
} else if (this.currentActionType === 'scrapeList') {
if (data && Array.isArray(data) && data.length > 0) {
// Use the current index for persistence
await this.persistDataToDatabase('scrapeList', data, this.currentScrapeListIndex);
}

this.socket.emit('serializableCallback', {
type: 'captureList',
data
});
}
},
binaryCallback: (data: string, mimetype: string) => {
binaryCallback: async (data: string, mimetype: string) => {
const binaryItem = { mimetype, data: JSON.stringify(data) };
this.binaryData.push(binaryItem);

// Persist binary data to database
await this.persistBinaryDataToDatabase(binaryItem);

this.socket.emit('binaryCallback', {
data,
mimetype,
@@ -272,6 +297,10 @@
public stopInterpretation = async () => {
if (this.interpreter) {
logger.log('info', 'Stopping the interpretation.');

this.interpreter.abort();
logger.log('info', 'maxun-core interpreter aborted - data collection stopped immediately');

await this.interpreter.stop();
this.socket.emit('log', '----- The interpretation has been stopped -----', false);
this.clearState();
@@ -294,8 +323,115 @@
};
this.binaryData = [];
this.currentScrapeListIndex = 0;
this.currentRunId = null;
}

/**
* Sets the current run ID for real-time persistence.
* @param runId The run ID to set
*/
public setRunId = (runId: string): void => {
this.currentRunId = runId;
logger.log('debug', `Set run ID for real-time persistence: ${runId}`);
};

/**
* Persists data to database in real-time during interpretation
* @private
*/
private persistDataToDatabase = async (actionType: string, data: any, listIndex?: number): Promise<void> => {
if (!this.currentRunId) {
logger.log('debug', 'No run ID available for real-time persistence');
return;
}

try {
const run = await Run.findOne({ where: { runId: this.currentRunId } });

if (!run) {
logger.log('warn', `Run not found for real-time persistence: ${this.currentRunId}`);
return;
}

const currentSerializableOutput = run.serializableOutput ?
JSON.parse(JSON.stringify(run.serializableOutput)) :
{ scrapeSchema: [], scrapeList: [] };

if (actionType === 'scrapeSchema') {
const newSchemaData = Array.isArray(data) ? data : [data];
const updatedOutput = {
...currentSerializableOutput,
scrapeSchema: newSchemaData
};

await run.update({
serializableOutput: updatedOutput
});

logger.log('debug', `Persisted scrapeSchema data for run ${this.currentRunId}: ${newSchemaData.length} items`);

} else if (actionType === 'scrapeList' && typeof listIndex === 'number') {
if (!Array.isArray(currentSerializableOutput.scrapeList)) {
currentSerializableOutput.scrapeList = [];
}

const updatedList = [...currentSerializableOutput.scrapeList];
updatedList[listIndex] = data;

const updatedOutput = {
...currentSerializableOutput,
scrapeList: updatedList
};

await run.update({
serializableOutput: updatedOutput
});

logger.log('debug', `Persisted scrapeList data for run ${this.currentRunId} at index ${listIndex}: ${Array.isArray(data) ? data.length : 'N/A'} items`);
}
} catch (error: any) {
logger.log('error', `Failed to persist data in real-time for run ${this.currentRunId}: ${error.message}`);
}
};

/**
* Persists binary data to database in real-time
* @private
*/
private persistBinaryDataToDatabase = async (binaryItem: { mimetype: string, data: string }): Promise<void> => {
if (!this.currentRunId) {
logger.log('debug', 'No run ID available for binary data persistence');
return;
}

try {
const run = await Run.findOne({ where: { runId: this.currentRunId } });
if (!run) {
logger.log('warn', `Run not found for binary data persistence: ${this.currentRunId}`);
return;
}

const currentBinaryOutput = run.binaryOutput ?
JSON.parse(JSON.stringify(run.binaryOutput)) :
{};

const uniqueKey = `item-${Date.now()}-${Object.keys(currentBinaryOutput).length}`;

const updatedBinaryOutput = {
...currentBinaryOutput,
[uniqueKey]: binaryItem
};

await run.update({
binaryOutput: updatedBinaryOutput
});

logger.log('debug', `Persisted binary data for run ${this.currentRunId}: ${binaryItem.mimetype}`);
} catch (error: any) {
logger.log('error', `Failed to persist binary data in real-time for run ${this.currentRunId}: ${error.message}`);
}
};
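Note: both persistence helpers follow the same read-modify-write cycle against the Run row: fetch, deep-copy the JSON column, merge the new chunk, write back. That is what lets aborted and crashed runs keep everything extracted so far. A reduced sketch of the scrapeList path (model shape as in this PR); a single interpreter owns each run, so last-writer-wins on the row is acceptable here:

    async function persistListChunk(runId: string, listIndex: number, data: unknown[]) {
      const run = await Run.findOne({ where: { runId } });
      if (!run) return;

      // Deep-copy the JSON column before mutating it.
      const output = run.serializableOutput
        ? JSON.parse(JSON.stringify(run.serializableOutput))
        : { scrapeSchema: [], scrapeList: [] };

      const scrapeList = Array.isArray(output.scrapeList) ? [...output.scrapeList] : [];
      scrapeList[listIndex] = data; // each scrapeList action owns one index

      await run.update({ serializableOutput: { ...output, scrapeList } });
    }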
/**
* Interprets the recording as a run.
* @param workflow The workflow to interpret.
@@ -333,7 +469,7 @@
this.currentScrapeListIndex++;
}
},
serializableCallback: (data: any) => {
serializableCallback: async (data: any) => {
if (this.currentActionType === 'scrapeSchema') {
if (Array.isArray(data) && data.length > 0) {
mergedScrapeSchema = { ...mergedScrapeSchema, ...data[0] };
@@ -342,14 +478,29 @@
mergedScrapeSchema = { ...mergedScrapeSchema, ...data };
this.serializableDataByType.scrapeSchema.push([data]);
}

// Persist the cumulative scrapeSchema data
const cumulativeScrapeSchemaData = Object.keys(mergedScrapeSchema).length > 0 ? [mergedScrapeSchema] : [];
if (cumulativeScrapeSchemaData.length > 0) {
await this.persistDataToDatabase('scrapeSchema', cumulativeScrapeSchemaData);
}
} else if (this.currentActionType === 'scrapeList') {
if (data && Array.isArray(data) && data.length > 0) {
// Use the current index for persistence
await this.persistDataToDatabase('scrapeList', data, this.currentScrapeListIndex);
}
this.serializableDataByType.scrapeList[this.currentScrapeListIndex] = data;
}

this.socket.emit('serializableCallback', data);
},
binaryCallback: async (data: string, mimetype: string) => {
this.binaryData.push({ mimetype, data: JSON.stringify(data) });
const binaryItem = { mimetype, data: JSON.stringify(data) };
this.binaryData.push(binaryItem);

// Persist binary data to database
await this.persistBinaryDataToDatabase(binaryItem);

this.socket.emit('binaryCallback', { data, mimetype });
}
}
@@ -106,6 +106,39 @@ async function executeRun(id: string, userId: string) {

const plainRun = run.toJSON();

if (run.status === 'aborted' || run.status === 'aborting') {
logger.log('info', `Scheduled Run ${id} has status ${run.status}, skipping execution`);
return {
success: false,
error: `Run has status ${run.status}`
}
}

if (run.status === 'queued') {
logger.log('info', `Scheduled Run ${id} has status 'queued', skipping stale execution - will be handled by recovery`);
return {
success: false,
error: 'Run is queued and will be handled by recovery'
}
}

const retryCount = plainRun.retryCount || 0;
if (retryCount >= 3) {
logger.log('warn', `Scheduled Run ${id} has exceeded max retries (${retryCount}/3), marking as failed`);
const recording = await Robot.findOne({ where: { 'recording_meta.id': plainRun.robotMetaId, userId }, raw: true });

await run.update({
status: 'failed',
finishedAt: new Date().toLocaleString(),
log: plainRun.log ? `${plainRun.log}\nMax retries exceeded (3/3) - Run failed after multiple attempts.` : `Max retries exceeded (3/3) - Run failed after multiple attempts.`
});

return {
success: false,
error: 'Max retries exceeded'
}
}

const recording = await Robot.findOne({ where: { 'recording_meta.id': plainRun.robotMetaId }, raw: true });
if (!recording) {
return {
@@ -127,58 +160,52 @@ async function executeRun(id: string, userId: string) {
}

const workflow = AddGeneratedFlags(recording.recording);

// Set run ID for real-time data persistence
browser.interpreter.setRunId(id);

const interpretationInfo = await browser.interpreter.InterpretRecording(
workflow, currentPage, (newPage: Page) => currentPage = newPage, plainRun.interpreterSettings
);

const binaryOutputService = new BinaryOutputService('maxun-run-screenshots');
const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput);

const categorizedOutput = {
scrapeSchema: interpretationInfo.scrapeSchemaOutput || {},
scrapeList: interpretationInfo.scrapeListOutput || {},
};

await destroyRemoteBrowser(plainRun.browserId, userId);

await run.update({
...run,
status: 'success',
finishedAt: new Date().toLocaleString(),
browserId: plainRun.browserId,
log: interpretationInfo.log.join('\n'),
serializableOutput: {
scrapeSchema: Object.values(categorizedOutput.scrapeSchema),
scrapeList: Object.values(categorizedOutput.scrapeList),
},
binaryOutput: uploadedBinaryOutput,
});

// Track extraction metrics
// Get metrics from persisted data for analytics and webhooks
let totalSchemaItemsExtracted = 0;
let totalListItemsExtracted = 0;
let extractedScreenshotsCount = 0;

if (categorizedOutput.scrapeSchema) {
Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => {
if (Array.isArray(schemaResult)) {
totalSchemaItemsExtracted += schemaResult.length;
} else if (schemaResult && typeof schemaResult === 'object') {
totalSchemaItemsExtracted += 1;
const updatedRun = await Run.findOne({ where: { runId: id } });
if (updatedRun) {
if (updatedRun.serializableOutput) {
if (updatedRun.serializableOutput.scrapeSchema) {
Object.values(updatedRun.serializableOutput.scrapeSchema).forEach((schemaResult: any) => {
if (Array.isArray(schemaResult)) {
totalSchemaItemsExtracted += schemaResult.length;
} else if (schemaResult && typeof schemaResult === 'object') {
totalSchemaItemsExtracted += 1;
}
});
}
});
}

if (categorizedOutput.scrapeList) {
Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => {
if (Array.isArray(listResult)) {
totalListItemsExtracted += listResult.length;

if (updatedRun.serializableOutput.scrapeList) {
Object.values(updatedRun.serializableOutput.scrapeList).forEach((listResult: any) => {
if (Array.isArray(listResult)) {
totalListItemsExtracted += listResult.length;
}
});
}
});
}

if (uploadedBinaryOutput) {
extractedScreenshotsCount = Object.keys(uploadedBinaryOutput).length;
}

if (updatedRun.binaryOutput) {
extractedScreenshotsCount = Object.keys(updatedRun.binaryOutput).length;
}
}

const totalRowsExtracted = totalSchemaItemsExtracted + totalListItemsExtracted;
@@ -204,8 +231,8 @@
started_at: plainRun.startedAt,
finished_at: new Date().toLocaleString(),
extracted_data: {
captured_texts: Object.values(categorizedOutput.scrapeSchema).flat() || [],
captured_lists: categorizedOutput.scrapeList,
captured_texts: updatedRun?.serializableOutput?.scrapeSchema ? Object.values(updatedRun.serializableOutput.scrapeSchema).flat() : [],
captured_lists: updatedRun?.serializableOutput?.scrapeList || {},
total_rows: totalRowsExtracted,
captured_texts_count: totalSchemaItemsExtracted,
captured_lists_count: totalListItemsExtracted,
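Note: across all three execution paths, webhook payloads now read captured_texts and captured_lists from the re-fetched run row instead of the in-memory interpretation output, so what webhooks report always matches what was actually persisted, including data written in real time before an abort. The guarded access pattern:

    // Prefer the persisted row; fall back to empty values when it is absent.
    const captured_texts = updatedRun?.serializableOutput?.scrapeSchema
      ? Object.values(updatedRun.serializableOutput.scrapeSchema).flat()
      : [];
    const captured_lists = updatedRun?.serializableOutput?.scrapeList || {};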