From bdea3f9053b5e0165c824580bcb5c306d8fc6aaf Mon Sep 17 00:00:00 2001 From: Rohit Date: Tue, 27 May 2025 23:18:10 +0530 Subject: [PATCH] feat: add webhook payloads --- server/src/pgboss-worker.ts | 85 ++++++++++++++++++++++++++++++++++++- 1 file changed, 84 insertions(+), 1 deletion(-) diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index 54a70697..07e82ee4 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -20,6 +20,7 @@ import { googleSheetUpdateTasks, processGoogleSheetUpdates } from './workflow-ma import { airtableUpdateTasks, processAirtableUpdates } from './workflow-management/integrations/airtable'; import { RemoteBrowser } from './browser-management/classes/RemoteBrowser'; import { io as serverIo } from "./server"; +import { sendWebhook } from './routes/webhook'; if (!process.env.DB_USER || !process.env.DB_PASSWORD || !process.env.DB_HOST || !process.env.DB_PORT || !process.env.DB_NAME) { throw new Error('Failed to start pgboss worker: one or more required environment variables are missing.'); @@ -203,6 +204,32 @@ async function processRunExecution(job: Job) { finishedAt: new Date().toLocaleString(), log: 'Failed: Recording not found', }); + + // Trigger webhooks for run failure + const failedWebhookPayload = { + robot_id: plainRun.robotMetaId, + run_id: data.runId, + robot_name: 'Unknown Robot', + status: 'failed', + started_at: plainRun.startedAt, + finished_at: new Date().toLocaleString(), + execution_time_ms: new Date().getTime() - new Date(plainRun.startedAt).getTime(), + error: { + message: "Failed: Recording not found", + type: 'RecodingNotFoundError' + }, + metadata: { + browser_id: plainRun.browserId, + user_id: data.userId, + } + }; + + try { + await sendWebhook(plainRun.robotMetaId, 'run_failed', failedWebhookPayload); + logger.log('info', `Failure webhooks sent successfully for run ${data.runId}`); + } catch (webhookError: any) { + logger.log('error', `Failed to send failure webhooks for run ${data.runId}: ${webhookError.message}`); + } } // Check for queued runs even if this one failed @@ -314,7 +341,7 @@ async function processRunExecution(job: Job) { console.log(`Extracted List Items Count: ${totalListItemsExtracted}`); console.log(`Extracted Screenshots Count: ${extractedScreenshotsCount}`); console.log(`Total Rows Extracted: ${totalRowsExtracted}`); - + // Capture metrics capture( 'maxun-oss-run-created-manual', @@ -330,6 +357,35 @@ async function processRunExecution(job: Job) { } ); + // Trigger webhooks for run completion + const webhookPayload = { + robot_id: plainRun.robotMetaId, + run_id: data.runId, + robot_name: recording.recording_meta.name, + status: 'success', + started_at: plainRun.startedAt, + finished_at: new Date().toLocaleString(), + extracted_data: { + captured_texts: categorizedOutput.scrapeSchema["schema_merged"] || [], + captured_lists: categorizedOutput.scrapeList, + total_rows: totalRowsExtracted, + captured_texts_count: totalSchemaItemsExtracted, + captured_lists_count: totalListItemsExtracted, + screenshots_count: extractedScreenshotsCount + }, + metadata: { + browser_id: plainRun.browserId, + user_id: data.userId, + } + }; + + try { + await sendWebhook(plainRun.robotMetaId, 'run_completed', webhookPayload); + logger.log('info', `Webhooks sent successfully for completed run ${data.runId}`); + } catch (webhookError: any) { + logger.log('error', `Failed to send webhooks for run ${data.runId}: ${webhookError.message}`); + } + // Schedule updates for Google Sheets and Airtable try { googleSheetUpdateTasks[plainRun.runId] = { @@ -392,6 +448,33 @@ async function processRunExecution(job: Job) { error_message: executionError.message, } ); + + // Trigger webhooks for run failure + const failedWebhookPayload = { + robot_id: plainRun.robotMetaId, + run_id: data.runId, + robot_name: recording.recording_meta.name, + status: 'failed', + started_at: plainRun.startedAt, + finished_at: new Date().toLocaleString(), + execution_time_ms: new Date().getTime() - new Date(plainRun.startedAt).getTime(), + error: { + message: executionError.message, + stack: executionError.stack, + type: executionError.name || 'ExecutionError' + }, + metadata: { + browser_id: plainRun.browserId, + user_id: data.userId, + } + }; + + try { + await sendWebhook(plainRun.robotMetaId, 'run_failed', failedWebhookPayload); + logger.log('info', `Failure webhooks sent successfully for run ${data.runId}`); + } catch (webhookError: any) { + logger.log('error', `Failed to send failure webhooks for run ${data.runId}: ${webhookError.message}`); + } } else { logger.log('info', `Run ${data.runId} was aborted, not updating status to failed`); }