diff --git a/server/src/workflow-management/scheduler/index.ts b/server/src/workflow-management/scheduler/index.ts index 1559e63c..d0d4884a 100644 --- a/server/src/workflow-management/scheduler/index.ts +++ b/server/src/workflow-management/scheduler/index.ts @@ -13,6 +13,8 @@ import { BinaryOutputService } from "../../storage/mino"; import { capture } from "../../utils/analytics"; import { WorkflowFile } from "maxun-core"; import { Page } from "playwright"; +import { sendWebhook } from "../../routes/webhook"; +import { airtableUpdateTasks, processAirtableUpdates } from "../integrations/airtable"; chromium.use(stealthPlugin()); async function createWorkflowAndStoreMetadata(id: string, userId: string) { @@ -152,26 +154,34 @@ async function executeRun(id: string, userId: string) { binaryOutput: uploadedBinaryOutput, }); - let totalRowsExtracted = 0; + // Track extraction metrics + let totalSchemaItemsExtracted = 0; + let totalListItemsExtracted = 0; let extractedScreenshotsCount = 0; - let extractedItemsCount = 0; - - if (run.dataValues.binaryOutput && run.dataValues.binaryOutput["item-0"]) { - extractedScreenshotsCount = 1; + + if (categorizedOutput.scrapeSchema) { + Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => { + if (Array.isArray(schemaResult)) { + totalSchemaItemsExtracted += schemaResult.length; + } else if (schemaResult && typeof schemaResult === 'object') { + totalSchemaItemsExtracted += 1; + } + }); } - - if (run.dataValues.serializableOutput && run.dataValues.serializableOutput["item-0"]) { - const itemsArray = run.dataValues.serializableOutput["item-0"]; - extractedItemsCount = itemsArray.length; - - totalRowsExtracted = itemsArray.reduce((total, item) => { - return total + Object.keys(item).length; - }, 0); + + if (categorizedOutput.scrapeList) { + Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => { + if (Array.isArray(listResult)) { + totalListItemsExtracted += listResult.length; + } + }); } - - console.log(`Extracted Items Count: ${extractedItemsCount}`); - console.log(`Extracted Screenshots Count: ${extractedScreenshotsCount}`); - console.log(`Total Rows Extracted: ${totalRowsExtracted}`); + + if (uploadedBinaryOutput) { + extractedScreenshotsCount = Object.keys(uploadedBinaryOutput).length; + } + + const totalRowsExtracted = totalSchemaItemsExtracted + totalListItemsExtracted; capture( 'maxun-oss-run-created-scheduled', @@ -180,18 +190,60 @@ async function executeRun(id: string, userId: string) { created_at: new Date().toISOString(), status: 'success', totalRowsExtracted, - extractedItemsCount, + schemaItemsExtracted: totalSchemaItemsExtracted, + listItemsExtracted: totalListItemsExtracted, extractedScreenshotsCount, } ); - googleSheetUpdateTasks[id] = { - robotId: plainRun.robotMetaId, - runId: id, - status: 'pending', - retries: 5, + const webhookPayload = { + robot_id: plainRun.robotMetaId, + run_id: plainRun.runId, + robot_name: recording.recording_meta.name, + status: 'success', + started_at: plainRun.startedAt, + finished_at: new Date().toLocaleString(), + extracted_data: { + captured_texts: categorizedOutput.scrapeSchema["schema_merged"] || [], + captured_lists: categorizedOutput.scrapeList, + total_rows: totalRowsExtracted, + captured_texts_count: totalSchemaItemsExtracted, + captured_lists_count: totalListItemsExtracted, + screenshots_count: extractedScreenshotsCount + }, + metadata: { + browser_id: plainRun.browserId, + user_id: userId + } }; - processGoogleSheetUpdates(); + + try { + await sendWebhook(plainRun.robotMetaId, 'run_completed', webhookPayload); + logger.log('info', `Webhooks sent successfully for completed run ${plainRun.runId}`); + } catch (webhookError: any) { + logger.log('error', `Failed to send webhooks for run ${plainRun.runId}: ${webhookError.message}`); + } + + try { + googleSheetUpdateTasks[plainRun.runId] = { + robotId: plainRun.robotMetaId, + runId: plainRun.runId, + status: 'pending', + retries: 5, + }; + + airtableUpdateTasks[plainRun.runId] = { + robotId: plainRun.robotMetaId, + runId: plainRun.runId, + status: 'pending', + retries: 5, + }; + + processAirtableUpdates(); + processGoogleSheetUpdates(); + } catch (err: any) { + logger.log('error', `Failed to update Google Sheet for run: ${plainRun.runId}: ${err.message}`); + } return true; } catch (error: any) { logger.log('info', `Error while running a robot with id: ${id} - ${error.message}`); @@ -202,6 +254,31 @@ async function executeRun(id: string, userId: string) { status: 'failed', finishedAt: new Date().toLocaleString(), }); + + // Trigger webhooks for run failure + const failedWebhookPayload = { + robot_id: run.robotMetaId, + run_id: run.runId, + robot_name: 'Unknown Robot', + status: 'failed', + started_at: run.startedAt, + finished_at: new Date().toLocaleString(), + error: { + message: "Failed: Recording not found", + type: 'RecodingNotFoundError' + }, + metadata: { + browser_id: run.browserId, + user_id: userId, + } + }; + + try { + await sendWebhook(run.robotMetaId, 'run_failed', failedWebhookPayload); + logger.log('info', `Failure webhooks sent successfully for run ${run.runId}`); + } catch (webhookError: any) { + logger.log('error', `Failed to send failure webhooks for run ${run.runId}: ${webhookError.message}`); + } } capture( 'maxun-oss-run-created-scheduled',