diff --git a/maxun-core/src/interpret.ts b/maxun-core/src/interpret.ts index 19b97707..b51652b1 100644 --- a/maxun-core/src/interpret.ts +++ b/maxun-core/src/interpret.ts @@ -47,6 +47,7 @@ interface InterpreterOptions { activeId: (id: number) => void, debugMessage: (msg: string) => void, setActionType: (type: string) => void, + incrementScrapeListIndex: () => void, }> } @@ -475,6 +476,11 @@ export default class Interpreter extends EventEmitter { } await this.ensureScriptsLoaded(page); + + if (this.options.debugChannel?.incrementScrapeListIndex) { + this.options.debugChannel.incrementScrapeListIndex(); + } + if (!config.pagination) { const scrapeResults: Record[] = await page.evaluate((cfg) => window.scrapeList(cfg), config); await this.options.serializableCallback(scrapeResults); @@ -624,6 +630,8 @@ export default class Interpreter extends EventEmitter { }); allResults = allResults.concat(newResults); debugLog("Results collected:", allResults.length); + + await this.options.serializableCallback(allResults); }; const checkLimit = () => { diff --git a/public/locales/de.json b/public/locales/de.json index 8141d873..650e024d 100644 --- a/public/locales/de.json +++ b/public/locales/de.json @@ -474,7 +474,8 @@ "schedule_success": "Roboter {{name}} erfolgreich geplant", "schedule_failed": "Planen des Roboters {{name}} fehlgeschlagen", "abort_success": "Interpretation des Roboters {{name}} erfolgreich abgebrochen", - "abort_failed": "Abbrechen der Interpretation des Roboters {{name}} fehlgeschlagen" + "abort_failed": "Abbrechen der Interpretation des Roboters {{name}} fehlgeschlagen", + "abort_initiated": "Interpretation des Roboters {{name}} wird abgebrochen" }, "menu": { "recordings": "Roboter", diff --git a/public/locales/en.json b/public/locales/en.json index 5e0925a1..4b981a58 100644 --- a/public/locales/en.json +++ b/public/locales/en.json @@ -487,7 +487,8 @@ "schedule_success": "Robot {{name}} scheduled successfully", "schedule_failed": "Failed to schedule robot {{name}}", "abort_success": "Interpretation of robot {{name}} aborted successfully", - "abort_failed": "Failed to abort the interpretation of robot {{name}}" + "abort_failed": "Failed to abort the interpretation of robot {{name}}", + "abort_initiated": "Aborting the interpretation of robot {{name}}" }, "menu": { "recordings": "Robots", diff --git a/public/locales/es.json b/public/locales/es.json index e6953f38..8236157c 100644 --- a/public/locales/es.json +++ b/public/locales/es.json @@ -475,7 +475,8 @@ "schedule_success": "Robot {{name}} programado exitosamente", "schedule_failed": "Error al programar el robot {{name}}", "abort_success": "Interpretación del robot {{name}} abortada exitosamente", - "abort_failed": "Error al abortar la interpretación del robot {{name}}" + "abort_failed": "Error al abortar la interpretación del robot {{name}}", + "abort_initiated": "Cancelando la interpretación del robot {{name}}" }, "menu": { "recordings": "Robots", diff --git a/public/locales/ja.json b/public/locales/ja.json index 9d237674..ce4b74e5 100644 --- a/public/locales/ja.json +++ b/public/locales/ja.json @@ -475,7 +475,8 @@ "schedule_success": "ロボット{{name}}のスケジュールが正常に設定されました", "schedule_failed": "ロボット{{name}}のスケジュール設定に失敗しました", "abort_success": "ロボット{{name}}の解釈を中止しました", - "abort_failed": "ロボット{{name}}の解釈中止に失敗しました" + "abort_failed": "ロボット{{name}}の解釈中止に失敗しました", + "abort_initiated": "ロボット {{name}} の解釈を中止しています" }, "menu": { "recordings": "ロボット", diff --git a/public/locales/zh.json b/public/locales/zh.json index 949faf4d..3abcfa40 100644 --- a/public/locales/zh.json +++ b/public/locales/zh.json @@ -475,7 +475,8 @@ "schedule_success": "机器人{{name}}调度成功", "schedule_failed": "机器人{{name}}调度失败", "abort_success": "成功中止机器人{{name}}的解释", - "abort_failed": "中止机器人{{name}}的解释失败" + "abort_failed": "中止机器人{{name}}的解释失败", + "abort_initiated": "正在中止机器人 {{name}} 的解释" }, "menu": { "recordings": "机器人", diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index 374c24fa..bf4cbc1e 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -82,11 +82,140 @@ function AddGeneratedFlags(workflow: WorkflowFile) { return copy; }; +/** + * Helper function to extract and process scraped data from browser interpreter + */ +async function extractAndProcessScrapedData( + browser: RemoteBrowser, + run: any +): Promise<{ + categorizedOutput: any; + uploadedBinaryOutput: any; + totalDataPointsExtracted: number; + totalSchemaItemsExtracted: number; + totalListItemsExtracted: number; + extractedScreenshotsCount: number; +}> { + let categorizedOutput: { + scrapeSchema: Record; + scrapeList: Record; + } = { + scrapeSchema: {}, + scrapeList: {} + }; + + if ((browser?.interpreter?.serializableDataByType?.scrapeSchema ?? []).length > 0) { + browser?.interpreter?.serializableDataByType?.scrapeSchema?.forEach((schemaItem: any, index: any) => { + categorizedOutput.scrapeSchema[`schema-${index}`] = schemaItem; + }); + } + + if ((browser?.interpreter?.serializableDataByType?.scrapeList ?? []).length > 0) { + browser?.interpreter?.serializableDataByType?.scrapeList?.forEach((listItem: any, index: any) => { + categorizedOutput.scrapeList[`list-${index}`] = listItem; + }); + } + + const binaryOutput = browser?.interpreter?.binaryData?.reduce( + (reducedObject: Record, item: any, index: number): Record => { + return { + [`item-${index}`]: item, + ...reducedObject, + }; + }, + {} + ) || {}; + + let totalDataPointsExtracted = 0; + let totalSchemaItemsExtracted = 0; + let totalListItemsExtracted = 0; + let extractedScreenshotsCount = 0; + + if (categorizedOutput.scrapeSchema) { + Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => { + if (Array.isArray(schemaResult)) { + schemaResult.forEach(obj => { + if (obj && typeof obj === 'object') { + totalDataPointsExtracted += Object.keys(obj).length; + } + }); + totalSchemaItemsExtracted += schemaResult.length; + } else if (schemaResult && typeof schemaResult === 'object') { + totalDataPointsExtracted += Object.keys(schemaResult).length; + totalSchemaItemsExtracted += 1; + } + }); + } + + if (categorizedOutput.scrapeList) { + Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => { + if (Array.isArray(listResult)) { + listResult.forEach(obj => { + if (obj && typeof obj === 'object') { + totalDataPointsExtracted += Object.keys(obj).length; + } + }); + totalListItemsExtracted += listResult.length; + } + }); + } + + if (binaryOutput) { + extractedScreenshotsCount = Object.keys(binaryOutput).length; + totalDataPointsExtracted += extractedScreenshotsCount; + } + + const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); + const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput( + run, + binaryOutput + ); + + return { + categorizedOutput: { + scrapeSchema: categorizedOutput.scrapeSchema || {}, + scrapeList: categorizedOutput.scrapeList || {} + }, + uploadedBinaryOutput, + totalDataPointsExtracted, + totalSchemaItemsExtracted, + totalListItemsExtracted, + extractedScreenshotsCount + }; +} + +// Helper function to handle integration updates +async function triggerIntegrationUpdates(runId: string, robotMetaId: string): Promise { + try { + googleSheetUpdateTasks[runId] = { + robotId: robotMetaId, + runId: runId, + status: 'pending', + retries: 5, + }; + + airtableUpdateTasks[runId] = { + robotId: robotMetaId, + runId: runId, + status: 'pending', + retries: 5, + }; + + processAirtableUpdates(); + processGoogleSheetUpdates(); + } catch (err: any) { + logger.log('error', `Failed to update integrations for run: ${runId}: ${err.message}`); + } +} + +/** + * Modified processRunExecution function - only add browser reset + */ async function processRunExecution(job: Job) { const BROWSER_INIT_TIMEOUT = 30000; const data = job.data; - logger.log('info', `Processing run execution job for runId: ${data.runId}`); + logger.log('info', `Processing run execution job for runId: ${data.runId}, browserId: ${data.browserId}`); try { // Find the run @@ -108,51 +237,8 @@ async function processRunExecution(job: Job) { throw new Error(`No browser ID available for run ${data.runId}`); } - // Find the recording - const recording = await Robot.findOne({ where: { 'recording_meta.id': plainRun.robotMetaId }, raw: true }); - if (!recording) { - logger.log('error', `Recording for run ${data.runId} not found`); - - const currentRun = await Run.findOne({ where: { runId: data.runId } }); - if (currentRun && (currentRun.status !== 'aborted' && currentRun.status !== 'aborting')) { - await run.update({ - status: 'failed', - finishedAt: new Date().toLocaleString(), - log: 'Failed: Recording not found', - }); - - // Trigger webhooks for run failure - const failedWebhookPayload = { - robot_id: plainRun.robotMetaId, - run_id: data.runId, - robot_name: 'Unknown Robot', - status: 'failed', - started_at: plainRun.startedAt, - finished_at: new Date().toLocaleString(), - error: { - message: "Failed: Recording not found", - type: 'RecordingNotFoundError' - }, - metadata: { - browser_id: plainRun.browserId, - user_id: data.userId, - } - }; - - try { - await sendWebhook(plainRun.robotMetaId, 'run_failed', failedWebhookPayload); - logger.log('info', `Failure webhooks sent successfully for run ${data.runId}`); - } catch (webhookError: any) { - logger.log('error', `Failed to send failure webhooks for run ${data.runId}: ${webhookError.message}`); - } - } - - return { success: false }; - } - logger.log('info', `Looking for browser ${browserId} for run ${data.runId}`); - // Get the browser and execute the run let browser = browserPool.getRemoteBrowser(browserId); const browserWaitStart = Date.now(); @@ -168,7 +254,14 @@ async function processRunExecution(job: Job) { logger.log('info', `Browser ${browserId} found and ready for execution`); - try { + try { + // Find the recording + const recording = await Robot.findOne({ where: { 'recording_meta.id': plainRun.robotMetaId }, raw: true }); + + if (!recording) { + throw new Error(`Recording for run ${data.runId} not found`); + } + const isRunAborted = async (): Promise => { const currentRun = await Run.findOne({ where: { runId: data.runId } }); return currentRun ? (currentRun.status === 'aborted' || currentRun.status === 'aborting') : false; @@ -182,7 +275,7 @@ async function processRunExecution(job: Job) { await new Promise(resolve => setTimeout(resolve, 1000)); currentPage = browser.getCurrentPage(); } - + if (!currentPage) { throw new Error(`No current page available for browser ${browserId} after timeout`); } @@ -200,25 +293,27 @@ async function processRunExecution(job: Job) { if (await isRunAborted()) { logger.log('info', `Run ${data.runId} was aborted during execution, not updating status`); - + await destroyRemoteBrowser(plainRun.browserId, data.userId); return { success: true }; } + + logger.log('info', `Workflow execution completed for run ${data.runId}`); const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput); - if (await isRunAborted()) { - logger.log('info', `Run ${data.runId} was aborted while processing results, not updating status`); - return { success: true }; - } - const categorizedOutput = { scrapeSchema: interpretationInfo.scrapeSchemaOutput || {}, scrapeList: interpretationInfo.scrapeListOutput || {} }; + if (await isRunAborted()) { + logger.log('info', `Run ${data.runId} was aborted while processing results, not updating status`); + return { success: true }; + } + await run.update({ ...run, status: 'success', @@ -233,6 +328,7 @@ async function processRunExecution(job: Job) { }); // Track extraction metrics + let totalDataPointsExtracted = 0; let totalSchemaItemsExtracted = 0; let totalListItemsExtracted = 0; let extractedScreenshotsCount = 0; @@ -240,23 +336,35 @@ async function processRunExecution(job: Job) { if (categorizedOutput.scrapeSchema) { Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => { if (Array.isArray(schemaResult)) { + schemaResult.forEach(obj => { + if (obj && typeof obj === 'object') { + totalDataPointsExtracted += Object.keys(obj).length; + } + }); totalSchemaItemsExtracted += schemaResult.length; } else if (schemaResult && typeof schemaResult === 'object') { + totalDataPointsExtracted += Object.keys(schemaResult).length; totalSchemaItemsExtracted += 1; } }); } - + if (categorizedOutput.scrapeList) { Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => { if (Array.isArray(listResult)) { + listResult.forEach(obj => { + if (obj && typeof obj === 'object') { + totalDataPointsExtracted += Object.keys(obj).length; + } + }); totalListItemsExtracted += listResult.length; } }); } - + if (uploadedBinaryOutput) { extractedScreenshotsCount = Object.keys(uploadedBinaryOutput).length; + totalDataPointsExtracted += extractedScreenshotsCount; } const totalRowsExtracted = totalSchemaItemsExtracted + totalListItemsExtracted; @@ -265,6 +373,7 @@ async function processRunExecution(job: Job) { console.log(`Extracted List Items Count: ${totalListItemsExtracted}`); console.log(`Extracted Screenshots Count: ${extractedScreenshotsCount}`); console.log(`Total Rows Extracted: ${totalRowsExtracted}`); + console.log(`Total Data Points Extracted: ${totalDataPointsExtracted}`); // Capture metrics capture( @@ -295,7 +404,8 @@ async function processRunExecution(job: Job) { total_rows: totalRowsExtracted, captured_texts_count: totalSchemaItemsExtracted, captured_lists_count: totalListItemsExtracted, - screenshots_count: extractedScreenshotsCount + screenshots_count: extractedScreenshotsCount, + total_data_points_extracted: totalDataPointsExtracted, }, metadata: { browser_id: plainRun.browserId, @@ -311,111 +421,213 @@ async function processRunExecution(job: Job) { } // Schedule updates for Google Sheets and Airtable - try { - googleSheetUpdateTasks[plainRun.runId] = { - robotId: plainRun.robotMetaId, - runId: plainRun.runId, - status: 'pending', - retries: 5, - }; + await triggerIntegrationUpdates(plainRun.runId, plainRun.robotMetaId); - airtableUpdateTasks[plainRun.runId] = { - robotId: plainRun.robotMetaId, - runId: plainRun.runId, - status: 'pending', - retries: 5, - }; - - processAirtableUpdates(); - processGoogleSheetUpdates(); - } catch (err: any) { - logger.log('error', `Failed to update Google Sheet for run: ${plainRun.runId}: ${err.message}`); - } - - serverIo.of(plainRun.browserId).emit('run-completed', { + const completionData = { runId: data.runId, robotMetaId: plainRun.robotMetaId, robotName: recording.recording_meta.name, status: 'success', finishedAt: new Date().toLocaleString() - }); - - await destroyRemoteBrowser(plainRun.browserId, data.userId); - logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`); + }; + + serverIo.of(browserId).emit('run-completed', completionData); + serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-completed', completionData); + + await destroyRemoteBrowser(browserId, data.userId); + logger.log('info', `Browser ${browserId} destroyed after successful run ${data.runId}`); return { success: true }; } catch (executionError: any) { logger.log('error', `Run execution failed for run ${data.runId}: ${executionError.message}`); - const currentRun = await Run.findOne({ where: { runId: data.runId } }); - if (currentRun && (currentRun.status !== 'aborted' && currentRun.status !== 'aborting')) { - await run.update({ + let partialDataExtracted = false; + let partialData: any = null; + let partialUpdateData: any = { + status: 'failed', + finishedAt: new Date().toLocaleString(), + log: `Failed: ${executionError.message}`, + }; + + try { + if (browser && browser.interpreter) { + const hasSchemaData = (browser.interpreter.serializableDataByType?.scrapeSchema ?? []).length > 0; + const hasListData = (browser.interpreter.serializableDataByType?.scrapeList ?? []).length > 0; + const hasBinaryData = (browser.interpreter.binaryData ?? []).length > 0; + + if (hasSchemaData || hasListData || hasBinaryData) { + logger.log('info', `Extracting partial data from failed run ${data.runId}`); + + partialData = await extractAndProcessScrapedData(browser, run); + + partialUpdateData.serializableOutput = { + scrapeSchema: Object.values(partialData.categorizedOutput.scrapeSchema), + scrapeList: Object.values(partialData.categorizedOutput.scrapeList), + }; + partialUpdateData.binaryOutput = partialData.uploadedBinaryOutput; + + partialDataExtracted = true; + logger.log('info', `Partial data extracted for failed run ${data.runId}: ${partialData.totalDataPointsExtracted} data points`); + + await triggerIntegrationUpdates(plainRun.runId, plainRun.robotMetaId); + } + } + } catch (partialDataError: any) { + logger.log('warn', `Failed to extract partial data for run ${data.runId}: ${partialDataError.message}`); + } + + await run.update(partialUpdateData); + + try { + const recording = await Robot.findOne({ where: { 'recording_meta.id': run.robotMetaId }, raw: true }); + + const failureData = { + runId: data.runId, + robotMetaId: plainRun.robotMetaId, + robotName: recording ? recording.recording_meta.name : 'Unknown Robot', status: 'failed', finishedAt: new Date().toLocaleString(), - log: `Failed: ${executionError.message}`, - }); - - // Capture failure metrics - capture( - 'maxun-oss-run-created-manual', - { - runId: data.runId, - user_id: data.userId, - created_at: new Date().toISOString(), - status: 'failed', - error_message: executionError.message, - } - ); - - // Trigger webhooks for run failure - const failedWebhookPayload = { - robot_id: plainRun.robotMetaId, - run_id: data.runId, - robot_name: recording.recording_meta.name, - status: 'failed', - started_at: plainRun.startedAt, - finished_at: new Date().toLocaleString(), - error: { - message: executionError.message, - stack: executionError.stack, - type: executionError.name || 'ExecutionError' - }, - metadata: { - browser_id: plainRun.browserId, - user_id: data.userId, - } + hasPartialData: partialDataExtracted }; - try { - await sendWebhook(plainRun.robotMetaId, 'run_failed', failedWebhookPayload); - logger.log('info', `Failure webhooks sent successfully for run ${data.runId}`); - } catch (webhookError: any) { - logger.log('error', `Failed to send failure webhooks for run ${data.runId}: ${webhookError.message}`); - } - } else { - logger.log('info', `Run ${data.runId} was aborted, not updating status to failed`); + serverIo.of(browserId).emit('run-completed', failureData); + serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-completed', failureData); + } catch (emitError: any) { + logger.log('warn', `Failed to emit failure event: ${emitError.message}`); } - - await destroyRemoteBrowser(plainRun.browserId, data.userId); - - return { success: false }; + + const recording = await Robot.findOne({ where: { 'recording_meta.id': run.robotMetaId }, raw: true }); + + const failedWebhookPayload = { + robot_id: plainRun.robotMetaId, + run_id: data.runId, + robot_name: recording ? recording.recording_meta.name : 'Unknown Robot', + status: 'failed', + started_at: plainRun.startedAt, + finished_at: new Date().toLocaleString(), + error: { + message: executionError.message, + stack: executionError.stack, + type: 'ExecutionError', + }, + partial_data_extracted: partialDataExtracted, + extracted_data: partialDataExtracted ? { + captured_texts: Object.values(partialUpdateData.serializableOutput?.scrapeSchema || []).flat() || [], + captured_lists: partialUpdateData.serializableOutput?.scrapeList || {}, + total_data_points_extracted: partialData?.totalDataPointsExtracted || 0, + captured_texts_count: partialData?.totalSchemaItemsExtracted || 0, + captured_lists_count: partialData?.totalListItemsExtracted || 0, + screenshots_count: partialData?.extractedScreenshotsCount || 0 + } : null, + metadata: { + browser_id: plainRun.browserId, + user_id: data.userId, + } + }; + + try { + await sendWebhook(plainRun.robotMetaId, 'run_failed', failedWebhookPayload); + logger.log('info', `Failure webhooks sent successfully for run ${data.runId}`); + } catch (webhookError: any) { + logger.log('error', `Failed to send failure webhooks for run ${data.runId}: ${webhookError.message}`); + } + + try { + const failureSocketData = { + runId: data.runId, + robotMetaId: run.robotMetaId, + robotName: recording ? recording.recording_meta.name : 'Unknown Robot', + status: 'failed', + finishedAt: new Date().toLocaleString() + }; + + serverIo.of(run.browserId).emit('run-completed', failureSocketData); + serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-completed', failureSocketData); + } catch (socketError: any) { + logger.log('warn', `Failed to emit failure event in main catch: ${socketError.message}`); + } + + capture('maxun-oss-run-created-manual', { + runId: data.runId, + user_id: data.userId, + created_at: new Date().toISOString(), + status: 'failed', + error_message: executionError.message, + partial_data_extracted: partialDataExtracted, + totalRowsExtracted: partialData?.totalSchemaItemsExtracted + partialData?.totalListItemsExtracted + partialData?.extractedScreenshotsCount || 0, + }); + + await destroyRemoteBrowser(browserId, data.userId); + logger.log('info', `Browser ${browserId} destroyed after failed run`); + + return { success: false, partialDataExtracted }; } } catch (error: unknown) { const errorMessage = error instanceof Error ? error.message : String(error); logger.log('error', `Failed to process run execution job: ${errorMessage}`); + + try { + const run = await Run.findOne({ where: { runId: data.runId }}); + + if (run) { + await run.update({ + status: 'failed', + finishedAt: new Date().toLocaleString(), + log: `Failed: ${errorMessage}`, + }); + + const recording = await Robot.findOne({ where: { 'recording_meta.id': run.robotMetaId }, raw: true }); + + const failedWebhookPayload = { + robot_id: run.robotMetaId, + run_id: data.runId, + robot_name: recording ? recording.recording_meta.name : 'Unknown Robot', + status: 'failed', + started_at: run.startedAt, + finished_at: new Date().toLocaleString(), + error: { + message: errorMessage, + }, + metadata: { + browser_id: run.browserId, + user_id: data.userId, + } + }; + + try { + await sendWebhook(run.robotMetaId, 'run_failed', failedWebhookPayload); + logger.log('info', `Failure webhooks sent successfully for run ${data.runId}`); + } catch (webhookError: any) { + logger.log('error', `Failed to send failure webhooks for run ${data.runId}: ${webhookError.message}`); + } + + try { + const failureSocketData = { + runId: data.runId, + robotMetaId: run.robotMetaId, + robotName: recording ? recording.recording_meta.name : 'Unknown Robot', + status: 'failed', + finishedAt: new Date().toLocaleString() + }; + + serverIo.of(run.browserId).emit('run-completed', failureSocketData); + serverIo.of('/queued-run').to(`user-${data.userId}`).emit('run-completed', failureSocketData); + } catch (socketError: any) { + logger.log('warn', `Failed to emit failure event in main catch: ${socketError.message}`); + } + } + } catch (updateError: any) { + logger.log('error', `Failed to update run status: ${updateError.message}`); + } + return { success: false }; } } async function abortRun(runId: string, userId: string): Promise { try { - const run = await Run.findOne({ - where: { - runId: runId, - runByUserId: userId - } - }); + const run = await Run.findOne({ where: { runId: runId } }); if (!run) { logger.log('warn', `Run ${runId} not found or does not belong to user ${userId}`); @@ -466,32 +678,9 @@ async function abortRun(runId: string, userId: string): Promise { } let currentLog = 'Run aborted by user'; - let categorizedOutput = { - scrapeSchema: {}, - scrapeList: {}, - }; - let binaryOutput: Record = {}; - - try { - if (browser.interpreter) { - if (browser.interpreter.debugMessages) { - currentLog = browser.interpreter.debugMessages.join('\n') || currentLog; - } - - if (browser.interpreter.serializableDataByType) { - categorizedOutput = { - scrapeSchema: collectDataByType(browser.interpreter.serializableDataByType.scrapeSchema || []), - scrapeList: collectDataByType(browser.interpreter.serializableDataByType.scrapeList || []), - }; - } - - if (browser.interpreter.binaryData) { - binaryOutput = collectBinaryData(browser.interpreter.binaryData); - } - } - } catch (interpreterError) { - logger.log('warn', `Error collecting data from interpreter: ${interpreterError}`); - } + const extractedData = await extractAndProcessScrapedData(browser, run); + + console.log(`Total Data Points Extracted in aborted run: ${extractedData.totalDataPointsExtracted}`); await run.update({ status: 'aborted', @@ -499,12 +688,16 @@ async function abortRun(runId: string, userId: string): Promise { browserId: plainRun.browserId, log: currentLog, serializableOutput: { - scrapeSchema: Object.values(categorizedOutput.scrapeSchema), - scrapeList: Object.values(categorizedOutput.scrapeList), + scrapeSchema: Object.values(extractedData.categorizedOutput.scrapeSchema), + scrapeList: Object.values(extractedData.categorizedOutput.scrapeList), }, - binaryOutput, + binaryOutput: extractedData.uploadedBinaryOutput, }); + if (extractedData.totalDataPointsExtracted > 0) { + await triggerIntegrationUpdates(runId, plainRun.robotMetaId); + } + try { serverIo.of(plainRun.browserId).emit('run-aborted', { runId, @@ -515,7 +708,7 @@ async function abortRun(runId: string, userId: string): Promise { } catch (socketError) { logger.log('warn', `Failed to emit run-aborted event: ${socketError}`); } - + try { await new Promise(resolve => setTimeout(resolve, 500)); @@ -533,30 +726,6 @@ async function abortRun(runId: string, userId: string): Promise { } } -/** - * Helper function to collect data from arrays into indexed objects - * @param dataArray Array of data to be transformed into an object with indexed keys - * @returns Object with indexed keys - */ -function collectDataByType(dataArray: any[]): Record { - return dataArray.reduce((result: Record, item, index) => { - result[`item-${index}`] = item; - return result; - }, {}); -} - -/** - * Helper function to collect binary data (like screenshots) - * @param binaryDataArray Array of binary data objects to be transformed - * @returns Object with indexed keys - */ -function collectBinaryData(binaryDataArray: { mimetype: string, data: string, type?: string }[]): Record { - return binaryDataArray.reduce((result: Record, item, index) => { - result[`item-${index}`] = item; - return result; - }, {}); -} - async function registerRunExecutionWorker() { try { const registeredUserQueues = new Map(); diff --git a/server/src/routes/storage.ts b/server/src/routes/storage.ts index 844f5ce0..74bea6ff 100644 --- a/server/src/routes/storage.ts +++ b/server/src/routes/storage.ts @@ -952,21 +952,40 @@ router.delete('/schedule/:id', requireSignIn, async (req: AuthenticatedRequest, */ router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest, res) => { try { - if (!req.user) { - return res.status(401).send({ error: 'Unauthorized' }); - } - - const run = await Run.findOne({ - where: { - runId: req.params.id, - runByUserId: req.user.id, - } - }); - + if (!req.user) { return res.status(401).send({ error: 'Unauthorized' }); } + + const run = await Run.findOne({ where: { runId: req.params.id } }); + if (!run) { return res.status(404).send({ error: 'Run not found' }); } + if (!['running', 'queued'].includes(run.status)) { + return res.status(400).send({ + error: `Cannot abort run with status: ${run.status}` + }); + } + + const isQueued = run.status === 'queued'; + + await run.update({ + status: 'aborting' + }); + + if (isQueued) { + await run.update({ + status: 'aborted', + finishedAt: new Date().toLocaleString(), + log: 'Run aborted while queued' + }); + + return res.send({ + success: true, + message: 'Queued run aborted', + isQueued: true + }); + } + if (!['running', 'queued'].includes(run.status)) { return res.status(400).send({ error: `Cannot abort run with status: ${run.status}` @@ -997,12 +1016,13 @@ router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest, logger.log('info', `Abort signal sent for run ${req.params.id}, job ID: ${jobId}`); - return res.send({ - success: true, + return res.send({ + success: true, message: 'Abort signal sent', - jobId - }); - + jobId, + isQueued: false + }); + } catch (e) { const { message } = e as Error; logger.log('error', `Error aborting run ${req.params.id}: ${message}`); diff --git a/server/src/schedule-worker.ts b/server/src/schedule-worker.ts index 91c3c224..c75770e4 100644 --- a/server/src/schedule-worker.ts +++ b/server/src/schedule-worker.ts @@ -6,6 +6,7 @@ import logger from './logger'; import Robot from './models/Robot'; import { handleRunRecording } from './workflow-management/scheduler'; import { computeNextRun } from './utils/schedule'; +import { v4 as uuid } from "uuid"; if (!process.env.DB_USER || !process.env.DB_PASSWORD || !process.env.DB_HOST || !process.env.DB_PORT || !process.env.DB_NAME) { throw new Error('One or more required environment variables are missing.'); @@ -32,7 +33,7 @@ interface ScheduledWorkflowData { */ export async function scheduleWorkflow(id: string, userId: string, cronExpression: string, timezone: string): Promise { try { - const runId = require('uuidv4').uuid(); + const runId = uuid(); const queueName = `scheduled-workflow-${id}`; diff --git a/server/src/workflow-management/classes/Interpreter.ts b/server/src/workflow-management/classes/Interpreter.ts index ca853489..f249f26e 100644 --- a/server/src/workflow-management/classes/Interpreter.ts +++ b/server/src/workflow-management/classes/Interpreter.ts @@ -107,6 +107,11 @@ export class WorkflowInterpreter { */ public binaryData: { mimetype: string, data: string }[] = []; + /** + * Track current scrapeList index + */ + private currentScrapeListIndex: number = 0; + /** * An array of id's of the pairs from the workflow that are about to be paused. * As "breakpoints". @@ -288,6 +293,7 @@ export class WorkflowInterpreter { scrapeList: [], }; this.binaryData = []; + this.currentScrapeListIndex = 0; } /** @@ -322,6 +328,9 @@ export class WorkflowInterpreter { }, setActionType: (type: string) => { this.currentActionType = type; + }, + incrementScrapeListIndex: () => { + this.currentScrapeListIndex++; } }, serializableCallback: (data: any) => { @@ -334,7 +343,7 @@ export class WorkflowInterpreter { this.serializableDataByType.scrapeSchema.push([data]); } } else if (this.currentActionType === 'scrapeList') { - this.serializableDataByType.scrapeList.push(data); + this.serializableDataByType.scrapeList[this.currentScrapeListIndex] = data; } this.socket.emit('serializableCallback', data); diff --git a/src/api/storage.ts b/src/api/storage.ts index 02558c72..d3fa3eb8 100644 --- a/src/api/storage.ts +++ b/src/api/storage.ts @@ -205,20 +205,24 @@ export const interpretStoredRecording = async (id: string): Promise => } } -export const notifyAboutAbort = async (id: string): Promise => { +export const notifyAboutAbort = async (id: string): Promise<{ success: boolean; isQueued?: boolean }> => { try { - const response = await axios.post(`${apiUrl}/storage/runs/abort/${id}`); + const response = await axios.post(`${apiUrl}/storage/runs/abort/${id}`, { withCredentials: true }); if (response.status === 200) { - return response.data; + return { + success: response.data.success, + isQueued: response.data.isQueued + }; } else { throw new Error(`Couldn't abort a running recording with id ${id}`); } } catch (error: any) { console.log(error); - return false; + return { success: false }; } } + export const scheduleStoredRecording = async (id: string, settings: ScheduleSettings): Promise => { try { const response = await axios.put( diff --git a/src/pages/MainPage.tsx b/src/pages/MainPage.tsx index 7abd2c7e..095a4980 100644 --- a/src/pages/MainPage.tsx +++ b/src/pages/MainPage.tsx @@ -52,16 +52,51 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps) const navigate = useNavigate(); const abortRunHandler = (runId: string, robotName: string, browserId: string) => { + notify('info', t('main_page.notifications.abort_initiated', { name: robotName })); + aborted = true; + notifyAboutAbort(runId).then(async (response) => { - if (response) { - notify('success', t('main_page.notifications.abort_success', { name: robotName })); - await stopRecording(ids.browserId); - } else { + if (!response.success) { notify('error', t('main_page.notifications.abort_failed', { name: robotName })); + setRerenderRuns(true); + return; } - setRerenderRuns(true); - }) + + if (response.isQueued) { + setRerenderRuns(true); + + notify('success', t('main_page.notifications.abort_success', { name: robotName })); + + setQueuedRuns(prev => { + const newSet = new Set(prev); + newSet.delete(runId); + return newSet; + }); + + return; + } + + const abortSocket = io(`${apiUrl}/${browserId}`, { + transports: ["websocket"], + rejectUnauthorized: false + }); + + abortSocket.on('run-aborted', (abortData) => { + if (abortData.runId === runId) { + notify('success', t('main_page.notifications.abort_success', { name: abortData.robotName || robotName })); + setRerenderRuns(true); + abortSocket.disconnect(); + } + }); + + abortSocket.on('connect_error', (error) => { + console.log('Abort socket connection error:', error); + notify('error', t('main_page.notifications.abort_failed', { name: robotName })); + setRerenderRuns(true); + abortSocket.disconnect(); + }); + }); } const setRecordingInfo = (id: string, name: string) => {