feat(temp): use in-memory store

This commit is contained in:
karishmas6
2024-09-19 17:39:33 +05:30
parent 1bc3b1d35f
commit 01295deb33

View File

@@ -7,12 +7,13 @@ import logger from "../logger";
import { deleteFile, readFile, readFiles, saveFile } from "../workflow-management/storage"; import { deleteFile, readFile, readFiles, saveFile } from "../workflow-management/storage";
import { createRemoteBrowserForRun, destroyRemoteBrowser } from "../browser-management/controller"; import { createRemoteBrowserForRun, destroyRemoteBrowser } from "../browser-management/controller";
import { chromium } from "playwright"; import { chromium } from "playwright";
import { browserPool, io } from "../server"; import { browserPool } from "../server";
import fs from "fs"; import fs from "fs";
import { uuid } from "uuidv4"; import { uuid } from "uuidv4";
import { workflowQueue } from '../workflow-management/scheduler'; import { workflowQueue } from '../workflow-management/scheduler';
import moment from 'moment-timezone'; import moment from 'moment-timezone';
import cron from 'node-cron'; import cron from 'node-cron';
import { updateGoogleSheet } from './integration';
export const router = Router(); export const router = Router();
@@ -20,7 +21,7 @@ export const router = Router();
* Logs information about recordings API. * Logs information about recordings API.
*/ */
router.all('/', (req, res, next) => { router.all('/', (req, res, next) => {
logger.log('debug',`The recordings API was invoked: ${req.url}`) logger.log('debug', `The recordings API was invoked: ${req.url}`)
next() // pass control to the next handler next() // pass control to the next handler
}) })
@@ -45,7 +46,7 @@ router.delete('/recordings/:fileName', async (req, res) => {
await deleteFile(`./../storage/recordings/${req.params.fileName}.waw.json`); await deleteFile(`./../storage/recordings/${req.params.fileName}.waw.json`);
return res.send(true); return res.send(true);
} catch (e) { } catch (e) {
const {message} = e as Error; const { message } = e as Error;
logger.log('info', `Error while deleting a recording with name: ${req.params.fileName}.waw.json`); logger.log('info', `Error while deleting a recording with name: ${req.params.fileName}.waw.json`);
return res.send(false); return res.send(false);
} }
@@ -72,7 +73,7 @@ router.delete('/runs/:fileName', async (req, res) => {
await deleteFile(`./../storage/runs/${req.params.fileName}.json`); await deleteFile(`./../storage/runs/${req.params.fileName}.json`);
return res.send(true); return res.send(true);
} catch (e) { } catch (e) {
const {message} = e as Error; const { message } = e as Error;
logger.log('info', `Error while deleting a run with name: ${req.params.fileName}.json`); logger.log('info', `Error while deleting a run with name: ${req.params.fileName}.json`);
return res.send(false); return res.send(false);
} }
@@ -117,7 +118,7 @@ router.put('/runs/:fileName', async (req, res) => {
runId: runId, runId: runId,
}); });
} catch (e) { } catch (e) {
const {message} = e as Error; const { message } = e as Error;
logger.log('info', `Error while creating a run with name: ${req.params.fileName}.json`); logger.log('info', `Error while creating a run with name: ${req.params.fileName}.json`);
return res.send(''); return res.send('');
} }
@@ -139,6 +140,16 @@ router.get('/runs/run/:fileName/:runId', async (req, res) => {
} }
}); });
interface GoogleSheetUpdateTask {
name: string;
runId: string;
status: 'pending' | 'completed';
}
let googleSheetUpdateTasks: { [runId: string]: GoogleSheetUpdateTask } = {};
console.log('googleSheetUpdateTasks:', googleSheetUpdateTasks);
/** /**
* PUT endpoint for finishing a run and saving it to the storage. * PUT endpoint for finishing a run and saving it to the storage.
*/ */
@@ -167,28 +178,36 @@ router.post('/runs/run/:fileName/:runId', async (req, res) => {
} }
})(); })();
await destroyRemoteBrowser(parsedRun.browserId); await destroyRemoteBrowser(parsedRun.browserId);
const run_meta = { const run_meta = {
...parsedRun, ...parsedRun,
status: interpretationInfo.result, status: 'success',
finishedAt: new Date().toLocaleString(), finishedAt: new Date().toLocaleString(),
duration: durString, duration: durString,
browserId: parsedRun.browserId, browserId: parsedRun.browserId,
log: interpretationInfo.log.join('\n'), log: interpretationInfo.log.join('\n'),
serializableOutput: interpretationInfo.serializableOutput, serializableOutput: interpretationInfo.serializableOutput,
binaryOutput: interpretationInfo.binaryOutput, binaryOutput: interpretationInfo.binaryOutput,
}; };
fs.mkdirSync('../storage/runs', { recursive: true }) fs.mkdirSync('../storage/runs', { recursive: true })
await saveFile( await saveFile(
`../storage/runs/${parsedRun.name}_${req.params.runId}.json`, `../storage/runs/${parsedRun.name}_${req.params.runId}.json`,
JSON.stringify(run_meta, null, 2) JSON.stringify(run_meta, null, 2)
); );
io.emit('run-finished', { success: true });
return res.send(true); res.send(true);
} else {
throw new Error('Could not destroy browser'); googleSheetUpdateTasks[req.params.runId] = {
} name: parsedRun.name,
runId: req.params.runId,
status: 'pending',
};
return;
} else {
throw new Error('Could not destroy browser');
}
} catch (e) { } catch (e) {
const {message} = e as Error; const { message } = e as Error;
logger.log('info', `Error while running a recording with name: ${req.params.fileName}_${req.params.runId}.json`); logger.log('info', `Error while running a recording with name: ${req.params.fileName}_${req.params.runId}.json`);
return res.send(false); return res.send(false);
} }
@@ -198,12 +217,12 @@ router.put('/schedule/:fileName/', async (req, res) => {
console.log(req.body); console.log(req.body);
try { try {
const { fileName } = req.params; const { fileName } = req.params;
const { const {
runEvery, runEvery,
runEveryUnit, runEveryUnit,
startFrom, startFrom,
atTime, atTime,
timezone timezone
} = req.body; } = req.body;
if (!fileName || !runEvery || !runEveryUnit || !startFrom || !atTime || !timezone) { if (!fileName || !runEvery || !runEveryUnit || !startFrom || !atTime || !timezone) {
@@ -258,7 +277,7 @@ router.put('/schedule/:fileName/', async (req, res) => {
await workflowQueue.add( await workflowQueue.add(
'run workflow', 'run workflow',
{ fileName, runId }, { fileName, runId },
{ {
repeat: { repeat: {
pattern: cronExpression, pattern: cronExpression,
tz: timezone tz: timezone
@@ -266,8 +285,8 @@ router.put('/schedule/:fileName/', async (req, res) => {
} }
); );
res.status(200).json({ res.status(200).json({
message: 'success', message: 'success',
runId, runId,
// cronExpression, // cronExpression,
// nextRunTime: getNextRunTime(cronExpression, timezone) // nextRunTime: getNextRunTime(cronExpression, timezone)
@@ -291,11 +310,9 @@ router.put('/schedule/:fileName/', async (req, res) => {
*/ */
router.post('/runs/abort/:fileName/:runId', async (req, res) => { router.post('/runs/abort/:fileName/:runId', async (req, res) => {
try { try {
// read the run from storage
const run = await readFile(`./../storage/runs/${req.params.fileName}_${req.params.runId}.json`) const run = await readFile(`./../storage/runs/${req.params.fileName}_${req.params.runId}.json`)
const parsedRun = JSON.parse(run); const parsedRun = JSON.parse(run);
//get current log
const browser = browserPool.getRemoteBrowser(parsedRun.browserId); const browser = browserPool.getRemoteBrowser(parsedRun.browserId);
const currentLog = browser?.interpreter.debugMessages.join('/n'); const currentLog = browser?.interpreter.debugMessages.join('/n');
const serializableOutput = browser?.interpreter.serializableData.reduce((reducedObject, item, index) => { const serializableOutput = browser?.interpreter.serializableData.reduce((reducedObject, item, index) => {
@@ -326,8 +343,33 @@ router.post('/runs/abort/:fileName/:runId', async (req, res) => {
); );
return res.send(true); return res.send(true);
} catch (e) { } catch (e) {
const {message} = e as Error; const { message } = e as Error;
logger.log('info', `Error while running a recording with name: ${req.params.fileName}_${req.params.runId}.json`); logger.log('info', `Error while running a recording with name: ${req.params.fileName}_${req.params.runId}.json`);
return res.send(false); return res.send(false);
} }
}); });
const processGoogleSheetUpdates = async () => {
while (true) {
for (const runId in googleSheetUpdateTasks) {
const task = googleSheetUpdateTasks[runId];
if (task.status === 'pending') {
try {
try {
await updateGoogleSheet(task.name, task.runId);
console.log(`Successfully updated Google Sheets for run ${task.runId}`);
googleSheetUpdateTasks[runId].status = 'completed';
} catch (error: any) {
console.error(`update google sheet error ${task.runId}:`, error);
}
} catch (error: any) {
console.error(`Failed to update Google Sheets for run ${task.runId}:`, error);
}
}
}
await new Promise(resolve => setTimeout(resolve, 5000));
}
};
processGoogleSheetUpdates();