Files
parcer/server/src/storage/mino.ts

171 lines
5.8 KiB
TypeScript
Raw Normal View History

2024-10-14 23:48:31 +05:30
import { Client } from 'minio';
2024-10-15 19:16:48 +05:30
import Run from '../models/Run';
2024-10-14 23:48:31 +05:30
/**
 * Shared MinIO client used by every helper in this module.
 *
 * Connection settings are read from the environment; an unset or empty
 * variable falls back to a local-development default:
 *   - MINIO_ENDPOINT   -> 'localhost'
 *   - MINIO_PORT       -> 9000
 *   - MINIO_ACCESS_KEY -> 'minio-access-key'
 *   - MINIO_SECRET_KEY -> 'minio-secret-key'
 *
 * TLS is always disabled (`useSSL: false`).
 */
const minioClient = new Client({
  endPoint: process.env.MINIO_ENDPOINT || 'localhost',
  // Explicit radix so the port is always parsed as a decimal number.
  port: parseInt(process.env.MINIO_PORT || '9000', 10),
  useSSL: false,
  accessKey: process.env.MINIO_ACCESS_KEY || 'minio-access-key',
  secretKey: process.env.MINIO_SECRET_KEY || 'minio-secret-key',
});
2024-10-14 23:51:15 +05:30
2024-10-15 22:47:19 +05:30
// Startup connectivity probe, run once when the module is loaded.
// The answer to "does 'maxun-test' exist?" is irrelevant: any resolved
// response means the server was reachable, so both outcomes report success.
// A rejection is logged and otherwise ignored.
minioClient
  .bucketExists('maxun-test')
  .then(() => {
    console.log('MinIO connected successfully.');
  })
  .catch((err: unknown) => {
    console.error('Error connecting to MinIO:', err);
  });
/**
 * Makes sure a bucket exists and, when requested, makes its objects publicly readable.
 *
 * The bucket is created only if `bucketExists` reports it missing. When
 * `policy` is `'public-read'` (the default) an anonymous `s3:GetObject`
 * policy covering every object in the bucket is applied, whether the bucket
 * was just created or already existed. Any other `policy` value leaves the
 * bucket policy untouched.
 *
 * Failures are best-effort: every error is logged and swallowed, so the
 * returned promise never rejects.
 *
 * @param bucketName - Name of the bucket to create and/or configure.
 * @param policy - Access policy to apply; only `'public-read'` has an effect.
 */
async function createBucketWithPolicy(bucketName: string, policy = 'public-read'): Promise<void> {
  try {
    const bucketExists = await minioClient.bucketExists(bucketName);
    if (!bucketExists) {
      await minioClient.makeBucket(bucketName);
      console.log(`Bucket ${bucketName} created successfully.`);
    } else {
      console.log(`Bucket ${bucketName} already exists.`);
    }

    if (policy === 'public-read') {
      // Apply public-read policy after confirming the bucket exists
      const policyJSON = {
        Version: '2012-10-17',
        Statement: [
          {
            Effect: 'Allow',
            Principal: '*',
            Action: ['s3:GetObject'],
            Resource: [`arn:aws:s3:::${bucketName}/*`],
          },
        ],
      };
      await minioClient.setBucketPolicy(bucketName, JSON.stringify(policyJSON));
      console.log(`Public-read policy applied to bucket ${bucketName}.`);
    }
  } catch (error: unknown) {
    console.error('Error in bucket creation or policy application:', error);
  }
}
2024-10-15 19:16:48 +05:30
/**
 * Uploads a run's binary outputs to MinIO and reads them back.
 *
 * NOTE(review): objects are always written to and read from the hard-coded
 * 'maxun-run-screenshots' bucket, while the public URLs returned by
 * `uploadAndStoreBinaryOutput` are built from the `bucketName` passed to the
 * constructor. The two only agree when the service is constructed with
 * 'maxun-run-screenshots' — confirm against callers.
 */
class BinaryOutputService {
  private bucketName: string;

  /**
   * @param bucketName - Bucket name used when building public URLs.
   */
  constructor(bucketName: string) {
    this.bucketName = bucketName;
  }

  /**
   * Uploads every usable entry of `binaryOutput` to MinIO and records the
   * resulting public URLs on the run via `run.update`.
   *
   * Entries are processed one at a time. An entry is skipped (with an error
   * log) when the run has no `runId`, when its value cannot be turned into a
   * Buffer, or when the upload itself fails; the remaining entries are still
   * processed. A failure of `run.update` is logged and does not reject.
   *
   * @param run - The run whose outputs are being stored.
   * @param binaryOutput - Map of output key to either a Buffer or an object
   *   whose `data` field is a JSON-serialized Buffer.
   * @returns Map of output key to public URL for every entry that was uploaded.
   */
  async uploadAndStoreBinaryOutput(run: Run, binaryOutput: Record<string, any>): Promise<Record<string, string>> {
    const uploadedBinaryOutput: Record<string, string> = {};
    const plainRun = run.toJSON();
    // Fall back to the port the MinIO client itself defaults to, so the URL
    // never ends up as "http://localhost:undefined/..." when MINIO_PORT is unset.
    const port = process.env.MINIO_PORT || '9000';

    for (const key of Object.keys(binaryOutput)) {
      if (!plainRun.runId) {
        console.error('Run ID is undefined. Cannot upload binary data.');
        continue;
      }

      console.log(`Processing binary output key: ${key}`);

      const binaryData = this.toBuffer(key, binaryOutput[key]);
      if (binaryData === null) {
        continue;
      }

      try {
        const minioKey = `${plainRun.runId}/${key}`;
        await this.uploadBinaryOutputToMinioBucket(run, minioKey, binaryData);

        // Construct the public URL for the uploaded object
        // todo: use minio endpoint
        uploadedBinaryOutput[key] = `http://localhost:${port}/${this.bucketName}/${minioKey}`;
      } catch (error: unknown) {
        console.error(`Error uploading key ${key} to MinIO:`, error);
      }
    }

    console.log('Uploaded Binary Output:', uploadedBinaryOutput);

    try {
      await run.update({ binaryOutput: uploadedBinaryOutput });
      console.log('Run successfully updated with binary output');
    } catch (updateError: unknown) {
      console.error('Error updating run with binary output:', updateError);
    }

    return uploadedBinaryOutput;
  }

  /**
   * Uploads one object to the 'maxun-run-screenshots' bucket as `image/png`,
   * after making sure the bucket exists with a public-read policy.
   *
   * @param run - The run the object belongs to.
   * @param key - Object key inside the bucket.
   * @param data - Bytes to store.
   * @throws Re-throws any error raised while uploading, after logging it.
   */
  async uploadBinaryOutputToMinioBucket(run: Run, key: string, data: Buffer): Promise<void> {
    const bucketName = 'maxun-run-screenshots';
    await createBucketWithPolicy(bucketName, 'public-read');

    try {
      console.log(`Uploading to bucket ${bucketName} with key ${key}`);
      await minioClient.putObject(bucketName, key, data, data.length, { 'Content-Type': 'image/png' });

      // Only touch the snapshot's map when it actually is an object: a run
      // without binaryOutput must not turn a successful upload into a TypeError.
      // NOTE(review): presumably toJSON() returns a detached copy, in which
      // case this bookkeeping has no lasting effect — confirm and remove.
      const plainRun = run.toJSON();
      if (plainRun.binaryOutput !== null && typeof plainRun.binaryOutput === 'object') {
        plainRun.binaryOutput[key] = `minio://${bucketName}/${key}`;
      }

      console.log(`Successfully uploaded to MinIO: minio://${bucketName}/${key}`);
    } catch (error: unknown) {
      console.error(`Error uploading to MinIO bucket: ${bucketName} with key: ${key}`, error);
      throw error;
    }
  }

  /**
   * Downloads one object from the 'maxun-run-screenshots' bucket into memory.
   *
   * @param key - Object key inside the bucket.
   * @returns The full object contents.
   * @throws Re-throws a failure to open the object; a failure while reading
   *   the stream rejects the returned promise. Both are logged first.
   */
  public async getBinaryOutputFromMinioBucket(key: string): Promise<Buffer> {
    const bucketName = 'maxun-run-screenshots';

    try {
      console.log(`Fetching from bucket ${bucketName} with key ${key}`);
      const stream = await minioClient.getObject(bucketName, key);

      return new Promise<Buffer>((resolve, reject) => {
        const chunks: Buffer[] = [];
        stream.on('data', (chunk: Buffer) => chunks.push(chunk));
        stream.on('end', () => resolve(Buffer.concat(chunks)));
        stream.on('error', (error: unknown) => {
          console.error('Error while reading the stream from MinIO:', error);
          reject(error);
        });
      });
    } catch (error: unknown) {
      console.error(`Error fetching from MinIO bucket: ${bucketName} with key: ${key}`, error);
      throw error;
    }
  }

  /**
   * Normalizes one raw output value into a Buffer.
   *
   * A value whose `data` field is a string is treated as a JSON-serialized
   * Buffer (`{"type":"Buffer","data":[...]}`) and decoded; any other value
   * must already be a Buffer. Unusable values are logged and yield `null`.
   */
  private toBuffer(key: string, raw: unknown): Buffer | null {
    const serialized = (raw as { data?: unknown } | null | undefined)?.data;

    if (raw && typeof serialized === 'string') {
      try {
        const parsed = JSON.parse(serialized) as { type?: unknown; data?: unknown } | null;
        if (parsed && parsed.type === 'Buffer' && Array.isArray(parsed.data)) {
          return Buffer.from(parsed.data);
        }
        console.error(`Invalid Buffer format for key: ${key}`);
        return null;
      } catch (error: unknown) {
        console.error(`Failed to parse JSON for key: ${key}`, error);
        return null;
      }
    }

    if (!Buffer.isBuffer(raw)) {
      console.error(`Binary data for key ${key} is not a valid Buffer.`);
      return null;
    }
    return raw;
  }
}
2024-10-15 19:17:54 +05:30
export { minioClient, BinaryOutputService };