feat: revamp abort run execution
This commit is contained in:
@@ -627,12 +627,7 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
|
|||||||
|
|
||||||
async function abortRun(runId: string, userId: string): Promise<boolean> {
|
async function abortRun(runId: string, userId: string): Promise<boolean> {
|
||||||
try {
|
try {
|
||||||
const run = await Run.findOne({
|
const run = await Run.findOne({ where: { runId: runId } });
|
||||||
where: {
|
|
||||||
runId: runId,
|
|
||||||
runByUserId: userId
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
if (!run) {
|
if (!run) {
|
||||||
logger.log('warn', `Run ${runId} not found or does not belong to user ${userId}`);
|
logger.log('warn', `Run ${runId} not found or does not belong to user ${userId}`);
|
||||||
@@ -683,32 +678,9 @@ async function abortRun(runId: string, userId: string): Promise<boolean> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let currentLog = 'Run aborted by user';
|
let currentLog = 'Run aborted by user';
|
||||||
let categorizedOutput = {
|
const extractedData = await extractAndProcessScrapedData(browser, run);
|
||||||
scrapeSchema: {},
|
|
||||||
scrapeList: {},
|
console.log(`Total Data Points Extracted in aborted run: ${extractedData.totalDataPointsExtracted}`);
|
||||||
};
|
|
||||||
let binaryOutput: Record<string, any> = {};
|
|
||||||
|
|
||||||
try {
|
|
||||||
if (browser.interpreter) {
|
|
||||||
if (browser.interpreter.debugMessages) {
|
|
||||||
currentLog = browser.interpreter.debugMessages.join('\n') || currentLog;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (browser.interpreter.serializableDataByType) {
|
|
||||||
categorizedOutput = {
|
|
||||||
scrapeSchema: collectDataByType(browser.interpreter.serializableDataByType.scrapeSchema || []),
|
|
||||||
scrapeList: collectDataByType(browser.interpreter.serializableDataByType.scrapeList || []),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
if (browser.interpreter.binaryData) {
|
|
||||||
binaryOutput = collectBinaryData(browser.interpreter.binaryData);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (interpreterError) {
|
|
||||||
logger.log('warn', `Error collecting data from interpreter: ${interpreterError}`);
|
|
||||||
}
|
|
||||||
|
|
||||||
await run.update({
|
await run.update({
|
||||||
status: 'aborted',
|
status: 'aborted',
|
||||||
@@ -716,12 +688,16 @@ async function abortRun(runId: string, userId: string): Promise<boolean> {
|
|||||||
browserId: plainRun.browserId,
|
browserId: plainRun.browserId,
|
||||||
log: currentLog,
|
log: currentLog,
|
||||||
serializableOutput: {
|
serializableOutput: {
|
||||||
scrapeSchema: Object.values(categorizedOutput.scrapeSchema),
|
scrapeSchema: Object.values(extractedData.categorizedOutput.scrapeSchema),
|
||||||
scrapeList: Object.values(categorizedOutput.scrapeList),
|
scrapeList: Object.values(extractedData.categorizedOutput.scrapeList),
|
||||||
},
|
},
|
||||||
binaryOutput,
|
binaryOutput: extractedData.uploadedBinaryOutput,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if (extractedData.totalDataPointsExtracted > 0) {
|
||||||
|
await triggerIntegrationUpdates(runId, plainRun.robotMetaId);
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
serverIo.of(plainRun.browserId).emit('run-aborted', {
|
serverIo.of(plainRun.browserId).emit('run-aborted', {
|
||||||
runId,
|
runId,
|
||||||
@@ -733,6 +709,15 @@ async function abortRun(runId: string, userId: string): Promise<boolean> {
|
|||||||
logger.log('warn', `Failed to emit run-aborted event: ${socketError}`);
|
logger.log('warn', `Failed to emit run-aborted event: ${socketError}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
await new Promise(resolve => setTimeout(resolve, 500));
|
||||||
|
|
||||||
|
await destroyRemoteBrowser(plainRun.browserId, userId);
|
||||||
|
logger.log('info', `Browser ${plainRun.browserId} destroyed successfully after abort`);
|
||||||
|
} catch (cleanupError) {
|
||||||
|
logger.log('warn', `Failed to clean up browser for aborted run ${runId}: ${cleanupError}`);
|
||||||
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||||
@@ -741,30 +726,6 @@ async function abortRun(runId: string, userId: string): Promise<boolean> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Helper function to collect data from arrays into indexed objects
|
|
||||||
* @param dataArray Array of data to be transformed into an object with indexed keys
|
|
||||||
* @returns Object with indexed keys
|
|
||||||
*/
|
|
||||||
function collectDataByType(dataArray: any[]): Record<string, any> {
|
|
||||||
return dataArray.reduce((result: Record<string, any>, item, index) => {
|
|
||||||
result[`item-${index}`] = item;
|
|
||||||
return result;
|
|
||||||
}, {});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Helper function to collect binary data (like screenshots)
|
|
||||||
* @param binaryDataArray Array of binary data objects to be transformed
|
|
||||||
* @returns Object with indexed keys
|
|
||||||
*/
|
|
||||||
function collectBinaryData(binaryDataArray: { mimetype: string, data: string, type?: string }[]): Record<string, any> {
|
|
||||||
return binaryDataArray.reduce((result: Record<string, any>, item, index) => {
|
|
||||||
result[`item-${index}`] = item;
|
|
||||||
return result;
|
|
||||||
}, {});
|
|
||||||
}
|
|
||||||
|
|
||||||
async function registerRunExecutionWorker() {
|
async function registerRunExecutionWorker() {
|
||||||
try {
|
try {
|
||||||
const registeredUserQueues = new Map();
|
const registeredUserQueues = new Map();
|
||||||
|
|||||||
Reference in New Issue
Block a user