feat: register abort run queue worker
This commit is contained in:
@@ -46,6 +46,11 @@ interface ExecuteRunData {
|
||||
browserId: string;
|
||||
}
|
||||
|
||||
interface AbortRunData {
|
||||
userId: string;
|
||||
runId: string;
|
||||
}
|
||||
|
||||
const pgBoss = new PgBoss({connectionString: pgBossConnectionString });
|
||||
|
||||
/**
|
||||
@@ -358,6 +363,78 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
|
||||
}
|
||||
}
|
||||
|
||||
async function abortRun(runId: string, userId: string): Promise<boolean> {
|
||||
try {
|
||||
const run = await Run.findOne({
|
||||
where: {
|
||||
runId: runId,
|
||||
runByUserId: userId
|
||||
}
|
||||
});
|
||||
|
||||
if (!run) {
|
||||
logger.log('warn', `Run ${runId} not found or does not belong to user ${userId}`);
|
||||
return false;
|
||||
}
|
||||
|
||||
const plainRun = run.toJSON();
|
||||
|
||||
const browser = browserPool.getRemoteBrowser(plainRun.browserId);
|
||||
|
||||
if (!browser) {
|
||||
await run.update({
|
||||
status: 'aborted',
|
||||
finishedAt: new Date().toLocaleString(),
|
||||
log: 'Aborted: Browser not found or already closed'
|
||||
});
|
||||
|
||||
logger.log('warn', `Browser not found for run ${runId}`);
|
||||
return true;
|
||||
}
|
||||
|
||||
const currentLog = browser.interpreter.debugMessages.join('\n');
|
||||
const serializableOutput = browser.interpreter.serializableData.reduce((reducedObject, item, index) => {
|
||||
return {
|
||||
[`item-${index}`]: item,
|
||||
...reducedObject,
|
||||
}
|
||||
}, {});
|
||||
|
||||
const binaryOutput = browser.interpreter.binaryData.reduce((reducedObject, item, index) => {
|
||||
return {
|
||||
[`item-${index}`]: item,
|
||||
...reducedObject,
|
||||
}
|
||||
}, {});
|
||||
|
||||
await run.update({
|
||||
status: 'aborted',
|
||||
finishedAt: new Date().toLocaleString(),
|
||||
browserId: plainRun.browserId,
|
||||
log: currentLog || 'Run aborted by user',
|
||||
serializableOutput,
|
||||
binaryOutput,
|
||||
});
|
||||
|
||||
const queuedRunProcessed = await checkAndProcessQueuedRun(userId, plainRun.browserId);
|
||||
|
||||
if (!queuedRunProcessed) {
|
||||
try {
|
||||
await destroyRemoteBrowser(plainRun.browserId, userId);
|
||||
logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`);
|
||||
} catch (cleanupError) {
|
||||
logger.log('warn', `Failed to clean up browser for aborted run ${runId}: ${cleanupError}`);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
logger.log('error', `Failed to abort run ${runId}: ${errorMessage}`);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
async function registerRunExecutionWorker() {
|
||||
try {
|
||||
const registeredUserQueues = new Map();
|
||||
@@ -412,6 +489,52 @@ async function registerRunExecutionWorker() {
|
||||
}
|
||||
}
|
||||
|
||||
async function registerAbortRunWorker() {
|
||||
try {
|
||||
const registeredAbortQueues = new Map();
|
||||
|
||||
const checkForNewAbortQueues = async () => {
|
||||
try {
|
||||
const activeQueues = await pgBoss.getQueues();
|
||||
|
||||
const abortQueues = activeQueues.filter(q => q.name.startsWith('abort-run-user-'));
|
||||
|
||||
for (const queue of abortQueues) {
|
||||
if (!registeredAbortQueues.has(queue.name)) {
|
||||
await pgBoss.work(queue.name, async (job: Job<AbortRunData> | Job<AbortRunData>[]) => {
|
||||
try {
|
||||
const data = extractJobData(job);
|
||||
const { userId, runId } = data;
|
||||
|
||||
logger.log('info', `Processing abort request for run ${runId} by user ${userId}`);
|
||||
const success = await abortRun(runId, userId);
|
||||
return { success };
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
logger.log('error', `Abort run job failed in ${queue.name}: ${errorMessage}`);
|
||||
throw error;
|
||||
}
|
||||
});
|
||||
|
||||
registeredAbortQueues.set(queue.name, true);
|
||||
logger.log('info', `Registered abort worker for queue: ${queue.name}`);
|
||||
}
|
||||
}
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
logger.log('error', `Failed to check for new abort queues: ${errorMessage}`);
|
||||
}
|
||||
};
|
||||
|
||||
await checkForNewAbortQueues();
|
||||
|
||||
logger.log('info', 'Abort run worker registration system initialized');
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
logger.log('error', `Failed to initialize abort run worker system: ${errorMessage}`);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Initialize PgBoss and register all workers
|
||||
@@ -493,6 +616,9 @@ async function startWorkers() {
|
||||
// Register the run execution worker
|
||||
await registerRunExecutionWorker();
|
||||
|
||||
// Register the abort run worker
|
||||
await registerAbortRunWorker();
|
||||
|
||||
logger.log('info', 'All recording workers registered successfully');
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
|
||||
Reference in New Issue
Block a user