feat: run and abort categorize data by action type

This commit is contained in:
Rohit
2025-04-27 15:27:52 +05:30
parent cd4820f818
commit bd5087ef6c

View File

@@ -255,7 +255,6 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
return { success: true };
}
// Process the results
const binaryOutputService = new BinaryOutputService('maxun-run-screenshots');
const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput);
@@ -264,36 +263,57 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
return { success: true };
}
// Update the run record with results
const categorizedOutput = {
scrapeSchema: interpretationInfo.scrapeSchemaOutput || {},
scrapeList: interpretationInfo.scrapeListOutput || {},
other: interpretationInfo.otherOutput || {}
};
await run.update({
...run,
status: 'success',
finishedAt: new Date().toLocaleString(),
browserId: plainRun.browserId,
log: interpretationInfo.log.join('\n'),
serializableOutput: interpretationInfo.serializableOutput,
serializableOutput: {
scrapeSchema: Object.values(categorizedOutput.scrapeSchema),
scrapeList: Object.values(categorizedOutput.scrapeList),
other: Object.values(categorizedOutput.other),
},
binaryOutput: uploadedBinaryOutput,
});
// Track extraction metrics
let totalRowsExtracted = 0;
let totalSchemaItemsExtracted = 0;
let totalListItemsExtracted = 0;
let extractedScreenshotsCount = 0;
let extractedItemsCount = 0;
if (run.dataValues.binaryOutput && run.dataValues.binaryOutput["item-0"]) {
extractedScreenshotsCount = 1;
if (categorizedOutput.scrapeSchema) {
Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => {
if (Array.isArray(schemaResult)) {
totalSchemaItemsExtracted += schemaResult.length;
} else if (schemaResult && typeof schemaResult === 'object') {
totalSchemaItemsExtracted += 1;
}
});
}
if (run.dataValues.serializableOutput && run.dataValues.serializableOutput["item-0"]) {
const itemsArray = run.dataValues.serializableOutput["item-0"];
extractedItemsCount = itemsArray.length;
totalRowsExtracted = itemsArray.reduce((total, item) => {
return total + Object.keys(item).length;
}, 0);
if (categorizedOutput.scrapeList) {
Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => {
if (Array.isArray(listResult)) {
totalListItemsExtracted += listResult.length;
}
});
}
console.log(`Extracted Items Count: ${extractedItemsCount}`);
if (uploadedBinaryOutput) {
extractedScreenshotsCount = Object.keys(uploadedBinaryOutput).length;
}
const totalRowsExtracted = totalSchemaItemsExtracted + totalListItemsExtracted;
console.log(`Extracted Schema Items Count: ${totalSchemaItemsExtracted}`);
console.log(`Extracted List Items Count: ${totalListItemsExtracted}`);
console.log(`Extracted Screenshots Count: ${extractedScreenshotsCount}`);
console.log(`Total Rows Extracted: ${totalRowsExtracted}`);
@@ -306,7 +326,8 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
created_at: new Date().toISOString(),
status: 'success',
totalRowsExtracted,
extractedItemsCount,
schemaItemsExtracted: totalSchemaItemsExtracted,
listItemsExtracted: totalListItemsExtracted,
extractedScreenshotsCount,
}
);
@@ -339,7 +360,7 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
robotName: recording.recording_meta.name,
status: 'success',
finishedAt: new Date().toLocaleString()
});;
});
// Check for and process queued runs before destroying the browser
const queuedRunProcessed = await checkAndProcessQueuedRun(data.userId, plainRun.browserId);
@@ -458,7 +479,11 @@ async function abortRun(runId: string, userId: string): Promise<boolean> {
}
let currentLog = 'Run aborted by user';
let serializableOutput: Record<string, any> = {};
let categorizedOutput = {
scrapeSchema: {},
scrapeList: {},
other: {}
};
let binaryOutput: Record<string, any> = {};
try {
@@ -467,16 +492,16 @@ async function abortRun(runId: string, userId: string): Promise<boolean> {
currentLog = browser.interpreter.debugMessages.join('\n') || currentLog;
}
if (browser.interpreter.serializableData) {
browser.interpreter.serializableData.forEach((item, index) => {
serializableOutput[`item-${index}`] = item;
});
if (browser.interpreter.serializableDataByType) {
categorizedOutput = {
scrapeSchema: collectDataByType(browser.interpreter.serializableDataByType.scrapeSchema || []),
scrapeList: collectDataByType(browser.interpreter.serializableDataByType.scrapeList || []),
other: collectDataByType(browser.interpreter.serializableDataByType.other || [])
};
}
if (browser.interpreter.binaryData) {
browser.interpreter.binaryData.forEach((item, index) => {
binaryOutput[`item-${index}`] = item;
});
binaryOutput = collectBinaryData(browser.interpreter.binaryData);
}
}
} catch (interpreterError) {
@@ -488,7 +513,11 @@ async function abortRun(runId: string, userId: string): Promise<boolean> {
finishedAt: new Date().toLocaleString(),
browserId: plainRun.browserId,
log: currentLog,
serializableOutput,
serializableOutput: {
scrapeSchema: Object.values(categorizedOutput.scrapeSchema),
scrapeList: Object.values(categorizedOutput.scrapeList),
other: Object.values(categorizedOutput.other),
},
binaryOutput,
});
@@ -529,6 +558,30 @@ async function abortRun(runId: string, userId: string): Promise<boolean> {
}
}
/**
* Helper function to collect data from arrays into indexed objects
* @param dataArray Array of data to be transformed into an object with indexed keys
* @returns Object with indexed keys
*/
function collectDataByType(dataArray: any[]): Record<string, any> {
return dataArray.reduce((result: Record<string, any>, item, index) => {
result[`item-${index}`] = item;
return result;
}, {});
}
/**
* Helper function to collect binary data (like screenshots)
* @param binaryDataArray Array of binary data objects to be transformed
* @returns Object with indexed keys
*/
function collectBinaryData(binaryDataArray: { mimetype: string, data: string, type?: string }[]): Record<string, any> {
return binaryDataArray.reduce((result: Record<string, any>, item, index) => {
result[`item-${index}`] = item;
return result;
}, {});
}
async function registerRunExecutionWorker() {
try {
const registeredUserQueues = new Map();