feat: use pgBossClient

This commit is contained in:
amhsirak
2025-11-28 15:51:45 +05:30
parent cbe5685dd1
commit ffe248810d

View File

@@ -15,8 +15,8 @@ import { computeNextRun } from '../utils/schedule';
import { capture } from "../utils/analytics"; import { capture } from "../utils/analytics";
import { encrypt, decrypt } from '../utils/auth'; import { encrypt, decrypt } from '../utils/auth';
import { WorkflowFile } from 'maxun-core'; import { WorkflowFile } from 'maxun-core';
import { cancelScheduledWorkflow, scheduleWorkflow } from '../schedule-worker'; import { cancelScheduledWorkflow, scheduleWorkflow } from '../storage/schedule';
import { pgBoss, registerWorkerForQueue, registerAbortWorkerForQueue } from '../pgboss-worker'; import { pgBossClient } from '../storage/pgboss';
chromium.use(stealthPlugin()); chromium.use(stealthPlugin());
export const router = Router(); export const router = Router();
@@ -590,7 +590,7 @@ router.delete('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res)
* PUT endpoint for starting a remote browser instance and saving run metadata to the storage. * PUT endpoint for starting a remote browser instance and saving run metadata to the storage.
* Making it ready for interpretation and returning a runId. * Making it ready for interpretation and returning a runId.
* *
* If the user has reached their browser limit, the run will be queued using PgBoss. * If the user has reached their browser limit, the run will be queued using pgBossClient.
*/ */
router.put('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) => { router.put('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) => {
try { try {
@@ -664,10 +664,9 @@ router.put('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) =>
try { try {
const userQueueName = `execute-run-user-${req.user.id}`; const userQueueName = `execute-run-user-${req.user.id}`;
await pgBoss.createQueue(userQueueName); await pgBossClient.createQueue(userQueueName);
await registerWorkerForQueue(userQueueName);
const jobId = await pgBossClient.send(userQueueName, {
const jobId = await pgBoss.send(userQueueName, {
userId: req.user.id, userId: req.user.id,
runId: runId, runId: runId,
browserId: browserId, browserId: browserId,
@@ -782,10 +781,9 @@ router.post('/runs/run/:id', requireSignIn, async (req: AuthenticatedRequest, re
const userQueueName = `execute-run-user-${req.user.id}`; const userQueueName = `execute-run-user-${req.user.id}`;
// Queue the execution job // Queue the execution job
await pgBoss.createQueue(userQueueName); await pgBossClient.createQueue(userQueueName);
await registerWorkerForQueue(userQueueName);
const jobId = await pgBoss.send(userQueueName, { const jobId = await pgBossClient.send(userQueueName, {
userId: req.user.id, userId: req.user.id,
runId: req.params.id, runId: req.params.id,
browserId: plainRun.browserId browserId: plainRun.browserId
@@ -974,7 +972,7 @@ router.delete('/schedule/:id', requireSignIn, async (req: AuthenticatedRequest,
return res.status(404).json({ error: 'Robot not found' }); return res.status(404).json({ error: 'Robot not found' });
} }
// Cancel the scheduled job in PgBoss // Cancel the scheduled job in pgBossClient
try { try {
await cancelScheduledWorkflow(id); await cancelScheduledWorkflow(id);
} catch (error) { } catch (error) {
@@ -1055,10 +1053,9 @@ router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest,
} }
const userQueueName = `abort-run-user-${req.user.id}`; const userQueueName = `abort-run-user-${req.user.id}`;
await pgBoss.createQueue(userQueueName); await pgBossClient.createQueue(userQueueName);
await registerAbortWorkerForQueue(userQueueName);
const jobId = await pgBossClient.send(userQueueName, {
const jobId = await pgBoss.send(userQueueName, {
userId: req.user.id, userId: req.user.id,
runId: req.params.id runId: req.params.id
}); });
@@ -1123,10 +1120,9 @@ async function processQueuedRuns() {
}); });
const userQueueName = `execute-run-user-${userId}`; const userQueueName = `execute-run-user-${userId}`;
await pgBoss.createQueue(userQueueName); await pgBossClient.createQueue(userQueueName);
await registerWorkerForQueue(userQueueName);
const jobId = await pgBoss.send(userQueueName, { const jobId = await pgBossClient.send(userQueueName, {
userId: userId, userId: userId,
runId: queuedRun.runId, runId: queuedRun.runId,
browserId: newBrowserId, browserId: newBrowserId,