diff --git a/server/src/api/record.ts b/server/src/api/record.ts index e05aa8ce..a10a1c43 100644 --- a/server/src/api/record.ts +++ b/server/src/api/record.ts @@ -597,65 +597,53 @@ async function executeRun(id: string, userId: string) { } const workflow = AddGeneratedFlags(recording.recording); + + browser.interpreter.setRunId(id); + const interpretationInfo = await browser.interpreter.InterpretRecording( workflow, currentPage, (newPage: Page) => currentPage = newPage, plainRun.interpreterSettings ); - const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); - const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput); - - const categorizedOutput = { - scrapeSchema: interpretationInfo.scrapeSchemaOutput || {}, - scrapeList: interpretationInfo.scrapeListOutput || {}, - }; - await destroyRemoteBrowser(plainRun.browserId, userId); const updatedRun = await run.update({ - ...run, status: 'success', finishedAt: new Date().toLocaleString(), - browserId: plainRun.browserId, log: interpretationInfo.log.join('\n'), - serializableOutput: { - scrapeSchema: Object.values(categorizedOutput.scrapeSchema), - scrapeList: Object.values(categorizedOutput.scrapeList), - }, - binaryOutput: uploadedBinaryOutput, }); - let totalSchemaItemsExtracted = 0; - let totalListItemsExtracted = 0; - let extractedScreenshotsCount = 0; - - if (categorizedOutput.scrapeSchema) { - Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => { - if (Array.isArray(schemaResult)) { - totalSchemaItemsExtracted += schemaResult.length; - } else if (schemaResult && typeof schemaResult === 'object') { - totalSchemaItemsExtracted += 1; - } - }); - } - - if (categorizedOutput.scrapeList) { - Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => { - if (Array.isArray(listResult)) { - totalListItemsExtracted += listResult.length; - } - }); - } - - if (uploadedBinaryOutput) { - extractedScreenshotsCount = Object.keys(uploadedBinaryOutput).length; - } - - const totalRowsExtracted = totalSchemaItemsExtracted + totalListItemsExtracted; - - console.log(`Extracted Schema Items Count: ${totalSchemaItemsExtracted}`); - console.log(`Extracted List Items Count: ${totalListItemsExtracted}`); - console.log(`Extracted Screenshots Count: ${extractedScreenshotsCount}`); - console.log(`Total Rows Extracted: ${totalRowsExtracted}`); + let totalSchemaItemsExtracted = 0; + let totalListItemsExtracted = 0; + let extractedScreenshotsCount = 0; + + const finalRun = await Run.findOne({ where: { runId: id } }); + if (finalRun) { + if (finalRun.serializableOutput) { + if (finalRun.serializableOutput.scrapeSchema) { + Object.values(finalRun.serializableOutput.scrapeSchema).forEach((schemaResult: any) => { + if (Array.isArray(schemaResult)) { + totalSchemaItemsExtracted += schemaResult.length; + } else if (schemaResult && typeof schemaResult === 'object') { + totalSchemaItemsExtracted += 1; + } + }); + } + + if (finalRun.serializableOutput.scrapeList) { + Object.values(finalRun.serializableOutput.scrapeList).forEach((listResult: any) => { + if (Array.isArray(listResult)) { + totalListItemsExtracted += listResult.length; + } + }); + } + } + + if (finalRun.binaryOutput) { + extractedScreenshotsCount = Object.keys(finalRun.binaryOutput).length; + } + } + + const totalRowsExtracted = totalSchemaItemsExtracted + totalListItemsExtracted; capture('maxun-oss-run-created-api',{ runId: id, @@ -668,7 +656,6 @@ async function executeRun(id: string, userId: string) { } ) - // Trigger webhooks for run completion const webhookPayload = { robot_id: plainRun.robotMetaId, run_id: plainRun.runId, @@ -677,8 +664,8 @@ async function executeRun(id: string, userId: string) { started_at: plainRun.startedAt, finished_at: new Date().toLocaleString(), extracted_data: { - captured_texts: Object.values(categorizedOutput.scrapeSchema).flat() || [], - captured_lists: categorizedOutput.scrapeList, + captured_texts: finalRun?.serializableOutput?.scrapeSchema ? Object.values(finalRun.serializableOutput.scrapeSchema).flat() : [], + captured_lists: finalRun?.serializableOutput?.scrapeList || {}, total_rows: totalRowsExtracted, captured_texts_count: totalSchemaItemsExtracted, captured_lists_count: totalListItemsExtracted, diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index c8baa9c0..fe30dbd8 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -14,11 +14,9 @@ import Run from './models/Run'; import Robot from './models/Robot'; import { browserPool } from './server'; import { Page } from 'playwright'; -import { BinaryOutputService } from './storage/mino'; import { capture } from './utils/analytics'; import { googleSheetUpdateTasks, processGoogleSheetUpdates } from './workflow-management/integrations/gsheet'; import { airtableUpdateTasks, processAirtableUpdates } from './workflow-management/integrations/airtable'; -import { RemoteBrowser } from './browser-management/classes/RemoteBrowser'; import { io as serverIo } from "./server"; import { sendWebhook } from './routes/webhook'; @@ -85,107 +83,6 @@ function AddGeneratedFlags(workflow: WorkflowFile) { return copy; }; -/** - * Helper function to extract and process scraped data from browser interpreter - */ -async function extractAndProcessScrapedData( - browser: RemoteBrowser, - run: any -): Promise<{ - categorizedOutput: any; - uploadedBinaryOutput: any; - totalDataPointsExtracted: number; - totalSchemaItemsExtracted: number; - totalListItemsExtracted: number; - extractedScreenshotsCount: number; -}> { - let categorizedOutput: { - scrapeSchema: Record; - scrapeList: Record; - } = { - scrapeSchema: {}, - scrapeList: {} - }; - - if ((browser?.interpreter?.serializableDataByType?.scrapeSchema ?? []).length > 0) { - browser?.interpreter?.serializableDataByType?.scrapeSchema?.forEach((schemaItem: any, index: any) => { - categorizedOutput.scrapeSchema[`schema-${index}`] = schemaItem; - }); - } - - if ((browser?.interpreter?.serializableDataByType?.scrapeList ?? []).length > 0) { - browser?.interpreter?.serializableDataByType?.scrapeList?.forEach((listItem: any, index: any) => { - categorizedOutput.scrapeList[`list-${index}`] = listItem; - }); - } - - const binaryOutput = browser?.interpreter?.binaryData?.reduce( - (reducedObject: Record, item: any, index: number): Record => { - return { - [`item-${index}`]: item, - ...reducedObject, - }; - }, - {} - ) || {}; - - let totalDataPointsExtracted = 0; - let totalSchemaItemsExtracted = 0; - let totalListItemsExtracted = 0; - let extractedScreenshotsCount = 0; - - if (categorizedOutput.scrapeSchema) { - Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => { - if (Array.isArray(schemaResult)) { - schemaResult.forEach(obj => { - if (obj && typeof obj === 'object') { - totalDataPointsExtracted += Object.keys(obj).length; - } - }); - totalSchemaItemsExtracted += schemaResult.length; - } else if (schemaResult && typeof schemaResult === 'object') { - totalDataPointsExtracted += Object.keys(schemaResult).length; - totalSchemaItemsExtracted += 1; - } - }); - } - - if (categorizedOutput.scrapeList) { - Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => { - if (Array.isArray(listResult)) { - listResult.forEach(obj => { - if (obj && typeof obj === 'object') { - totalDataPointsExtracted += Object.keys(obj).length; - } - }); - totalListItemsExtracted += listResult.length; - } - }); - } - - if (binaryOutput) { - extractedScreenshotsCount = Object.keys(binaryOutput).length; - totalDataPointsExtracted += extractedScreenshotsCount; - } - - const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); - const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput( - run, - binaryOutput - ); - - return { - categorizedOutput: { - scrapeSchema: categorizedOutput.scrapeSchema || {}, - scrapeList: categorizedOutput.scrapeList || {} - }, - uploadedBinaryOutput, - totalDataPointsExtracted, - totalSchemaItemsExtracted, - totalListItemsExtracted, - extractedScreenshotsCount - }; -} // Helper function to handle integration updates async function triggerIntegrationUpdates(runId: string, robotMetaId: string): Promise { @@ -234,6 +131,11 @@ async function processRunExecution(job: Job) { return { success: true }; } + if (run.status === 'queued') { + logger.log('info', `Run ${data.runId} has status 'queued', skipping stale execution job - processQueuedRuns will handle it`); + return { success: true }; + } + const plainRun = run.toJSON(); const browserId = data.browserId || plainRun.browserId; @@ -309,6 +211,9 @@ async function processRunExecution(job: Job) { // Execute the workflow const workflow = AddGeneratedFlags(recording.recording); + + browser.interpreter.setRunId(data.runId); + const interpretationInfo = await browser.interpreter.InterpretRecording( workflow, currentPage, @@ -326,79 +231,49 @@ async function processRunExecution(job: Job) { logger.log('info', `Workflow execution completed for run ${data.runId}`); - const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); - const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput); - - const categorizedOutput = { - scrapeSchema: interpretationInfo.scrapeSchemaOutput || {}, - scrapeList: interpretationInfo.scrapeListOutput || {} - }; - if (await isRunAborted()) { logger.log('info', `Run ${data.runId} was aborted while processing results, not updating status`); return { success: true }; } await run.update({ - ...run, status: 'success', finishedAt: new Date().toLocaleString(), - browserId: plainRun.browserId, - log: interpretationInfo.log.join('\n'), - serializableOutput: { - scrapeSchema: Object.values(categorizedOutput.scrapeSchema), - scrapeList: Object.values(categorizedOutput.scrapeList), - }, - binaryOutput: uploadedBinaryOutput, + log: interpretationInfo.log.join('\n') }); - // Track extraction metrics - let totalDataPointsExtracted = 0; let totalSchemaItemsExtracted = 0; let totalListItemsExtracted = 0; let extractedScreenshotsCount = 0; - if (categorizedOutput.scrapeSchema) { - Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => { - if (Array.isArray(schemaResult)) { - schemaResult.forEach(obj => { - if (obj && typeof obj === 'object') { - totalDataPointsExtracted += Object.keys(obj).length; + const updatedRun = await Run.findOne({ where: { runId: data.runId } }); + if (updatedRun) { + if (updatedRun.serializableOutput) { + if (updatedRun.serializableOutput.scrapeSchema) { + Object.values(updatedRun.serializableOutput.scrapeSchema).forEach((schemaResult: any) => { + if (Array.isArray(schemaResult)) { + totalSchemaItemsExtracted += schemaResult.length; + } else if (schemaResult && typeof schemaResult === 'object') { + totalSchemaItemsExtracted += 1; } }); - totalSchemaItemsExtracted += schemaResult.length; - } else if (schemaResult && typeof schemaResult === 'object') { - totalDataPointsExtracted += Object.keys(schemaResult).length; - totalSchemaItemsExtracted += 1; } - }); - } - - if (categorizedOutput.scrapeList) { - Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => { - if (Array.isArray(listResult)) { - listResult.forEach(obj => { - if (obj && typeof obj === 'object') { - totalDataPointsExtracted += Object.keys(obj).length; + + if (updatedRun.serializableOutput.scrapeList) { + Object.values(updatedRun.serializableOutput.scrapeList).forEach((listResult: any) => { + if (Array.isArray(listResult)) { + totalListItemsExtracted += listResult.length; } }); - totalListItemsExtracted += listResult.length; } - }); - } - - if (uploadedBinaryOutput) { - extractedScreenshotsCount = Object.keys(uploadedBinaryOutput).length; - totalDataPointsExtracted += extractedScreenshotsCount; + } + + if (updatedRun.binaryOutput) { + extractedScreenshotsCount = Object.keys(updatedRun.binaryOutput).length; + } } const totalRowsExtracted = totalSchemaItemsExtracted + totalListItemsExtracted; - - console.log(`Extracted Schema Items Count: ${totalSchemaItemsExtracted}`); - console.log(`Extracted List Items Count: ${totalListItemsExtracted}`); - console.log(`Extracted Screenshots Count: ${extractedScreenshotsCount}`); - console.log(`Total Rows Extracted: ${totalRowsExtracted}`); - console.log(`Total Data Points Extracted: ${totalDataPointsExtracted}`); // Capture metrics capture( @@ -415,7 +290,6 @@ async function processRunExecution(job: Job) { } ); - // Trigger webhooks for run completion const webhookPayload = { robot_id: plainRun.robotMetaId, run_id: data.runId, @@ -424,13 +298,12 @@ async function processRunExecution(job: Job) { started_at: plainRun.startedAt, finished_at: new Date().toLocaleString(), extracted_data: { - captured_texts: Object.values(categorizedOutput.scrapeSchema).flat() || [], - captured_lists: categorizedOutput.scrapeList, + captured_texts: updatedRun?.serializableOutput?.scrapeSchema ? Object.values(updatedRun.serializableOutput.scrapeSchema).flat() : [], + captured_lists: updatedRun?.serializableOutput?.scrapeList || {}, total_rows: totalRowsExtracted, captured_texts_count: totalSchemaItemsExtracted, captured_lists_count: totalListItemsExtracted, screenshots_count: extractedScreenshotsCount, - total_data_points_extracted: totalDataPointsExtracted, }, metadata: { browser_id: plainRun.browserId, @@ -475,30 +348,18 @@ async function processRunExecution(job: Job) { }; try { - if (browser && browser.interpreter) { - const hasSchemaData = (browser.interpreter.serializableDataByType?.scrapeSchema ?? []).length > 0; - const hasListData = (browser.interpreter.serializableDataByType?.scrapeList ?? []).length > 0; - const hasBinaryData = (browser.interpreter.binaryData ?? []).length > 0; + const hasData = (run.serializableOutput && + ((run.serializableOutput.scrapeSchema && run.serializableOutput.scrapeSchema.length > 0) || + (run.serializableOutput.scrapeList && run.serializableOutput.scrapeList.length > 0))) || + (run.binaryOutput && Object.keys(run.binaryOutput).length > 0); - if (hasSchemaData || hasListData || hasBinaryData) { - logger.log('info', `Extracting partial data from failed run ${data.runId}`); - - partialData = await extractAndProcessScrapedData(browser, run); - - partialUpdateData.serializableOutput = { - scrapeSchema: Object.values(partialData.categorizedOutput.scrapeSchema), - scrapeList: Object.values(partialData.categorizedOutput.scrapeList), - }; - partialUpdateData.binaryOutput = partialData.uploadedBinaryOutput; - - partialDataExtracted = true; - logger.log('info', `Partial data extracted for failed run ${data.runId}: ${partialData.totalDataPointsExtracted} data points`); - - await triggerIntegrationUpdates(plainRun.runId, plainRun.robotMetaId); - } + if (hasData) { + logger.log('info', `Partial data found in failed run ${data.runId}, triggering integration updates`); + await triggerIntegrationUpdates(plainRun.runId, plainRun.robotMetaId); + partialDataExtracted = true; } - } catch (partialDataError: any) { - logger.log('warn', `Failed to extract partial data for run ${data.runId}: ${partialDataError.message}`); + } catch (dataCheckError: any) { + logger.log('warn', `Failed to check for partial data in run ${data.runId}: ${dataCheckError.message}`); } await run.update(partialUpdateData); @@ -652,7 +513,9 @@ async function processRunExecution(job: Job) { async function abortRun(runId: string, userId: string): Promise { try { - const run = await Run.findOne({ where: { runId: runId } }); + const run = await Run.findOne({ + where: { runId: runId } + }); if (!run) { logger.log('warn', `Run ${runId} not found or does not belong to user ${userId}`); @@ -702,24 +565,18 @@ async function abortRun(runId: string, userId: string): Promise { return true; } - let currentLog = 'Run aborted by user'; - const extractedData = await extractAndProcessScrapedData(browser, run); - - console.log(`Total Data Points Extracted in aborted run: ${extractedData.totalDataPointsExtracted}`); - await run.update({ status: 'aborted', finishedAt: new Date().toLocaleString(), - browserId: plainRun.browserId, - log: currentLog, - serializableOutput: { - scrapeSchema: Object.values(extractedData.categorizedOutput.scrapeSchema), - scrapeList: Object.values(extractedData.categorizedOutput.scrapeList), - }, - binaryOutput: extractedData.uploadedBinaryOutput, + log: 'Run aborted by user' }); - if (extractedData.totalDataPointsExtracted > 0) { + const hasData = (run.serializableOutput && + ((run.serializableOutput.scrapeSchema && run.serializableOutput.scrapeSchema.length > 0) || + (run.serializableOutput.scrapeList && run.serializableOutput.scrapeList.length > 0))) || + (run.binaryOutput && Object.keys(run.binaryOutput).length > 0); + + if (hasData) { await triggerIntegrationUpdates(runId, plainRun.robotMetaId); } diff --git a/server/src/workflow-management/scheduler/index.ts b/server/src/workflow-management/scheduler/index.ts index b40e55f2..7dbafab6 100644 --- a/server/src/workflow-management/scheduler/index.ts +++ b/server/src/workflow-management/scheduler/index.ts @@ -106,6 +106,39 @@ async function executeRun(id: string, userId: string) { const plainRun = run.toJSON(); + if (run.status === 'aborted' || run.status === 'aborting') { + logger.log('info', `Scheduled Run ${id} has status ${run.status}, skipping execution`); + return { + success: false, + error: `Run has status ${run.status}` + } + } + + if (run.status === 'queued') { + logger.log('info', `Scheduled Run ${id} has status 'queued', skipping stale execution - will be handled by recovery`); + return { + success: false, + error: 'Run is queued and will be handled by recovery' + } + } + + const retryCount = plainRun.retryCount || 0; + if (retryCount >= 3) { + logger.log('warn', `Scheduled Run ${id} has exceeded max retries (${retryCount}/3), marking as failed`); + const recording = await Robot.findOne({ where: { 'recording_meta.id': plainRun.robotMetaId, userId }, raw: true }); + + await run.update({ + status: 'failed', + finishedAt: new Date().toLocaleString(), + log: plainRun.log ? `${plainRun.log}\nMax retries exceeded (3/3) - Run failed after multiple attempts.` : `Max retries exceeded (3/3) - Run failed after multiple attempts.` + }); + + return { + success: false, + error: 'Max retries exceeded' + } + } + const recording = await Robot.findOne({ where: { 'recording_meta.id': plainRun.robotMetaId }, raw: true }); if (!recording) { return { @@ -127,58 +160,52 @@ async function executeRun(id: string, userId: string) { } const workflow = AddGeneratedFlags(recording.recording); + + // Set run ID for real-time data persistence + browser.interpreter.setRunId(id); + const interpretationInfo = await browser.interpreter.InterpretRecording( workflow, currentPage, (newPage: Page) => currentPage = newPage, plainRun.interpreterSettings ); - const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); - const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput); - - const categorizedOutput = { - scrapeSchema: interpretationInfo.scrapeSchemaOutput || {}, - scrapeList: interpretationInfo.scrapeListOutput || {}, - }; - await destroyRemoteBrowser(plainRun.browserId, userId); await run.update({ - ...run, status: 'success', finishedAt: new Date().toLocaleString(), - browserId: plainRun.browserId, log: interpretationInfo.log.join('\n'), - serializableOutput: { - scrapeSchema: Object.values(categorizedOutput.scrapeSchema), - scrapeList: Object.values(categorizedOutput.scrapeList), - }, - binaryOutput: uploadedBinaryOutput, }); - // Track extraction metrics + // Get metrics from persisted data for analytics and webhooks let totalSchemaItemsExtracted = 0; let totalListItemsExtracted = 0; let extractedScreenshotsCount = 0; - if (categorizedOutput.scrapeSchema) { - Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => { - if (Array.isArray(schemaResult)) { - totalSchemaItemsExtracted += schemaResult.length; - } else if (schemaResult && typeof schemaResult === 'object') { - totalSchemaItemsExtracted += 1; + const updatedRun = await Run.findOne({ where: { runId: id } }); + if (updatedRun) { + if (updatedRun.serializableOutput) { + if (updatedRun.serializableOutput.scrapeSchema) { + Object.values(updatedRun.serializableOutput.scrapeSchema).forEach((schemaResult: any) => { + if (Array.isArray(schemaResult)) { + totalSchemaItemsExtracted += schemaResult.length; + } else if (schemaResult && typeof schemaResult === 'object') { + totalSchemaItemsExtracted += 1; + } + }); } - }); - } - - if (categorizedOutput.scrapeList) { - Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => { - if (Array.isArray(listResult)) { - totalListItemsExtracted += listResult.length; + + if (updatedRun.serializableOutput.scrapeList) { + Object.values(updatedRun.serializableOutput.scrapeList).forEach((listResult: any) => { + if (Array.isArray(listResult)) { + totalListItemsExtracted += listResult.length; + } + }); } - }); - } - - if (uploadedBinaryOutput) { - extractedScreenshotsCount = Object.keys(uploadedBinaryOutput).length; + } + + if (updatedRun.binaryOutput) { + extractedScreenshotsCount = Object.keys(updatedRun.binaryOutput).length; + } } const totalRowsExtracted = totalSchemaItemsExtracted + totalListItemsExtracted; @@ -204,8 +231,8 @@ async function executeRun(id: string, userId: string) { started_at: plainRun.startedAt, finished_at: new Date().toLocaleString(), extracted_data: { - captured_texts: Object.values(categorizedOutput.scrapeSchema).flat() || [], - captured_lists: categorizedOutput.scrapeList, + captured_texts: updatedRun?.serializableOutput?.scrapeSchema ? Object.values(updatedRun.serializableOutput.scrapeSchema).flat() : [], + captured_lists: updatedRun?.serializableOutput?.scrapeList || {}, total_rows: totalRowsExtracted, captured_texts_count: totalSchemaItemsExtracted, captured_lists_count: totalListItemsExtracted,