Merge pull request #628 from getmaxun/unified-run

feat: unified run creation and execution
This commit is contained in:
Karishma Shukla
2025-06-12 21:24:46 +05:30
committed by GitHub
15 changed files with 621 additions and 287 deletions

View File

@@ -61,7 +61,8 @@
"notifications": {
"delete_warning": "Der Roboter hat zugehörige Ausführungen. Löschen Sie zuerst die Ausführungen, um den Roboter zu löschen",
"delete_success": "Roboter erfolgreich gelöscht",
"auth_success": "Roboter erfolgreich authentifiziert"
"auth_success": "Roboter erfolgreich authentifiziert",
"browser_limit_warning": "Remote-Browser sind derzeit ausgelastet. Bitte warten Sie einige Minuten und versuchen Sie es erneut"
}
},
"mainmenu": {

View File

@@ -68,7 +68,8 @@
"notifications": {
"delete_warning": "The robot has associated runs. First delete runs to delete the robot",
"delete_success": "Robot deleted successfully",
"auth_success": "Robot successfully authenticated"
"auth_success": "Robot successfully authenticated",
"browser_limit_warning": "Remote browsers are currently busy. Please wait for a few minutes and try again"
}
},
"mainmenu":{

View File

@@ -62,7 +62,8 @@
"notifications": {
"delete_warning": "El robot tiene ejecuciones asociadas. Primero elimine las ejecuciones para eliminar el robot",
"delete_success": "Robot eliminado exitosamente",
"auth_success": "Robot autenticado exitosamente"
"auth_success": "Robot autenticado exitosamente",
"browser_limit_warning": "Los navegadores remotos están ocupados actualmente. Por favor, espere unos minutos e inténtelo de nuevo"
}
},
"mainmenu": {

View File

@@ -62,7 +62,8 @@
"notifications": {
"delete_warning": "ロボットには関連する実行があります。ロボットを削除するには、まず実行を削除してください",
"delete_success": "ロボットが正常に削除されました",
"auth_success": "ロボットの認証に成功しました"
"auth_success": "ロボットの認証に成功しました",
"browser_limit_warning": "リモートブラウザは現在ビジー状態です。数分お待ちいただいてから再度お試しください"
}
},
"mainmenu": {

View File

@@ -62,7 +62,8 @@
"notifications": {
"delete_warning": "该机器人有关联的运行记录。请先删除运行记录才能删除机器人",
"delete_success": "机器人删除成功",
"auth_success": "机器人认证成功"
"auth_success": "机器人认证成功",
"browser_limit_warning": "远程浏览器当前繁忙。请稍等几分钟后重试"
}
},
"mainmenu": {

View File

@@ -14,7 +14,7 @@ interface BrowserPoolInfo {
/**
* The instance of remote browser.
*/
browser: RemoteBrowser,
browser: RemoteBrowser | null,
/**
* States if the browser's instance is being actively used.
* Helps to persist the progress on the frontend when the application has been reloaded.
@@ -31,6 +31,11 @@ interface BrowserPoolInfo {
* @default "recording"
*/
state: BrowserState,
/**
* The status of the browser instance.
* Can be "reserved", "initializing", "ready" or "failed".
*/
status?: "reserved" | "initializing" | "ready" | "failed",
}
/**
@@ -205,8 +210,18 @@ export class BrowserPool {
* @returns remote browser instance or undefined if it does not exist in the pool
*/
public getRemoteBrowser = (id: string): RemoteBrowser | undefined => {
logger.log('debug', `Remote browser with id: ${id} retrieved from the pool`);
return this.pool[id]?.browser;
const poolInfo = this.pool[id];
if (!poolInfo) {
return undefined;
}
// Return undefined for reserved slots (browser is null)
if (poolInfo.status === "reserved") {
logger.log('debug', `Browser ${id} is reserved but not yet ready`);
return undefined;
}
return poolInfo.browser || undefined;
};
/**
@@ -506,6 +521,29 @@ export class BrowserPool {
return browserIds.length > 0 ? browserIds[0] : null;
};
/**
* Checks if there are available browser slots for a user.
* Returns true if user has available slots AND none of their active browsers are in "recording" state.
* @param userId the user ID to check browser slots for
* @returns {boolean} true if user has available slots and no recording browsers, false otherwise
*/
public hasAvailableBrowserSlots = (userId: string, state?: BrowserState): boolean => {
const userBrowserIds = this.userToBrowserMap.get(userId) || [];
if (userBrowserIds.length >= 2) {
return false;
}
if (state === "recording") {
const hasBrowserInState = userBrowserIds.some(browserId =>
this.pool[browserId] && this.pool[browserId].state === "recording"
);
return !hasBrowserInState;
}
return true;
};
/**
* Returns the first active browser's instance id from the pool.
* If there is no active browser, it returns null.
@@ -524,4 +562,71 @@ export class BrowserPool {
// logger.log('warn', `No active browser in the pool`);
return null;
};
/**
* Reserves a browser slot immediately without creating the actual browser.
* This ensures slot counting is accurate for rapid successive requests.
*
* @param id browser ID to reserve
* @param userId user ID that owns this reservation
* @param state browser state ("recording" or "run")
* @returns true if slot was reserved, false if user has reached limit
*/
public reserveBrowserSlot = (id: string, userId: string, state: BrowserState = "run"): boolean => {
// Check if user has available slots first
if (!this.hasAvailableBrowserSlots(userId, state)) {
logger.log('debug', `Cannot reserve slot for user ${userId}: no available slots`);
return false;
}
// Reserve the slot with null browser
this.pool[id] = {
browser: null,
active: false,
userId,
state,
status: "reserved"
};
// Update the user-to-browser mapping
let userBrowserIds = this.userToBrowserMap.get(userId) || [];
if (!userBrowserIds.includes(id)) {
userBrowserIds.push(id);
this.userToBrowserMap.set(userId, userBrowserIds);
}
logger.log('info', `Reserved browser slot ${id} for user ${userId} in state ${state}`);
return true;
};
/**
* Upgrades a reserved slot to an actual browser instance.
*
* @param id browser ID that was previously reserved
* @param browser the actual RemoteBrowser instance
* @returns true if successful, false if slot wasn't reserved
*/
public upgradeBrowserSlot = (id: string, browser: RemoteBrowser): boolean => {
if (!this.pool[id] || this.pool[id].status !== "reserved") {
logger.log('warn', `Cannot upgrade browser ${id}: slot not reserved`);
return false;
}
this.pool[id].browser = browser;
this.pool[id].status = "ready";
logger.log('info', `Upgraded browser slot ${id} to ready state`);
return true;
};
/**
* Marks a reserved slot as failed and removes it.
*
* @param id browser ID to mark as failed
*/
public failBrowserSlot = (id: string): void => {
if (this.pool[id]) {
logger.log('info', `Marking browser slot ${id} as failed`);
this.deleteRemoteBrowser(id);
}
};
}

View File

@@ -54,20 +54,23 @@ export const initializeRemoteBrowserForRecording = (userId: string): string => {
* @category BrowserManagement-Controller
*/
export const createRemoteBrowserForRun = (userId: string): string => {
const id = uuid();
if (!userId) {
logger.log('error', 'createRemoteBrowserForRun: Missing required parameter userId');
throw new Error('userId is required');
}
const id = uuid();
const slotReserved = browserPool.reserveBrowserSlot(id, userId, "run");
if (!slotReserved) {
logger.log('warn', `Cannot create browser for user ${userId}: no available slots`);
throw new Error('User has reached maximum browser limit');
}
logger.log('info', `createRemoteBrowserForRun: Reserved slot ${id} for user ${userId}`);
initializeBrowserAsync(id, userId);
createSocketConnectionForRun(
io.of(id),
async (socket: Socket) => {
try {
const browserSession = new RemoteBrowser(socket, userId, id);
await browserSession.initialize(userId);
browserPool.addRemoteBrowser(id, browserSession, userId, false, "run");
socket.emit('ready-for-run');
} catch (error: any) {
logger.error(`Error initializing browser: ${error.message}`);
}
});
return id;
};
@@ -135,6 +138,19 @@ export const getActiveBrowserIdByState = (userId: string, state: "recording" | "
return browserPool.getActiveBrowserId(userId, state);
};
/**
* Checks if there are available browser slots for a user.
* Wrapper around {@link browserPool.hasAvailableBrowserSlots()} function.
* If state is provided, also checks that none of their active browsers are in that state.
* @param userId the user ID to check browser slots for
* @param state optional state to check - if provided, ensures no browser is in this state
* @returns {boolean} true if user has available slots (and no browsers in specified state if state is provided)
* @category BrowserManagement-Controller
*/
export const canCreateBrowserInState = (userId: string, state?: "recording" | "run"): boolean => {
return browserPool.hasAvailableBrowserSlots(userId, state);
};
/**
* Returns the url string from a remote browser if exists in the browser pool.
* @param id instance id of the remote browser
@@ -198,3 +214,87 @@ export const stopRunningInterpretation = async (userId: string) => {
logger.log('error', 'Cannot stop interpretation: No active browser or generator.');
}
};
const initializeBrowserAsync = async (id: string, userId: string) => {
try {
const namespace = io.of(id);
let clientConnected = false;
let connectionTimeout: NodeJS.Timeout;
const waitForConnection = new Promise<Socket | null>((resolve) => {
namespace.on('connection', (socket: Socket) => {
clientConnected = true;
clearTimeout(connectionTimeout);
logger.log('info', `Frontend connected to browser ${id} via socket ${socket.id}`);
resolve(socket);
});
connectionTimeout = setTimeout(() => {
if (!clientConnected) {
logger.log('warn', `No client connected to browser ${id} within timeout, proceeding with dummy socket`);
resolve(null);
}
}, 10000);
});
namespace.on('error', (error: any) => {
logger.log('error', `Socket namespace error for browser ${id}: ${error.message}`);
clearTimeout(connectionTimeout);
browserPool.failBrowserSlot(id);
});
const socket = await waitForConnection;
try {
let browserSession: RemoteBrowser;
if (socket) {
logger.log('info', `Using real socket for browser ${id}`);
browserSession = new RemoteBrowser(socket, userId, id);
} else {
logger.log('info', `Using dummy socket for browser ${id}`);
const dummySocket = {
emit: (event: string, data?: any) => {
logger.log('debug', `Browser ${id} dummy socket emitted ${event}:`, data);
},
on: () => {},
id: `dummy-${id}`,
} as any;
browserSession = new RemoteBrowser(dummySocket, userId, id);
}
await browserSession.initialize(userId);
const upgraded = browserPool.upgradeBrowserSlot(id, browserSession);
if (!upgraded) {
throw new Error('Failed to upgrade reserved browser slot');
}
if (socket) {
socket.emit('ready-for-run');
} else {
setTimeout(async () => {
try {
logger.log('info', `Starting execution for browser ${id} with dummy socket`);
} catch (error: any) {
logger.log('error', `Error executing run for browser ${id}: ${error.message}`);
}
}, 100);
}
logger.log('info', `Browser ${id} successfully initialized for run with ${socket ? 'real' : 'dummy'} socket`);
} catch (error: any) {
logger.log('error', `Error initializing browser ${id}: ${error.message}`);
browserPool.failBrowserSlot(id);
if (socket) {
socket.emit('error', { message: error.message });
}
}
} catch (error: any) {
logger.log('error', `Error setting up browser ${id}: ${error.message}`);
browserPool.failBrowserSlot(id);
}
};

View File

@@ -82,102 +82,13 @@ function AddGeneratedFlags(workflow: WorkflowFile) {
return copy;
};
/**
* Function to reset browser state without creating a new browser
*/
async function resetBrowserState(browser: RemoteBrowser): Promise<boolean> {
try {
const currentPage = browser.getCurrentPage();
if (!currentPage) {
logger.log('error', 'No current page available to reset browser state');
return false;
}
// Navigate to blank page to reset state
await currentPage.goto('about:blank', { waitUntil: 'networkidle', timeout: 10000 });
// Clear browser storage
await currentPage.evaluate(() => {
try {
localStorage.clear();
sessionStorage.clear();
} catch (e) {
// Ignore errors in cleanup
}
});
// Clear cookies
const context = currentPage.context();
await context.clearCookies();
return true;
} catch (error) {
logger.log('error', `Failed to reset browser state`);
return false;
}
}
/**
* Modified checkAndProcessQueuedRun function - only changes browser reset logic
*/
async function checkAndProcessQueuedRun(userId: string, browserId: string): Promise<boolean> {
try {
// Find the oldest queued run for this specific browser
const queuedRun = await Run.findOne({
where: {
browserId: browserId,
runByUserId: userId,
status: 'queued'
},
order: [['startedAt', 'ASC']]
});
if (!queuedRun) {
logger.log('info', `No queued runs found for browser ${browserId}`);
return false;
}
// Reset the browser state before next run
const browser = browserPool.getRemoteBrowser(browserId);
if (browser) {
logger.log('info', `Resetting browser state for browser ${browserId} before next run`);
await resetBrowserState(browser);
}
// Update the queued run to running status
await queuedRun.update({
status: 'running',
log: 'Run started - using browser from previous run'
});
// Use user-specific queue
const userQueueName = `execute-run-user-${userId}`;
// Schedule the run execution
await pgBoss.createQueue(userQueueName);
const executeJobId = await pgBoss.send(userQueueName, {
userId: userId,
runId: queuedRun.runId,
browserId: browserId
});
logger.log('info', `Scheduled queued run ${queuedRun.runId} to use browser ${browserId}, job ID: ${executeJobId}`);
return true;
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error);
logger.log('error', `Error checking for queued runs: ${errorMessage}`);
return false;
}
}
/**
* Modified processRunExecution function - only add browser reset
*/
async function processRunExecution(job: Job<ExecuteRunData>) {
try {
const data = job.data;
logger.log('info', `Processing run execution job for runId: ${data.runId}, browserId: ${data.browserId}`);
const BROWSER_INIT_TIMEOUT = 30000;
const data = job.data;
logger.log('info', `Processing run execution job for runId: ${data.runId}`);
try {
// Find the run
const run = await Run.findOne({ where: { runId: data.runId } });
if (!run) {
@@ -191,6 +102,11 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
}
const plainRun = run.toJSON();
const browserId = data.browserId || plainRun.browserId;
if (!browserId) {
throw new Error(`No browser ID available for run ${data.runId}`);
}
// Find the recording
const recording = await Robot.findOne({ where: { 'recording_meta.id': plainRun.robotMetaId }, raw: true });
@@ -231,33 +147,47 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
}
}
// Check for queued runs even if this one failed
await checkAndProcessQueuedRun(data.userId, data.browserId);
return { success: false };
}
logger.log('info', `Looking for browser ${browserId} for run ${data.runId}`);
// Get the browser and execute the run
const browser = browserPool.getRemoteBrowser(plainRun.browserId);
let currentPage = browser?.getCurrentPage();
let browser = browserPool.getRemoteBrowser(browserId);
const browserWaitStart = Date.now();
if (!browser || !currentPage) {
logger.log('error', `Browser or page not available for run ${data.runId}`);
// Even if this run failed, check for queued runs
await checkAndProcessQueuedRun(data.userId, data.browserId);
return { success: false };
while (!browser && (Date.now() - browserWaitStart) < BROWSER_INIT_TIMEOUT) {
logger.log('debug', `Browser ${browserId} not ready yet, waiting...`);
await new Promise(resolve => setTimeout(resolve, 1000));
browser = browserPool.getRemoteBrowser(browserId);
}
if (!browser) {
throw new Error(`Browser ${browserId} not found in pool after timeout`);
}
logger.log('info', `Browser ${browserId} found and ready for execution`);
try {
// Reset the browser state before executing this run
await resetBrowserState(browser);
const isRunAborted = async (): Promise<boolean> => {
const currentRun = await Run.findOne({ where: { runId: data.runId } });
return currentRun ? (currentRun.status === 'aborted' || currentRun.status === 'aborting') : false;
};
let currentPage = browser.getCurrentPage();
const pageWaitStart = Date.now();
while (!currentPage && (Date.now() - pageWaitStart) < 30000) {
logger.log('debug', `Page not ready for browser ${browserId}, waiting...`);
await new Promise(resolve => setTimeout(resolve, 1000));
currentPage = browser.getCurrentPage();
}
if (!currentPage) {
throw new Error(`No current page available for browser ${browserId} after timeout`);
}
logger.log('info', `Starting workflow execution for run ${data.runId}`);
// Execute the workflow
const workflow = AddGeneratedFlags(recording.recording);
@@ -271,12 +201,7 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
if (await isRunAborted()) {
logger.log('info', `Run ${data.runId} was aborted during execution, not updating status`);
const queuedRunProcessed = await checkAndProcessQueuedRun(data.userId, plainRun.browserId);
if (!queuedRunProcessed) {
await destroyRemoteBrowser(plainRun.browserId, data.userId);
logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`);
}
await destroyRemoteBrowser(plainRun.browserId, data.userId);
return { success: true };
}
@@ -415,14 +340,8 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
finishedAt: new Date().toLocaleString()
});
// Check for and process queued runs before destroying the browser
const queuedRunProcessed = await checkAndProcessQueuedRun(data.userId, plainRun.browserId);
// Only destroy the browser if no queued run was found
if (!queuedRunProcessed) {
await destroyRemoteBrowser(plainRun.browserId, data.userId);
logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`);
}
await destroyRemoteBrowser(plainRun.browserId, data.userId);
logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`);
return { success: true };
} catch (executionError: any) {
@@ -477,18 +396,7 @@ async function processRunExecution(job: Job<ExecuteRunData>) {
logger.log('info', `Run ${data.runId} was aborted, not updating status to failed`);
}
// Check for queued runs before destroying the browser
const queuedRunProcessed = await checkAndProcessQueuedRun(data.userId, plainRun.browserId);
// Only destroy the browser if no queued run was found
if (!queuedRunProcessed) {
try {
await destroyRemoteBrowser(plainRun.browserId, data.userId);
logger.log('info', `No queued runs found for browser ${plainRun.browserId}, browser destroyed`);
} catch (cleanupError: any) {
logger.log('warn', `Failed to clean up browser for failed run ${data.runId}: ${cleanupError.message}`);
}
}
await destroyRemoteBrowser(plainRun.browserId, data.userId);
return { success: false };
}
@@ -607,23 +515,14 @@ async function abortRun(runId: string, userId: string): Promise<boolean> {
} catch (socketError) {
logger.log('warn', `Failed to emit run-aborted event: ${socketError}`);
}
let queuedRunProcessed = false;
try {
queuedRunProcessed = await checkAndProcessQueuedRun(userId, plainRun.browserId);
} catch (queueError) {
logger.log('warn', `Error checking queued runs: ${queueError}`);
}
if (!queuedRunProcessed) {
try {
await new Promise(resolve => setTimeout(resolve, 500));
await destroyRemoteBrowser(plainRun.browserId, userId);
logger.log('info', `Browser ${plainRun.browserId} destroyed successfully after abort`);
} catch (cleanupError) {
logger.log('warn', `Failed to clean up browser for aborted run ${runId}: ${cleanupError}`);
}
try {
await new Promise(resolve => setTimeout(resolve, 500));
await destroyRemoteBrowser(plainRun.browserId, userId);
logger.log('info', `Browser ${plainRun.browserId} destroyed successfully after abort`);
} catch (cleanupError) {
logger.log('warn', `Failed to clean up browser for aborted run ${runId}: ${cleanupError}`);
}
return true;

View File

@@ -11,6 +11,7 @@ import {
getRemoteBrowserCurrentTabs,
getActiveBrowserIdByState,
destroyRemoteBrowser,
canCreateBrowserInState,
} from '../browser-management/controller';
import { chromium } from 'playwright-extra';
import stealthPlugin from 'puppeteer-extra-plugin-stealth';
@@ -181,6 +182,18 @@ router.get('/active', requireSignIn, (req: AuthenticatedRequest, res) => {
return res.send(id);
});
/**
* GET endpoint for checking if the user can create a new remote browser.
*/
router.get('/can-create/:state', requireSignIn, (req: AuthenticatedRequest, res) => {
if (!req.user) {
return res.status(401).send('User not authenticated');
}
const state = req.params.state as "recording" | "run";
const canCreate = canCreateBrowserInState(req.user.id, state);
return res.json({ canCreate });
});
/**
* GET endpoint for getting the current url of the active remote browser.
*/

View File

@@ -1,6 +1,6 @@
import { Router } from 'express';
import logger from "../logger";
import { createRemoteBrowserForRun, getActiveBrowserIdByState } from "../browser-management/controller";
import { createRemoteBrowserForRun, destroyRemoteBrowser, getActiveBrowserIdByState } from "../browser-management/controller";
import { chromium } from 'playwright-extra';
import stealthPlugin from 'puppeteer-extra-plugin-stealth';
import { browserPool } from "../server";
@@ -517,98 +517,124 @@ router.put('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) =>
return res.status(401).send({ error: 'Unauthorized' });
}
const proxyConfig = await getDecryptedProxyConfig(req.user.id);
let proxyOptions: any = {};
if (proxyConfig.proxy_url) {
proxyOptions = {
server: proxyConfig.proxy_url,
...(proxyConfig.proxy_username && proxyConfig.proxy_password && {
username: proxyConfig.proxy_username,
password: proxyConfig.proxy_password,
}),
};
}
console.log(`Proxy config for run: ${JSON.stringify(proxyOptions)}`);
// Generate runId first
const runId = uuid();
// Check if user has reached browser limit
const userBrowserIds = browserPool.getAllBrowserIdsForUser(req.user.id);
const canCreateBrowser = userBrowserIds.length < 2;
if (canCreateBrowser) {
// User has available browser slots, create it directly
const id = createRemoteBrowserForRun(req.user.id);
const canCreateBrowser = await browserPool.hasAvailableBrowserSlots(req.user.id, "run");
const run = await Run.create({
status: 'running',
if (canCreateBrowser) {
let browserId: string;
try {
browserId = await createRemoteBrowserForRun(req.user.id);
if (!browserId || browserId.trim() === '') {
throw new Error('Failed to generate valid browser ID');
}
logger.log('info', `Created browser ${browserId} for run ${runId}`);
} catch (browserError: any) {
logger.log('error', `Failed to create browser: ${browserError.message}`);
return res.status(500).send({ error: 'Failed to create browser instance' });
}
try {
await Run.create({
status: 'running',
name: recording.recording_meta.name,
robotId: recording.id,
robotMetaId: recording.recording_meta.id,
startedAt: new Date().toLocaleString(),
finishedAt: '',
browserId: browserId,
interpreterSettings: req.body,
log: '',
runId,
runByUserId: req.user.id,
serializableOutput: {},
binaryOutput: {},
});
logger.log('info', `Created run ${runId} with browser ${browserId}`);
} catch (dbError: any) {
logger.log('error', `Database error creating run: ${dbError.message}`);
try {
await destroyRemoteBrowser(browserId, req.user.id);
} catch (cleanupError: any) {
logger.log('warn', `Failed to cleanup browser after run creation failure: ${cleanupError.message}`);
}
return res.status(500).send({ error: 'Failed to create run record' });
}
try {
const userQueueName = `execute-run-user-${req.user.id}`;
await pgBoss.createQueue(userQueueName);
const jobId = await pgBoss.send(userQueueName, {
userId: req.user.id,
runId: runId,
browserId: browserId,
});
logger.log('info', `Queued run execution job with ID: ${jobId} for run: ${runId}`);
} catch (queueError: any) {
logger.log('error', `Failed to queue run execution: ${queueError.message}`);
try {
await Run.update({
status: 'failed',
finishedAt: new Date().toLocaleString(),
log: 'Failed to queue execution job'
}, { where: { runId: runId } });
await destroyRemoteBrowser(browserId, req.user.id);
} catch (cleanupError: any) {
logger.log('warn', `Failed to cleanup after queue error: ${cleanupError.message}`);
}
return res.status(503).send({ error: 'Unable to queue run, please try again later' });
}
return res.send({
browserId: browserId,
runId: runId,
robotMetaId: recording.recording_meta.id,
queued: false
});
} else {
const browserId = uuid();
await Run.create({
status: 'queued',
name: recording.recording_meta.name,
robotId: recording.id,
robotMetaId: recording.recording_meta.id,
startedAt: new Date().toLocaleString(),
finishedAt: '',
browserId: id,
browserId,
interpreterSettings: req.body,
log: '',
log: 'Run queued - waiting for available browser slot',
runId,
runByUserId: req.user.id,
serializableOutput: {},
binaryOutput: {},
});
const plainRun = run.toJSON();
return res.send({
browserId: id,
runId: plainRun.runId,
browserId: browserId,
runId: runId,
robotMetaId: recording.recording_meta.id,
queued: false
queued: true
});
} else {
const browserId = getActiveBrowserIdByState(req.user.id, "run")
if (browserId) {
// User has reached the browser limit, queue the run
try {
// Create the run record with 'queued' status
await Run.create({
status: 'queued',
name: recording.recording_meta.name,
robotId: recording.id,
robotMetaId: recording.recording_meta.id,
startedAt: new Date().toLocaleString(),
finishedAt: '',
browserId: browserId, // Random will be updated later
interpreterSettings: req.body,
log: 'Run queued - waiting for available browser slot',
runId,
runByUserId: req.user.id,
serializableOutput: {},
binaryOutput: {},
});
return res.send({
browserId: browserId,
runId: runId,
robotMetaId: recording.recording_meta.id,
queued: true,
});
} catch (queueError: any) {
logger.log('error', `Failed to queue run job: ${queueError.message}`);
return res.status(503).send({ error: 'Unable to queue run, please try again later' });
}
} else {
logger.log('info', "Browser id does not exist");
return res.send('');
}
}
} catch (e) {
const { message } = e as Error;
logger.log('info', `Error while creating a run with robot id: ${req.params.id} - ${message}`);
return res.send('');
logger.log('error', `Error while creating a run with robot id: ${req.params.id} - ${message}`);
return res.status(500).send({ error: 'Internal server error' });
}
});
@@ -919,3 +945,73 @@ router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest,
return res.send(false);
}
});
async function processQueuedRuns() {
try {
const queuedRun = await Run.findOne({
where: { status: 'queued' },
order: [['startedAt', 'ASC']]
});
if (!queuedRun) return;
const userId = queuedRun.runByUserId;
const canCreateBrowser = await browserPool.hasAvailableBrowserSlots(userId, "run");
if (canCreateBrowser) {
logger.log('info', `Processing queued run ${queuedRun.runId} for user ${userId}`);
const recording = await Robot.findOne({
where: {
'recording_meta.id': queuedRun.robotMetaId
},
raw: true
});
if (!recording) {
await queuedRun.update({
status: 'failed',
finishedAt: new Date().toLocaleString(),
log: 'Recording not found'
});
return;
}
try {
const newBrowserId = await createRemoteBrowserForRun(userId);
logger.log('info', `Created and initialized browser ${newBrowserId} for queued run ${queuedRun.runId}`);
await queuedRun.update({
status: 'running',
browserId: newBrowserId,
log: 'Browser created and ready for execution'
});
const userQueueName = `execute-run-user-${userId}`;
await pgBoss.createQueue(userQueueName);
const jobId = await pgBoss.send(userQueueName, {
userId: userId,
runId: queuedRun.runId,
browserId: newBrowserId,
});
logger.log('info', `Queued execution for run ${queuedRun.runId} with ready browser ${newBrowserId}, job ID: ${jobId}`);
} catch (browserError: any) {
logger.log('error', `Failed to create browser for queued run: ${browserError.message}`);
await queuedRun.update({
status: 'failed',
finishedAt: new Date().toLocaleString(),
log: `Failed to create browser: ${browserError.message}`
});
}
}
} catch (error: any) {
logger.log('error', `Error processing queued runs: ${error.message}`);
}
}
export { processQueuedRuns };

View File

@@ -20,6 +20,7 @@ import connectPgSimple from 'connect-pg-simple';
import pg from 'pg';
import session from 'express-session';
import Run from './models/Run';
import { processQueuedRuns } from './routes/storage';
const app = express();
app.use(cors({
@@ -163,6 +164,10 @@ app.use((req, res, next) => {
next();
});
setInterval(() => {
processQueuedRuns();
}, 5000);
server.listen(SERVER_PORT, '0.0.0.0', async () => {
try {
await connectDB();

View File

@@ -35,6 +35,19 @@ export const getActiveBrowserId = async(): Promise<string> => {
}
};
export const canCreateBrowserInState = async(state: "recording" | "run"): Promise<boolean> => {
try {
const response = await axios.get(`${apiUrl}/record/can-create/${state}`, { withCredentials: true });
if (response.status === 200) {
return response.data.canCreate;
} else {
return false;
}
} catch(error: any) {
return false;
}
};
export const interpretCurrentRecording = async(): Promise<boolean> => {
try {
const response = await axios.get(`${apiUrl}/record/interpret`);

View File

@@ -154,6 +154,27 @@ export const editRecordingFromStorage = async (browserId: string, id: string): P
}
};
export interface CreateRunResponseWithQueue extends CreateRunResponse {
queued?: boolean;
}
export const createAndRunRecording = async (id: string, settings: RunSettings): Promise<CreateRunResponseWithQueue> => {
try {
const response = await axios.put(
`${apiUrl}/storage/runs/${id}`,
{ ...settings, withCredentials: true }
);
if (response.status === 200) {
return response.data;
} else {
throw new Error(`Couldn't create and run recording ${id}`);
}
} catch (error: any) {
console.log(error);
return { browserId: '', runId: '', robotMetaId: '', queued: false };
}
}
export const createRunForStoredRecording = async (id: string, settings: RunSettings): Promise<CreateRunResponse> => {
try {
const response = await axios.put(

View File

@@ -39,7 +39,7 @@ import { useGlobalInfoStore } from "../../context/globalInfo";
import { checkRunsForRecording, deleteRecordingFromStorage, getStoredRecordings } from "../../api/storage";
import { Add } from "@mui/icons-material";
import { useNavigate } from 'react-router-dom';
import { getActiveBrowserId, stopRecording } from "../../api/recording";
import { canCreateBrowserInState, getActiveBrowserId, stopRecording } from "../../api/recording";
import { GenericModal } from '../ui/GenericModal';
declare global {
@@ -274,11 +274,16 @@ export const RecordingsTable = ({
}, [setRecordings, notify, t]);
const handleNewRecording = useCallback(async () => {
const activeBrowserId = await getActiveBrowserId();
const canCreateRecording = await canCreateBrowserInState("recording");
if (activeBrowserId) {
setActiveBrowserId(activeBrowserId);
setWarningModalOpen(true);
if (!canCreateRecording) {
const activeBrowserId = await getActiveBrowserId();
if (activeBrowserId) {
setActiveBrowserId(activeBrowserId);
setWarningModalOpen(true);
} else {
notify('warning', t('recordingtable.notifications.browser_limit_warning'));
}
} else {
setModalOpen(true);
}
@@ -314,7 +319,6 @@ export const RecordingsTable = ({
};
const handleRetrainRobot = useCallback(async (id: string, name: string) => {
const activeBrowserId = await getActiveBrowserId();
const robot = rows.find(row => row.id === id);
let targetUrl;
@@ -340,11 +344,18 @@ export const RecordingsTable = ({
window.sessionStorage.setItem('initialUrl', targetUrl);
}
if (activeBrowserId) {
setActiveBrowserId(activeBrowserId);
setWarningModalOpen(true);
const canCreateRecording = await canCreateBrowserInState("recording");
if (!canCreateRecording) {
const activeBrowserId = await getActiveBrowserId();
if (activeBrowserId) {
setActiveBrowserId(activeBrowserId);
setWarningModalOpen(true);
} else {
notify('warning', t('recordingtable.notifications.browser_limit_warning'));
}
} else {
startRetrainRecording(id, name, targetUrl);
startRetrainRecording(id, name, targetUrl);
}
}, [rows, setInitialUrl, setRecordingUrl]);

View File

@@ -1,4 +1,4 @@
import React, { useCallback, useEffect } from 'react';
import React, { useCallback, useContext, useEffect } from 'react';
import { useTranslation } from 'react-i18next';
import { MainMenu } from "../components/dashboard/MainMenu";
import { Stack } from "@mui/material";
@@ -7,13 +7,14 @@ import { Runs } from "../components/run/Runs";
import ProxyForm from '../components/proxy/ProxyForm';
import ApiKey from '../components/api/ApiKey';
import { useGlobalInfoStore } from "../context/globalInfo";
import { createRunForStoredRecording, interpretStoredRecording, notifyAboutAbort, scheduleStoredRecording } from "../api/storage";
import { createAndRunRecording, createRunForStoredRecording, CreateRunResponseWithQueue, interpretStoredRecording, notifyAboutAbort, scheduleStoredRecording } from "../api/storage";
import { io, Socket } from "socket.io-client";
import { stopRecording } from "../api/recording";
import { RunSettings } from "../components/run/RunSettings";
import { ScheduleSettings } from "../components/robot/ScheduleSettings";
import { apiUrl } from "../apiConfig";
import { useNavigate } from 'react-router-dom';
import { AuthContext } from '../context/auth';
interface MainPageProps {
handleEditRecording: (id: string, fileName: string) => void;
@@ -43,12 +44,16 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps)
runId: '',
robotMetaId: ''
});
const [queuedRuns, setQueuedRuns] = React.useState<Set<string>>(new Set());
let aborted = false;
const { notify, setRerenderRuns, setRecordingId } = useGlobalInfoStore();
const navigate = useNavigate();
const { state } = useContext(AuthContext);
const { user } = state;
const abortRunHandler = (runId: string) => {
aborted = true;
notifyAboutAbort(runId).then(async (response) => {
@@ -90,48 +95,109 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps)
}, [currentInterpretationLog])
const handleRunRecording = useCallback((settings: RunSettings) => {
createRunForStoredRecording(runningRecordingId, settings).then(({ browserId, runId, robotMetaId }: CreateRunResponse) => {
createAndRunRecording(runningRecordingId, settings).then((response: CreateRunResponseWithQueue) => {
const { browserId, runId, robotMetaId, queued } = response;
setIds({ browserId, runId, robotMetaId });
navigate(`/runs/${robotMetaId}/run/${runId}`);
const socket =
io(`${apiUrl}/${browserId}`, {
if (queued) {
console.log('Creating queue socket for queued run:', runId);
setQueuedRuns(prev => new Set([...prev, runId]));
const queueSocket = io(`${apiUrl}/queued-run`, {
transports: ["websocket"],
rejectUnauthorized: false,
query: { userId: user?.id }
});
queueSocket.on('connect', () => {
console.log('Queue socket connected for user:', user?.id);
});
queueSocket.on('connect_error', (error) => {
console.log('Queue socket connection error:', error);
});
queueSocket.on('run-completed', (completionData) => {
if (completionData.runId === runId) {
setRunningRecordingName('');
setCurrentInterpretationLog('');
setRerenderRuns(true);
setQueuedRuns(prev => {
const newSet = new Set(prev);
newSet.delete(runId);
return newSet;
});
const robotName = completionData.robotName || runningRecordingName;
if (completionData.status === 'success') {
notify('success', t('main_page.notifications.interpretation_success', { name: robotName }));
} else {
notify('error', t('main_page.notifications.interpretation_failed', { name: robotName }));
}
queueSocket.disconnect();
}
});
setSockets(sockets => [...sockets, queueSocket]);
notify('info', `Run queued: ${runningRecordingName}`);
} else {
const socket = io(`${apiUrl}/${browserId}`, {
transports: ["websocket"],
rejectUnauthorized: false
});
setSockets(sockets => [...sockets, socket]);
socket.on('ready-for-run', () => readyForRunHandler(browserId, runId));
socket.on('debugMessage', debugMessageHandler);
socket.on('run-completed', (data) => {
setRerenderRuns(true);
const robotName = data.robotName;
setSockets(sockets => [...sockets, socket]);
if (data.status === 'success') {
notify('success', t('main_page.notifications.interpretation_success', { name: robotName }));
socket.on('debugMessage', debugMessageHandler);
socket.on('run-completed', (data) => {
setRunningRecordingName('');
setCurrentInterpretationLog('');
setRerenderRuns(true);
const robotName = data.robotName;
if (data.status === 'success') {
notify('success', t('main_page.notifications.interpretation_success', { name: robotName }));
} else {
notify('error', t('main_page.notifications.interpretation_failed', { name: robotName }));
}
});
socket.on('connect_error', (error) => {
console.log('error', `Failed to connect to browser ${browserId}: ${error}`);
notify('error', t('main_page.notifications.connection_failed', { name: runningRecordingName }));
});
socket.on('disconnect', (reason) => {
console.log('warn', `Disconnected from browser ${browserId}: ${reason}`);
});
if (runId) {
notify('info', t('main_page.notifications.run_started', { name: runningRecordingName }));
} else {
notify('error', t('main_page.notifications.interpretation_failed', { name: robotName }));
notify('error', t('main_page.notifications.run_start_failed', { name: runningRecordingName }));
}
});
socket.on('run-aborted', (data) => {
setRerenderRuns(true);
const abortedRobotName = data.robotName;
notify('success', t('main_page.notifications.abort_success', { name: abortedRobotName }));
});
setContent('runs');
if (browserId) {
notify('info', t('main_page.notifications.run_started', { name: runningRecordingName }));
} else {
notify('error', t('main_page.notifications.run_start_failed', { name: runningRecordingName }));
}
})
return (socket: Socket, browserId: string, runId: string) => {
socket.off('ready-for-run', () => readyForRunHandler(browserId, runId));
setContent('runs');
}).catch((error: any) => {
console.error('Error in createAndRunRecording:', error); // ✅ Debug log
});
return (socket: Socket) => {
socket.off('debugMessage', debugMessageHandler);
socket.off('run-completed');
socket.off('connect_error');
socket.off('disconnect');
}
}, [runningRecordingName, sockets, ids, readyForRunHandler, debugMessageHandler])
}, [runningRecordingName, sockets, ids, debugMessageHandler, user?.id, t, notify, setRerenderRuns, setQueuedRuns, navigate, setContent, setIds]);
const handleScheduleRecording = (settings: ScheduleSettings) => {
scheduleStoredRecording(runningRecordingId, settings)