feat: send jobs and wait for completion

This commit is contained in:
Rohit
2025-03-10 16:04:12 +05:30
parent 65cb201474
commit be1b90feef

View File

@@ -17,7 +17,7 @@ import stealthPlugin from 'puppeteer-extra-plugin-stealth';
import logger from "../logger";
import { getDecryptedProxyConfig } from './proxy';
import { requireSignIn } from '../middlewares/auth';
import { pgBoss } from '../server'; // Import pgBoss reference
import { pgBoss } from '../pgboss-worker';
export const router = Router();
chromium.use(stealthPlugin());
@@ -26,6 +26,41 @@ export interface AuthenticatedRequest extends Request {
user?: any;
}
async function waitForJobCompletion(jobId: string, queueName: string, timeout = 15000): Promise<any> {
return new Promise((resolve, reject) => {
const startTime = Date.now();
const checkJobStatus = async () => {
if (Date.now() - startTime > timeout) {
return reject(new Error(`Timeout waiting for job ${jobId} to complete`));
}
try {
const job = await pgBoss.getJobById(queueName, jobId);
if (!job) {
return reject(new Error(`Job ${jobId} not found`));
}
if (job.state === 'completed') {
return resolve(job.output);
}
if (job.state === 'failed') {
return reject(new Error(`Job ${jobId} failed.`));
}
setTimeout(checkJobStatus, 200);
} catch (error) {
reject(error);
}
};
// Start checking
checkJobStatus();
});
}
/**
* Logs information about remote browser recording session.
*/
@@ -34,33 +69,52 @@ router.all('/', requireSignIn, (req, res, next) => {
next() // pass control to the next handler
})
/**
* GET endpoint for starting the remote browser recording session.
* returns session's id or job id
* GET endpoint for starting the remote browser recording session
* Waits for job completion
*/
router.get('/start', requireSignIn, async (req: AuthenticatedRequest, res: Response) => {
if (!req.user) {
return res.status(401).send('User not authenticated');
}
try {
const job = await pgBoss.send('initialize-browser-recording', {
userId: req.user.id
await pgBoss.createQueue('initialize-browser-recording');
const jobId = await pgBoss.send('initialize-browser-recording', {
userId: req.user.id,
timestamp: new Date().toISOString()
});
if (!job) {
if (!jobId) {
logger.log('warn', 'pgBoss.send returned null, falling back to direct initialization');
const browserId = initializeRemoteBrowserForRecording(req.user.id);
return res.send(browserId);
return res.send( browserId );
}
logger.log('info', `Queued browser initialization job: ${job}`);
return res.send(job);
logger.log('info', `Queued browser initialization job: ${jobId}, waiting for completion...`);
try {
const result = await waitForJobCompletion(jobId, 'initialize-browser-recording', 15000);
if (result && result.browserId) {
logger.log('info', `Job completed with browserId: ${result.browserId}`);
return res.send(result.browserId);
} else {
logger.log('warn', 'Job completed but returned unexpected result');
return res.send(jobId);
}
} catch (waitError: any) {
logger.log('warn', `Error waiting for job completion: ${waitError.message}`);
return res.send(jobId);
}
} catch (error: any) {
logger.log('error', `Failed to queue browser initialization job: ${error.message}`);
try {
const browserId = initializeRemoteBrowserForRecording(req.user.id);
return res.send(browserId);
return res.send( browserId );
} catch (directError: any) {
logger.log('error', `Direct initialization also failed: ${directError.message}`);
return res.status(500).send('Failed to start recording');
@@ -70,116 +124,54 @@ router.get('/start', requireSignIn, async (req: AuthenticatedRequest, res: Respo
/**
* POST endpoint for starting the remote browser recording session accepting browser launch options.
* returns session's id or job id
* returns session's id
*/
router.post('/start', requireSignIn, async (req: AuthenticatedRequest, res: Response) => {
router.post('/start', requireSignIn, (req: AuthenticatedRequest, res:Response) => {
if (!req.user) {
return res.status(401).send('User not authenticated');
}
try {
const job = await pgBoss.send('initialize-browser-recording', {
userId: req.user.id
});
if (!job) {
logger.log('warn', 'pgBoss.send returned null, falling back to direct initialization');
const browserId = initializeRemoteBrowserForRecording(req.user.id);
return res.send(browserId);
}
logger.log('info', `Queued browser initialization job: ${job}`);
return res.send(job);
} catch (error: any) {
logger.log('error', `Failed to queue browser initialization job: ${error.message}`);
try {
const browserId = initializeRemoteBrowserForRecording(req.user.id);
return res.send(browserId);
} catch (directError: any) {
logger.log('error', `Direct initialization also failed: ${directError.message}`);
return res.status(500).send('Failed to start recording');
}
}
});
/**
* GET endpoint for getting the job status of a browser initialization job
*/
router.get('/job-status/:jobId', requireSignIn, async (req, res) => {
try {
logger.log('debug', `Checking status for job ${req.params.jobId}`);
const job = await pgBoss.getJobById("job-status", req.params.jobId);
if (!job) {
logger.log('warn', `Job ${req.params.jobId} not found`);
return res.status(404).send('Job not found');
}
logger.log('debug', `Job state: ${job.state}, hasOutput: ${!!job.output}`);
if (job.state === 'completed' && job.output) {
const output = job.output as { browserId?: string };
if (output.browserId) {
logger.log('info', `Job completed with browserId: ${output.browserId}`);
} else {
logger.log('warn', `Job completed but missing browserId in output`);
}
return res.send(output); // Return the browser ID from the completed job
}
return res.send({
state: job.state,
createdAt: job.createdOn,
startedAt: job.startedOn || null
});
} catch (error: any) {
logger.log('error', `Failed to get job status: ${error.message}`);
return res.status(500).send('Failed to get job status');
}
const id = initializeRemoteBrowserForRecording(req.user.id);
return res.send(id);
});
/**
* GET endpoint for terminating the remote browser recording session.
* returns whether the termination was successful
*/
router.get('/stop/:browserId', requireSignIn, async (req, res) => {
router.get('/stop/:browserId', requireSignIn, async (req: AuthenticatedRequest, res) => {
if (!req.user) {
return res.status(401).send('User not authenticated');
}
try {
if (req.params.browserId.startsWith('job_')) {
logger.log('debug', `Stopping job ${req.params.browserId}`);
try {
const job = await pgBoss.getJobById("stop", req.params.browserId);
if (job && job.state === 'completed' && job.output) {
const output = job.output as { browserId?: string };
if (output.browserId) {
await pgBoss.send('destroy-browser', {
browserId: output.browserId
});
logger.log('info', `Queued destroy job for browser ${output.browserId}`);
return res.send(true);
}
} else if (job && (job.state === 'created' || job.state === 'active')) {
await pgBoss.cancel("cancel", req.params.browserId);
logger.log('info', `Cancelled job ${req.params.browserId}`);
return res.send(true);
}
} catch (jobError: any) {
logger.log('error', `Error handling job termination: ${jobError.message}`);
}
}
await pgBoss.createQueue('destroy-browser');
const jobId = await pgBoss.send('destroy-browser', {
browserId: req.params.browserId,
timestamp: new Date().toISOString()
});
if (!jobId) {
logger.log('warn', 'pgBoss.send returned null, falling back to direct destruction');
const browserId = initializeRemoteBrowserForRecording(req.user.id);
return res.send( browserId );
}
logger.log('info', `Queued browser destruction job: ${jobId}, waiting for completion...`);
try {
await pgBoss.send('destroy-browser', {
browserId: req.params.browserId
});
logger.log('info', `Queued destroy job for browser ${req.params.browserId}`);
return res.send(true);
} catch (queueError: any) {
logger.log('error', `Failed to queue destroy job: ${queueError.message}`);
const result = await waitForJobCompletion(jobId, 'destroy-browser', 15000);
const success = await destroyRemoteBrowser(req.params.browserId);
logger.log('info', `Direct browser destruction result: ${success}`);
return res.send(success);
if (result) {
logger.log('info', `Browser destruction job completed with result: ${result.success}`);
return res.send(result.success);
} else {
logger.log('warn', 'Job completed but returned unexpected result');
return res.send(false);
}
} catch (waitError: any) {
logger.log('warn', `Error waiting for job completion: ${waitError.message}`);
return res.send(false);
}
} catch (error: any) {
logger.log('error', `Failed to stop browser: ${error.message}`);
@@ -222,55 +214,27 @@ router.get('/active/tabs', requireSignIn, (req, res) => {
/**
* GET endpoint for starting an interpretation of the currently generated workflow.
*/
router.get('/interpret', requireSignIn, async (req, res) => {
router.get('/interpret', requireSignIn, async (req: AuthenticatedRequest, res) => {
try {
const job = await pgBoss.send('interpret-workflow', {});
if (!job) {
logger.log('warn', 'pgBoss.send returned null for interpret, falling back to direct interpretation');
await interpretWholeWorkflow();
return res.send('interpretation complete (direct)');
}
logger.log('info', `Queued interpretation job: ${job}`);
return res.send('interpretation queued');
} catch (error: any) {
logger.log('error', `Failed to queue interpretation job: ${error.message}`);
try {
await interpretWholeWorkflow();
return res.send('interpretation complete (fallback)');
} catch (directError: any) {
return res.status(500).send('interpretation failed');
if (!req.user) {
return res.status(401).send('User not authenticated');
}
await interpretWholeWorkflow();
return res.send('interpretation done');
} catch (e) {
return res.send('interpretation failed');
}
});
/**
* GET endpoint for stopping an ongoing interpretation of the currently generated workflow.
*/
router.get('/interpret/stop', requireSignIn, async (req, res) => {
try {
const job = await pgBoss.send('stop-interpretation', {});
if (!job) {
logger.log('warn', 'pgBoss.send returned null for stop-interpretation, falling back to direct stop');
await stopRunningInterpretation();
return res.send('interpretation stopped (direct)');
}
logger.log('info', `Queued stop interpretation job: ${job}`);
return res.send('interpretation stop queued');
} catch (error: any) {
logger.log('error', `Failed to queue stop interpretation job: ${error.message}`);
try {
await stopRunningInterpretation();
return res.send('interpretation stopped (fallback)');
} catch (directError: any) {
return res.status(500).send('interpretation stop failed');
}
router.get('/interpret/stop', requireSignIn, async (req: AuthenticatedRequest, res) => {
if (!req.user) {
return res.status(401).send('User not authenticated');
}
await stopRunningInterpretation();
return res.send('interpretation stopped');
});
export default router;