feat: add webhook support for scheduler

This commit is contained in:
Rohit
2025-05-27 23:37:36 +05:30
parent c02a09ff6e
commit adbbacb870

View File

@@ -13,6 +13,8 @@ import { BinaryOutputService } from "../../storage/mino";
import { capture } from "../../utils/analytics"; import { capture } from "../../utils/analytics";
import { WorkflowFile } from "maxun-core"; import { WorkflowFile } from "maxun-core";
import { Page } from "playwright"; import { Page } from "playwright";
import { sendWebhook } from "../../routes/webhook";
import { airtableUpdateTasks, processAirtableUpdates } from "../integrations/airtable";
chromium.use(stealthPlugin()); chromium.use(stealthPlugin());
async function createWorkflowAndStoreMetadata(id: string, userId: string) { async function createWorkflowAndStoreMetadata(id: string, userId: string) {
@@ -152,26 +154,34 @@ async function executeRun(id: string, userId: string) {
binaryOutput: uploadedBinaryOutput, binaryOutput: uploadedBinaryOutput,
}); });
let totalRowsExtracted = 0; // Track extraction metrics
let totalSchemaItemsExtracted = 0;
let totalListItemsExtracted = 0;
let extractedScreenshotsCount = 0; let extractedScreenshotsCount = 0;
let extractedItemsCount = 0;
if (run.dataValues.binaryOutput && run.dataValues.binaryOutput["item-0"]) { if (categorizedOutput.scrapeSchema) {
extractedScreenshotsCount = 1; Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => {
if (Array.isArray(schemaResult)) {
totalSchemaItemsExtracted += schemaResult.length;
} else if (schemaResult && typeof schemaResult === 'object') {
totalSchemaItemsExtracted += 1;
}
});
} }
if (run.dataValues.serializableOutput && run.dataValues.serializableOutput["item-0"]) { if (categorizedOutput.scrapeList) {
const itemsArray = run.dataValues.serializableOutput["item-0"]; Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => {
extractedItemsCount = itemsArray.length; if (Array.isArray(listResult)) {
totalListItemsExtracted += listResult.length;
totalRowsExtracted = itemsArray.reduce((total, item) => { }
return total + Object.keys(item).length; });
}, 0);
} }
console.log(`Extracted Items Count: ${extractedItemsCount}`); if (uploadedBinaryOutput) {
console.log(`Extracted Screenshots Count: ${extractedScreenshotsCount}`); extractedScreenshotsCount = Object.keys(uploadedBinaryOutput).length;
console.log(`Total Rows Extracted: ${totalRowsExtracted}`); }
const totalRowsExtracted = totalSchemaItemsExtracted + totalListItemsExtracted;
capture( capture(
'maxun-oss-run-created-scheduled', 'maxun-oss-run-created-scheduled',
@@ -180,18 +190,60 @@ async function executeRun(id: string, userId: string) {
created_at: new Date().toISOString(), created_at: new Date().toISOString(),
status: 'success', status: 'success',
totalRowsExtracted, totalRowsExtracted,
extractedItemsCount, schemaItemsExtracted: totalSchemaItemsExtracted,
listItemsExtracted: totalListItemsExtracted,
extractedScreenshotsCount, extractedScreenshotsCount,
} }
); );
googleSheetUpdateTasks[id] = { const webhookPayload = {
robotId: plainRun.robotMetaId, robot_id: plainRun.robotMetaId,
runId: id, run_id: plainRun.runId,
status: 'pending', robot_name: recording.recording_meta.name,
retries: 5, status: 'success',
started_at: plainRun.startedAt,
finished_at: new Date().toLocaleString(),
extracted_data: {
captured_texts: categorizedOutput.scrapeSchema["schema_merged"] || [],
captured_lists: categorizedOutput.scrapeList,
total_rows: totalRowsExtracted,
captured_texts_count: totalSchemaItemsExtracted,
captured_lists_count: totalListItemsExtracted,
screenshots_count: extractedScreenshotsCount
},
metadata: {
browser_id: plainRun.browserId,
user_id: userId
}
}; };
processGoogleSheetUpdates();
try {
await sendWebhook(plainRun.robotMetaId, 'run_completed', webhookPayload);
logger.log('info', `Webhooks sent successfully for completed run ${plainRun.runId}`);
} catch (webhookError: any) {
logger.log('error', `Failed to send webhooks for run ${plainRun.runId}: ${webhookError.message}`);
}
try {
googleSheetUpdateTasks[plainRun.runId] = {
robotId: plainRun.robotMetaId,
runId: plainRun.runId,
status: 'pending',
retries: 5,
};
airtableUpdateTasks[plainRun.runId] = {
robotId: plainRun.robotMetaId,
runId: plainRun.runId,
status: 'pending',
retries: 5,
};
processAirtableUpdates();
processGoogleSheetUpdates();
} catch (err: any) {
logger.log('error', `Failed to update Google Sheet for run: ${plainRun.runId}: ${err.message}`);
}
return true; return true;
} catch (error: any) { } catch (error: any) {
logger.log('info', `Error while running a robot with id: ${id} - ${error.message}`); logger.log('info', `Error while running a robot with id: ${id} - ${error.message}`);
@@ -202,6 +254,31 @@ async function executeRun(id: string, userId: string) {
status: 'failed', status: 'failed',
finishedAt: new Date().toLocaleString(), finishedAt: new Date().toLocaleString(),
}); });
// Trigger webhooks for run failure
const failedWebhookPayload = {
robot_id: run.robotMetaId,
run_id: run.runId,
robot_name: 'Unknown Robot',
status: 'failed',
started_at: run.startedAt,
finished_at: new Date().toLocaleString(),
error: {
message: "Failed: Recording not found",
type: 'RecodingNotFoundError'
},
metadata: {
browser_id: run.browserId,
user_id: userId,
}
};
try {
await sendWebhook(run.robotMetaId, 'run_failed', failedWebhookPayload);
logger.log('info', `Failure webhooks sent successfully for run ${run.runId}`);
} catch (webhookError: any) {
logger.log('error', `Failed to send failure webhooks for run ${run.runId}: ${webhookError.message}`);
}
} }
capture( capture(
'maxun-oss-run-created-scheduled', 'maxun-oss-run-created-scheduled',