feat: recorder revamp server changes

Commit 5be2b3175b, authored by Rohit Rajan on 2025-10-21 00:43:08 +05:30
(parent commit: eafe11aef4).
9 changed files with 879 additions and 369 deletions.

View File

@@ -149,14 +149,20 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
let browser = browserPool.getRemoteBrowser(browserId);
const browserWaitStart = Date.now();
let lastLogTime = 0;
let pollAttempts = 0;
const MAX_POLL_ATTEMPTS = 15;
while (!browser && (Date.now() - browserWaitStart) < BROWSER_INIT_TIMEOUT) {
while (!browser && (Date.now() - browserWaitStart) < BROWSER_INIT_TIMEOUT && pollAttempts < MAX_POLL_ATTEMPTS) {
const currentTime = Date.now();
pollAttempts++;
const browserStatus = browserPool.getBrowserStatus(browserId);
if (browserStatus === null) {
throw new Error(`Browser slot ${browserId} does not exist in pool`);
}
if (browserStatus === "failed") {
throw new Error(`Browser ${browserId} initialization failed`);
}
if (currentTime - lastLogTime > 10000) {
logger.log('info', `Browser ${browserId} not ready yet (status: ${browserStatus}), waiting... (${Math.round((currentTime - browserWaitStart) / 1000)}s elapsed)`);
@@ -183,17 +189,25 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
}
const isRunAborted = async (): Promise<boolean> => {
const currentRun = await Run.findOne({ where: { runId: data.runId } });
return currentRun ? (currentRun.status === 'aborted' || currentRun.status === 'aborting') : false;
try {
const currentRun = await Run.findOne({ where: { runId: data.runId } });
return currentRun ? (currentRun.status === 'aborted' || currentRun.status === 'aborting') : false;
} catch (error: any) {
logger.log('error', `Error checking if run ${data.runId} is aborted: ${error.message}`);
return false;
}
};
let currentPage = browser.getCurrentPage();
const pageWaitStart = Date.now();
let lastPageLogTime = 0;
let pageAttempts = 0;
const MAX_PAGE_ATTEMPTS = 15;
while (!currentPage && (Date.now() - pageWaitStart) < BROWSER_PAGE_TIMEOUT) {
while (!currentPage && (Date.now() - pageWaitStart) < BROWSER_PAGE_TIMEOUT && pageAttempts < MAX_PAGE_ATTEMPTS) {
const currentTime = Date.now();
pageAttempts++;
if (currentTime - lastPageLogTime > 5000) {
logger.log('info', `Page not ready for browser ${browserId}, waiting... (${Math.round((currentTime - pageWaitStart) / 1000)}s elapsed)`);
@@ -209,6 +223,26 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
}
logger.log('info', `Starting workflow execution for run ${data.runId}`);
await run.update({
status: 'running',
log: 'Workflow execution started'
});
try {
const startedData = {
runId: data.runId,
robotMetaId: plainRun.robotMetaId,
robotName: recording.recording_meta.name,
status: 'running',
startedAt: new Date().toLocaleString()
};
serverIo.of(browserId).emit('run-started', startedData);
serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-started', startedData);
} catch (socketError: any) {
logger.log('warn', `Failed to send run-started notification for API run ${plainRun.runId}: ${socketError.message}`);
}
// Execute the workflow
const workflow = AddGeneratedFlags(recording.recording);
@@ -231,6 +265,19 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
}
logger.log('info', `Workflow execution completed for run ${data.runId}`);
const binaryOutputService = new BinaryOutputService('maxuncloud-run-screenshots');
const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(
run,
interpretationInfo.binaryOutput
);
// Get the already persisted and credit-validated data from the run record
const finalRun = await Run.findByPk(run.id);
const categorizedOutput = {
scrapeSchema: finalRun?.serializableOutput?.scrapeSchema || {},
scrapeList: finalRun?.serializableOutput?.scrapeList || {}
};
if (await isRunAborted()) {
logger.log('info', `Run ${data.runId} was aborted while processing results, not updating status`);
@@ -240,48 +287,39 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
await run.update({
status: 'success',
finishedAt: new Date().toLocaleString(),
log: interpretationInfo.log.join('\n')
log: interpretationInfo.log.join('\n'),
serializableOutput: JSON.parse(JSON.stringify({
scrapeSchema: categorizedOutput.scrapeSchema || {},
scrapeList: categorizedOutput.scrapeList || {},
})),
binaryOutput: uploadedBinaryOutput,
});
// Upload binary output to MinIO and update run with MinIO URLs
const updatedRun = await Run.findOne({ where: { runId: data.runId } });
if (updatedRun && updatedRun.binaryOutput && Object.keys(updatedRun.binaryOutput).length > 0) {
try {
const binaryService = new BinaryOutputService('maxun-run-screenshots');
await binaryService.uploadAndStoreBinaryOutput(updatedRun, updatedRun.binaryOutput);
logger.log('info', `Uploaded binary output to MinIO for run ${data.runId}`);
} catch (minioError: any) {
logger.log('error', `Failed to upload binary output to MinIO for run ${data.runId}: ${minioError.message}`);
}
}
let totalSchemaItemsExtracted = 0;
let totalListItemsExtracted = 0;
let extractedScreenshotsCount = 0;
if (updatedRun) {
if (updatedRun.serializableOutput) {
if (updatedRun.serializableOutput.scrapeSchema) {
Object.values(updatedRun.serializableOutput.scrapeSchema).forEach((schemaResult: any) => {
if (Array.isArray(schemaResult)) {
totalSchemaItemsExtracted += schemaResult.length;
} else if (schemaResult && typeof schemaResult === 'object') {
totalSchemaItemsExtracted += 1;
}
});
}
if (updatedRun.serializableOutput.scrapeList) {
Object.values(updatedRun.serializableOutput.scrapeList).forEach((listResult: any) => {
if (Array.isArray(listResult)) {
totalListItemsExtracted += listResult.length;
}
});
}
if (categorizedOutput) {
if (categorizedOutput.scrapeSchema) {
Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => {
if (Array.isArray(schemaResult)) {
totalSchemaItemsExtracted += schemaResult.length;
} else if (schemaResult && typeof schemaResult === 'object') {
totalSchemaItemsExtracted += 1;
}
});
}
if (updatedRun.binaryOutput) {
extractedScreenshotsCount = Object.keys(updatedRun.binaryOutput).length;
if (categorizedOutput.scrapeList) {
Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => {
if (Array.isArray(listResult)) {
totalListItemsExtracted += listResult.length;
}
});
}
if (run.binaryOutput) {
extractedScreenshotsCount = Object.keys(run.binaryOutput).length;
}
}
@@ -302,6 +340,21 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
}
);
try {
const completionData = {
runId: data.runId,
robotMetaId: plainRun.robotMetaId,
robotName: recording.recording_meta.name,
status: 'success',
finishedAt: new Date().toLocaleString()
};
serverIo.of(browserId).emit('run-completed', completionData);
serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-completed', completionData);
} catch (socketError: any) {
logger.log('warn', `Failed to send run-completed notification for API run ${plainRun.runId}: ${socketError.message}`);
}
const webhookPayload = {
robot_id: plainRun.robotMetaId,
run_id: data.runId,
@@ -310,12 +363,16 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
started_at: plainRun.startedAt,
finished_at: new Date().toLocaleString(),
extracted_data: {
captured_texts: updatedRun?.serializableOutput?.scrapeSchema ? Object.values(updatedRun.serializableOutput.scrapeSchema).flat() : [],
captured_lists: updatedRun?.serializableOutput?.scrapeList || {},
total_rows: totalRowsExtracted,
captured_texts: Object.keys(categorizedOutput.scrapeSchema || {}).length > 0
? Object.entries(categorizedOutput.scrapeSchema).reduce((acc, [name, value]) => {
acc[name] = Array.isArray(value) ? value : [value];
return acc;
}, {} as Record<string, any[]>)
: {},
captured_lists: categorizedOutput.scrapeList,
captured_texts_count: totalSchemaItemsExtracted,
captured_lists_count: totalListItemsExtracted,
screenshots_count: extractedScreenshotsCount,
screenshots_count: extractedScreenshotsCount
},
metadata: {
browser_id: plainRun.browserId,
@@ -330,26 +387,8 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
logger.log('error', `Failed to send webhooks for run ${data.runId}: ${webhookError.message}`);
}
// Schedule updates for Google Sheets and Airtable
await triggerIntegrationUpdates(plainRun.runId, plainRun.robotMetaId);
// Flush any remaining persistence buffer before emitting socket event
if (browser && browser.interpreter) {
await browser.interpreter.flushPersistenceBuffer();
logger.log('debug', `Flushed persistence buffer before emitting run-completed for run ${data.runId}`);
}
const completionData = {
runId: data.runId,
robotMetaId: plainRun.robotMetaId,
robotName: recording.recording_meta.name,
status: 'success',
finishedAt: new Date().toLocaleString()
};
serverIo.of(browserId).emit('run-completed', completionData);
serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-completed', completionData);
await destroyRemoteBrowser(browserId, data.userId);
logger.log('info', `Browser ${browserId} destroyed after successful run ${data.runId}`);
@@ -416,9 +455,13 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
},
partial_data_extracted: partialDataExtracted,
extracted_data: partialDataExtracted ? {
captured_texts: Object.values(partialUpdateData.serializableOutput?.scrapeSchema || []).flat() || [],
captured_texts: Object.keys(partialUpdateData.serializableOutput?.scrapeSchema || {}).length > 0
? Object.entries(partialUpdateData.serializableOutput.scrapeSchema).reduce((acc, [name, value]) => {
acc[name] = Array.isArray(value) ? value : [value];
return acc;
}, {} as Record<string, any[]>)
: {},
captured_lists: partialUpdateData.serializableOutput?.scrapeList || {},
total_data_points_extracted: partialData?.totalDataPointsExtracted || 0,
captured_texts_count: partialData?.totalSchemaItemsExtracted || 0,
captured_lists_count: partialData?.totalListItemsExtracted || 0,
screenshots_count: partialData?.extractedScreenshotsCount || 0