From d02545f6d9a9ba0e90352e727d02545c59b87e82 Mon Sep 17 00:00:00 2001 From: Rohit Date: Thu, 12 Jun 2025 10:33:49 +0530 Subject: [PATCH] feat: add partial data processing helper func --- server/src/pgboss-worker.ts | 107 +++++++++++++++++++++++++++++++++++- 1 file changed, 104 insertions(+), 3 deletions(-) diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index 73bf4be4..d89fa04c 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -82,6 +82,108 @@ function AddGeneratedFlags(workflow: WorkflowFile) { return copy; }; +/** + * Helper function to extract and process scraped data from browser interpreter + */ +async function extractAndProcessScrapedData( + browser: RemoteBrowser, + run: any +): Promise<{ + categorizedOutput: any; + uploadedBinaryOutput: any; + totalDataPointsExtracted: number; + totalSchemaItemsExtracted: number; + totalListItemsExtracted: number; + extractedScreenshotsCount: number; +}> { + let categorizedOutput: { + scrapeSchema: Record; + scrapeList: Record; + } = { + scrapeSchema: {}, + scrapeList: {} + }; + + if ((browser?.interpreter?.serializableDataByType?.scrapeSchema ?? []).length > 0) { + browser?.interpreter?.serializableDataByType?.scrapeSchema?.forEach((schemaItem: any, index: any) => { + categorizedOutput.scrapeSchema[`schema-${index}`] = schemaItem; + }); + } + + if ((browser?.interpreter?.serializableDataByType?.scrapeList ?? []).length > 0) { + browser?.interpreter?.serializableDataByType?.scrapeList?.forEach((listItem: any, index: any) => { + categorizedOutput.scrapeList[`list-${index}`] = listItem; + }); + } + + const binaryOutput = browser?.interpreter?.binaryData?.reduce( + (reducedObject: Record, item: any, index: number): Record => { + return { + [`item-${index}`]: item, + ...reducedObject, + }; + }, + {} + ) || {}; + + let totalDataPointsExtracted = 0; + let totalSchemaItemsExtracted = 0; + let totalListItemsExtracted = 0; + let extractedScreenshotsCount = 0; + + if (categorizedOutput.scrapeSchema) { + Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => { + if (Array.isArray(schemaResult)) { + schemaResult.forEach(obj => { + if (obj && typeof obj === 'object') { + totalDataPointsExtracted += Object.keys(obj).length; + } + }); + totalSchemaItemsExtracted += schemaResult.length; + } else if (schemaResult && typeof schemaResult === 'object') { + totalDataPointsExtracted += Object.keys(schemaResult).length; + totalSchemaItemsExtracted += 1; + } + }); + } + + if (categorizedOutput.scrapeList) { + Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => { + if (Array.isArray(listResult)) { + listResult.forEach(obj => { + if (obj && typeof obj === 'object') { + totalDataPointsExtracted += Object.keys(obj).length; + } + }); + totalListItemsExtracted += listResult.length; + } + }); + } + + if (binaryOutput) { + extractedScreenshotsCount = Object.keys(binaryOutput).length; + totalDataPointsExtracted += extractedScreenshotsCount; + } + + const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); + const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput( + run, + binaryOutput + ); + + return { + categorizedOutput: { + scrapeSchema: categorizedOutput.scrapeSchema || {}, + scrapeList: categorizedOutput.scrapeList || {} + }, + uploadedBinaryOutput, + totalDataPointsExtracted, + totalSchemaItemsExtracted, + totalListItemsExtracted, + extractedScreenshotsCount + }; +} + /** * Modified processRunExecution function - only add browser reset */ @@ -156,8 +258,7 @@ async function processRunExecution(job: Job) { return { success: false }; } - try { - + try { const isRunAborted = async (): Promise => { const currentRun = await Run.findOne({ where: { runId: data.runId } }); return currentRun ? (currentRun.status === 'aborted' || currentRun.status === 'aborting') : false; @@ -482,7 +583,7 @@ async function abortRun(runId: string, userId: string): Promise { } catch (socketError) { logger.log('warn', `Failed to emit run-aborted event: ${socketError}`); } - + return true; } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error);