diff --git a/package.json b/package.json index 0daed06c..1a6d7d8b 100644 --- a/package.json +++ b/package.json @@ -43,6 +43,7 @@ "jsonwebtoken": "^9.0.2", "loglevel": "^1.8.0", "loglevel-plugin-remote": "^0.6.8", + "minio": "^8.0.1", "moment-timezone": "^0.5.45", "node-cron": "^3.0.3", "pg": "^8.13.0", diff --git a/server/src/api/record.ts b/server/src/api/record.ts index 5b2e8451..20e38928 100644 --- a/server/src/api/record.ts +++ b/server/src/api/record.ts @@ -11,6 +11,7 @@ import { createRemoteBrowserForRun, destroyRemoteBrowser } from "../browser-mana import logger from "../logger"; import { browserPool } from "../server"; import { io, Socket } from "socket.io-client"; +import { BinaryOutputService } from "../storage/mino"; const formatRecording = (recordingData: any) => { const recordingMeta = recordingData.recording_meta; @@ -307,6 +308,9 @@ async function executeRun(id: string) { recording.recording, currentPage, plainRun.interpreterSettings ); + const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); + const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput); + await destroyRemoteBrowser(plainRun.browserId); const updatedRun = await run.update({ @@ -316,7 +320,7 @@ async function executeRun(id: string) { browserId: plainRun.browserId, log: interpretationInfo.log.join('\n'), serializableOutput: interpretationInfo.serializableOutput, - binaryOutput: interpretationInfo.binaryOutput, + binaryOutput: uploadedBinaryOutput, }); return { diff --git a/server/src/models/Robot.ts b/server/src/models/Robot.ts index 2400a917..ed9d7780 100644 --- a/server/src/models/Robot.ts +++ b/server/src/models/Robot.ts @@ -1,5 +1,5 @@ import { Model, DataTypes, Optional } from 'sequelize'; -import sequelize from '../db/config'; +import sequelize from '../storage/db'; import { WorkflowFile, Where, What, WhereWhatPair } from 'maxun-core'; interface RobotMeta { diff --git a/server/src/models/Run.ts 
b/server/src/models/Run.ts index 5c14dfd9..1c8e4dc6 100644 --- a/server/src/models/Run.ts +++ b/server/src/models/Run.ts @@ -1,11 +1,7 @@ import { Model, DataTypes, Optional } from 'sequelize'; -import sequelize from '../db/config'; +import sequelize from '../storage/db'; import Robot from './Robot'; -// TODO: -// 1. rename variables -// 2. we might not need interpreter settings? -// 3. store binaryOutput in MinIO interface InterpreterSettings { maxConcurrency: number; maxRepeats: number; @@ -25,7 +21,7 @@ interface RunAttributes { log: string; runId: string; serializableOutput: Record; - binaryOutput: Record; + binaryOutput: Record; } interface RunCreationAttributes extends Optional { } @@ -104,6 +100,7 @@ Run.init( binaryOutput: { type: DataTypes.JSONB, allowNull: true, + defaultValue: {}, }, }, { diff --git a/server/src/models/User.ts b/server/src/models/User.ts index 83156fd2..b50c1e0f 100644 --- a/server/src/models/User.ts +++ b/server/src/models/User.ts @@ -1,5 +1,5 @@ import { DataTypes, Model, Optional } from 'sequelize'; -import sequelize from '../db/config'; +import sequelize from '../storage/db'; interface UserAttributes { id: number; diff --git a/server/src/routes/storage.ts b/server/src/routes/storage.ts index b43713f6..e7e92e64 100644 --- a/server/src/routes/storage.ts +++ b/server/src/routes/storage.ts @@ -11,6 +11,7 @@ import { getDecryptedProxyConfig } from './proxy'; import { requireSignIn } from '../middlewares/auth'; import Robot from '../models/Robot'; import Run from '../models/Run'; +import { BinaryOutputService } from '../storage/mino'; import { workflowQueue } from '../worker'; export const router = Router(); @@ -189,6 +190,8 @@ router.post('/runs/run/:id', requireSignIn, async (req, res) => { if (browser && currentPage) { const interpretationInfo = await browser.interpreter.InterpretRecording( recording.recording, currentPage, plainRun.interpreterSettings); + const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); + 
const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput); await destroyRemoteBrowser(plainRun.browserId); await run.update({ ...run, @@ -197,7 +200,7 @@ router.post('/runs/run/:id', requireSignIn, async (req, res) => { browserId: plainRun.browserId, log: interpretationInfo.log.join('\n'), serializableOutput: interpretationInfo.serializableOutput, - binaryOutput: interpretationInfo.binaryOutput, + binaryOutput: uploadedBinaryOutput, }); googleSheetUpdateTasks[req.params.id] = { name: plainRun.name, @@ -279,16 +282,16 @@ router.put('/schedule/:id/', requireSignIn, async (req, res) => { const runId = uuid(); const userId = req.user.id; - await workflowQueue.add( - 'run workflow', + await workflowQueue.add( + 'run workflow', { id, runId, userId }, - { + { repeat: { pattern: cronExpression, tz: timezone - } + } } - ); + ); res.status(200).json({ message: 'success', diff --git a/server/src/server.ts b/server/src/server.ts index da5d3cbc..eb435345 100644 --- a/server/src/server.ts +++ b/server/src/server.ts @@ -7,7 +7,7 @@ dotenv.config(); import { record, workflow, storage, auth, integration, proxy } from './routes'; import { BrowserPool } from "./browser-management/classes/BrowserPool"; import logger from './logger'; -import { connectDB, syncDB } from './db/config'; +import { connectDB, syncDB } from './storage/db' import bodyParser from 'body-parser'; import cookieParser from 'cookie-parser'; import csrf from 'csurf'; @@ -62,7 +62,7 @@ const workerProcess = fork(path.resolve(__dirname, './worker.ts')); workerProcess.on('message', (message) => { console.log(`Message from worker: ${message}`); }); - workerProcess.on('error', (error) => { +workerProcess.on('error', (error) => { console.error(`Error in worker: ${error}`); }); workerProcess.on('exit', (code) => { @@ -81,6 +81,6 @@ server.listen(SERVER_PORT, async () => { process.on('SIGINT', () => { console.log('Main app shutting down...'); - 
workerProcess.kill();
+  //workerProcess.kill();
   process.exit();
 });
diff --git a/server/src/storage/db.ts b/server/src/storage/db.ts
new file mode 100644
index 00000000..56c68d8b
--- /dev/null
+++ b/server/src/storage/db.ts
@@ -0,0 +1,45 @@
+import { Sequelize } from 'sequelize';
+import dotenv from 'dotenv';
+import setupAssociations from '../models/associations';
+
+dotenv.config();
+
+// Shared Sequelize instance; the connection URI is assembled from the DB_* environment variables.
+const sequelize = new Sequelize(
+  `postgresql://${process.env.DB_USER}:${process.env.DB_PASSWORD}@${process.env.DB_HOST}:${process.env.DB_PORT}/${process.env.DB_NAME}`,
+  {
+    host: 'localhost',
+    dialect: 'postgres',
+    logging: false,
+  }
+);
+
+/**
+ * Checks that the database accepts connections.
+ * A failure is logged and swallowed, so the returned promise never rejects.
+ */
+export const connectDB = async () => {
+  try {
+    await sequelize.authenticate();
+    console.log('Database connected successfully');
+  } catch (error) {
+    console.error('Unable to connect to the database:', error);
+  }
+};
+
+/**
+ * Runs `sequelize.sync` without dropping existing tables.
+ * A failure is logged and swallowed, so the returned promise never rejects.
+ */
+export const syncDB = async () => {
+  try {
+    //setupAssociations();
+    await sequelize.sync({ force: false }); // force: true will drop and recreate tables on every run
+    console.log('Database synced successfully!');
+  } catch (error) {
+    console.error('Failed to sync database:', error);
+  }
+};
+
+
+export default sequelize;
\ No newline at end of file
diff --git a/server/src/storage/mino.ts b/server/src/storage/mino.ts
new file mode 100644
index 00000000..96e3d0c2
--- /dev/null
+++ b/server/src/storage/mino.ts
@@ -0,0 +1,183 @@
+import { Client } from 'minio';
+import Run from '../models/Run';
+
+// Resolved once so the client and the public URLs handed back to callers
+// always agree on the same endpoint and port, including the defaults.
+const MINIO_ENDPOINT = process.env.MINIO_ENDPOINT || 'localhost';
+const MINIO_PORT = parseInt(process.env.MINIO_PORT || '9000', 10);
+
+const minioClient = new Client({
+  endPoint: MINIO_ENDPOINT,
+  port: MINIO_PORT,
+  useSSL: false,
+  accessKey: process.env.MINIO_ACCESS_KEY || 'minio-access-key',
+  secretKey: process.env.MINIO_SECRET_KEY || 'minio-secret-key',
+});
+
+// Startup connectivity probe; it only logs and never rejects.
+minioClient.bucketExists('maxun-test')
+  .then((exists) => {
+    if (exists) {
+      console.log('MinIO was connected successfully.');
+    } else {
+      console.log('Bucket does not exist, but MinIO was connected.');
+    }
+  })
+  .catch((err) => {
+    console.error('Error connecting to MinIO:', err);
+  });
+
+/**
+ * Uploads the binary output of a run to a MinIO bucket (stored as image/png
+ * objects) and reads it back.
+ */
+class BinaryOutputService {
+  private bucketName: string;
+
+  /**
+   * @param bucketName - Bucket that every upload and download of this instance targets.
+   */
+  constructor(bucketName: string) {
+    this.bucketName = bucketName;
+  }
+
+  /**
+   * Uploads binary data to Minio and stores references in PostgreSQL.
+   *
+   * Entries that cannot be decoded or uploaded are logged and left out of the
+   * result; they do not abort the remaining uploads.
+   *
+   * @param run - The run object representing the current process.
+   * @param binaryOutput - The binary output object containing data to upload.
+   * @returns A map of Minio URLs pointing to the uploaded binary data.
+   */
+  async uploadAndStoreBinaryOutput(run: Run, binaryOutput: Record<string, unknown>): Promise<Record<string, string>> {
+    const uploadedBinaryOutput: Record<string, string> = {};
+    const plainRun = run.toJSON();
+
+    // A run without binary output yields an empty map instead of a TypeError.
+    for (const key of Object.keys(binaryOutput || {})) {
+      if (!plainRun.runId) {
+        console.error('Run ID is undefined. Cannot upload binary data.');
+        continue;
+      }
+
+      console.log(`Processing binary output key: ${key}`);
+
+      const binaryData = this.decodeBinaryData(key, binaryOutput[key]);
+      if (binaryData === null) {
+        continue;
+      }
+
+      try {
+        const minioKey = `${plainRun.runId}/${key}`;
+
+        await this.uploadBinaryOutputToMinioBucket(run, minioKey, binaryData);
+
+        // Use the endpoint/port the client was configured with, so the URL is
+        // still valid when the MINIO_* variables are unset and defaults apply.
+        uploadedBinaryOutput[key] = `http://${MINIO_ENDPOINT}:${MINIO_PORT}/${this.bucketName}/${minioKey}`;
+      } catch (error) {
+        console.error(`Error uploading key ${key} to MinIO:`, error);
+      }
+    }
+
+    console.log('Uploaded Binary Output:', uploadedBinaryOutput);
+
+    try {
+      await run.update({ binaryOutput: uploadedBinaryOutput });
+      console.log('Run successfully updated with binary output');
+    } catch (updateError) {
+      console.error('Error updating run with binary output:', updateError);
+    }
+
+    return uploadedBinaryOutput;
+  }
+
+  /**
+   * Stores one object in this service's bucket.
+   *
+   * @param run - Not used; kept so existing call sites keep compiling. Writing to `run.toJSON()` here would only change a detached copy.
+   * @param key - Object key inside the bucket.
+   * @param data - Raw bytes to store.
+   * @throws Re-throws any MinIO client error after logging it.
+   */
+  async uploadBinaryOutputToMinioBucket(run: Run, key: string, data: Buffer): Promise<void> {
+    const bucketName = this.bucketName;
+    try {
+      console.log(`Uploading to bucket ${bucketName} with key ${key}`);
+      await minioClient.putObject(bucketName, key, data, data.length, { 'Content-Type': 'image/png' });
+      console.log(`Successfully uploaded to MinIO: minio://${bucketName}/${key}`);
+    } catch (error) {
+      console.error(`Error uploading to MinIO bucket: ${bucketName} with key: ${key}`, error);
+      throw error;
+    }
+  }
+
+  /**
+   * Downloads one object from this service's bucket into memory.
+   *
+   * @param key - Object key inside the bucket.
+   * @returns The complete object contents.
+   * @throws Rejects with the MinIO client error or the stream error, after logging it.
+   */
+  public async getBinaryOutputFromMinioBucket(key: string): Promise<Buffer> {
+    const bucketName = this.bucketName;
+
+    try {
+      console.log(`Fetching from bucket ${bucketName} with key ${key}`);
+      const stream = await minioClient.getObject(bucketName, key);
+      return new Promise<Buffer>((resolve, reject) => {
+        const chunks: Buffer[] = [];
+        stream.on('data', (chunk) => chunks.push(chunk));
+        stream.on('end', () => resolve(Buffer.concat(chunks)));
+        stream.on('error', (error) => {
+          console.error('Error while reading the stream from MinIO:', error);
+          reject(error);
+        });
+      });
+    } catch (error) {
+      console.error(`Error fetching from MinIO bucket: ${bucketName} with key: ${key}`, error);
+      throw error;
+    }
+  }
+
+  /**
+   * Normalises one binary output entry to a Buffer.
+   *
+   * Accepts a Buffer, or an object whose `data` property is a JSON string of
+   * the form `{"type":"Buffer","data":[...]}`.
+   *
+   * @param key - Entry name, used in log messages only.
+   * @param binaryData - Raw entry taken from the binary output object.
+   * @returns The decoded Buffer, or null (after logging) when the entry is unusable.
+   */
+  private decodeBinaryData(key: string, binaryData: unknown): Buffer | null {
+    const payload = typeof binaryData === 'object' && binaryData !== null
+      ? (binaryData as { data?: unknown }).data
+      : undefined;
+
+    if (typeof payload === 'string') {
+      try {
+        const parsedData = JSON.parse(payload);
+        if (parsedData && parsedData.type === 'Buffer' && Array.isArray(parsedData.data)) {
+          return Buffer.from(parsedData.data);
+        }
+        console.error(`Invalid Buffer format for key: ${key}`);
+        return null;
+      } catch (error) {
+        console.error(`Failed to parse JSON for key: ${key}`, error);
+        return null;
+      }
+    }
+
+    if (!Buffer.isBuffer(binaryData)) {
+      console.error(`Binary data for key ${key} is not a valid Buffer.`);
+      return null;
+    }
+
+    return binaryData;
+  }
+}
+
+export { minioClient, BinaryOutputService };
\ No newline at end of file
diff --git a/server/src/workflow-management/scheduler/index.ts b/server/src/workflow-management/scheduler/index.ts
index 615a43fb..3ea6fe53 100644
---
a/server/src/workflow-management/scheduler/index.ts +++ b/server/src/workflow-management/scheduler/index.ts @@ -8,6 +8,7 @@ import { googleSheetUpdateTasks, processGoogleSheetUpdates } from "../integratio import Robot from "../../models/Robot"; import Run from "../../models/Run"; import { getDecryptedProxyConfig } from "../../routes/proxy"; +import { BinaryOutputService } from "../../storage/mino"; async function createWorkflowAndStoreMetadata(id: string, userId: string) { try { @@ -115,6 +116,9 @@ async function executeRun(id: string) { const interpretationInfo = await browser.interpreter.InterpretRecording( recording.recording, currentPage, plainRun.interpreterSettings); + const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); + const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput); + await destroyRemoteBrowser(plainRun.browserId); await run.update({ @@ -124,7 +128,7 @@ async function executeRun(id: string) { browserId: plainRun.browserId, log: interpretationInfo.log.join('\n'), serializableOutput: interpretationInfo.serializableOutput, - binaryOutput: interpretationInfo.binaryOutput, + binaryOutput: uploadedBinaryOutput, }); googleSheetUpdateTasks[id] = { diff --git a/src/components/molecules/RunContent.tsx b/src/components/molecules/RunContent.tsx index 2ac4d369..56e6b9d7 100644 --- a/src/components/molecules/RunContent.tsx +++ b/src/components/molecules/RunContent.tsx @@ -7,7 +7,6 @@ import { TabPanel, TabContext } from "@mui/lab"; import SettingsIcon from '@mui/icons-material/Settings'; import ImageIcon from '@mui/icons-material/Image'; import ArticleIcon from '@mui/icons-material/Article'; -import { Buffer } from 'buffer'; import { useEffect, useState } from "react"; import AssignmentIcon from '@mui/icons-material/Assignment'; import Table from '@mui/material/Table'; @@ -184,19 +183,16 @@ export const RunContent = ({ row, currentLog, interpretationInProgress, logEndRe 
Binary output {Object.keys(row.binaryOutput).map((key) => { try { - const binaryBuffer = JSON.parse(row.binaryOutput[key].data); - const b64 = Buffer.from(binaryBuffer.data).toString('base64'); + const imageUrl = row.binaryOutput[key]; return ( {key}: - Download + Download - {key} + {key} ) } catch (e) {