diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index 3ac993a7..b6cb1355 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -255,7 +255,6 @@ async function processRunExecution(job: Job) { return { success: true }; } - // Process the results const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput); @@ -264,36 +263,57 @@ async function processRunExecution(job: Job) { return { success: true }; } - // Update the run record with results + const categorizedOutput = { + scrapeSchema: interpretationInfo.scrapeSchemaOutput || {}, + scrapeList: interpretationInfo.scrapeListOutput || {}, + other: interpretationInfo.otherOutput || {} + }; + await run.update({ ...run, status: 'success', finishedAt: new Date().toLocaleString(), browserId: plainRun.browserId, log: interpretationInfo.log.join('\n'), - serializableOutput: interpretationInfo.serializableOutput, + serializableOutput: { + scrapeSchema: Object.values(categorizedOutput.scrapeSchema), + scrapeList: Object.values(categorizedOutput.scrapeList), + other: Object.values(categorizedOutput.other), + }, binaryOutput: uploadedBinaryOutput, }); // Track extraction metrics - let totalRowsExtracted = 0; + let totalSchemaItemsExtracted = 0; + let totalListItemsExtracted = 0; let extractedScreenshotsCount = 0; - let extractedItemsCount = 0; - - if (run.dataValues.binaryOutput && run.dataValues.binaryOutput["item-0"]) { - extractedScreenshotsCount = 1; + + if (categorizedOutput.scrapeSchema) { + Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => { + if (Array.isArray(schemaResult)) { + totalSchemaItemsExtracted += schemaResult.length; + } else if (schemaResult && typeof schemaResult === 'object') { + totalSchemaItemsExtracted += 1; + } + }); } - - if (run.dataValues.serializableOutput && run.dataValues.serializableOutput["item-0"]) { - const itemsArray = run.dataValues.serializableOutput["item-0"]; - extractedItemsCount = itemsArray.length; - - totalRowsExtracted = itemsArray.reduce((total, item) => { - return total + Object.keys(item).length; - }, 0); + + if (categorizedOutput.scrapeList) { + Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => { + if (Array.isArray(listResult)) { + totalListItemsExtracted += listResult.length; + } + }); } - - console.log(`Extracted Items Count: ${extractedItemsCount}`); + + if (uploadedBinaryOutput) { + extractedScreenshotsCount = Object.keys(uploadedBinaryOutput).length; + } + + const totalRowsExtracted = totalSchemaItemsExtracted + totalListItemsExtracted; + + console.log(`Extracted Schema Items Count: ${totalSchemaItemsExtracted}`); + console.log(`Extracted List Items Count: ${totalListItemsExtracted}`); console.log(`Extracted Screenshots Count: ${extractedScreenshotsCount}`); console.log(`Total Rows Extracted: ${totalRowsExtracted}`); @@ -306,7 +326,8 @@ async function processRunExecution(job: Job) { created_at: new Date().toISOString(), status: 'success', totalRowsExtracted, - extractedItemsCount, + schemaItemsExtracted: totalSchemaItemsExtracted, + listItemsExtracted: totalListItemsExtracted, extractedScreenshotsCount, } ); @@ -339,7 +360,7 @@ async function processRunExecution(job: Job) { robotName: recording.recording_meta.name, status: 'success', finishedAt: new Date().toLocaleString() - });; + }); // Check for and process queued runs before destroying the browser const queuedRunProcessed = await checkAndProcessQueuedRun(data.userId, plainRun.browserId); @@ -458,7 +479,11 @@ async function abortRun(runId: string, userId: string): Promise { } let currentLog = 'Run aborted by user'; - let serializableOutput: Record = {}; + let categorizedOutput = { + scrapeSchema: {}, + scrapeList: {}, + other: {} + }; let binaryOutput: Record = {}; try { @@ -467,16 +492,16 @@ async function abortRun(runId: string, userId: string): Promise { currentLog = browser.interpreter.debugMessages.join('\n') || currentLog; } - if (browser.interpreter.serializableData) { - browser.interpreter.serializableData.forEach((item, index) => { - serializableOutput[`item-${index}`] = item; - }); + if (browser.interpreter.serializableDataByType) { + categorizedOutput = { + scrapeSchema: collectDataByType(browser.interpreter.serializableDataByType.scrapeSchema || []), + scrapeList: collectDataByType(browser.interpreter.serializableDataByType.scrapeList || []), + other: collectDataByType(browser.interpreter.serializableDataByType.other || []) + }; } if (browser.interpreter.binaryData) { - browser.interpreter.binaryData.forEach((item, index) => { - binaryOutput[`item-${index}`] = item; - }); + binaryOutput = collectBinaryData(browser.interpreter.binaryData); } } } catch (interpreterError) { @@ -488,7 +513,11 @@ async function abortRun(runId: string, userId: string): Promise { finishedAt: new Date().toLocaleString(), browserId: plainRun.browserId, log: currentLog, - serializableOutput, + serializableOutput: { + scrapeSchema: Object.values(categorizedOutput.scrapeSchema), + scrapeList: Object.values(categorizedOutput.scrapeList), + other: Object.values(categorizedOutput.other), + }, binaryOutput, }); @@ -529,6 +558,30 @@ async function abortRun(runId: string, userId: string): Promise { } } +/** + * Helper function to collect data from arrays into indexed objects + * @param dataArray Array of data to be transformed into an object with indexed keys + * @returns Object with indexed keys + */ +function collectDataByType(dataArray: any[]): Record { + return dataArray.reduce((result: Record, item, index) => { + result[`item-${index}`] = item; + return result; + }, {}); +} + +/** + * Helper function to collect binary data (like screenshots) + * @param binaryDataArray Array of binary data objects to be transformed + * @returns Object with indexed keys + */ +function collectBinaryData(binaryDataArray: { mimetype: string, data: string, type?: string }[]): Record { + return binaryDataArray.reduce((result: Record, item, index) => { + result[`item-${index}`] = item; + return result; + }, {}); +} + async function registerRunExecutionWorker() { try { const registeredUserQueues = new Map();