2024-09-18 19:54:31 +05:30
|
|
|
import { google } from "googleapis";
|
|
|
|
|
import fs from 'fs';
|
|
|
|
|
import path from 'path';
|
2024-09-19 17:42:20 +05:30
|
|
|
import logger from "../../logger";
|
2024-09-19 17:50:59 +05:30
|
|
|
import { readFile } from "../storage";
|
2024-09-19 18:04:32 +05:30
|
|
|
interface GoogleSheetUpdateTask {
|
|
|
|
|
name: string;
|
|
|
|
|
runId: string;
|
2024-09-19 18:14:56 +05:30
|
|
|
status: 'pending' | 'completed' | 'failed';
|
|
|
|
|
retries: number;
|
2024-09-19 18:04:32 +05:30
|
|
|
}
|
|
|
|
|
|
2024-09-19 18:14:56 +05:30
|
|
|
const MAX_RETRIES = 5;
|
|
|
|
|
|
2024-09-19 18:04:32 +05:30
|
|
|
export let googleSheetUpdateTasks: { [runId: string]: GoogleSheetUpdateTask } = {};
|
|
|
|
|
|
|
|
|
|
|
2024-09-19 17:50:59 +05:30
|
|
|
// *** Temporary Path to the JSON file that will store the integration details ***
|
2024-09-19 18:40:45 +05:30
|
|
|
const getIntegrationsFilePath = (fileName: string) => path.join(__dirname, `integrations-${fileName}.json`);
|
2024-09-19 17:50:59 +05:30
|
|
|
|
2024-09-19 18:40:45 +05:30
|
|
|
export function loadIntegrations(fileName: string) {
|
|
|
|
|
const filePath = getIntegrationsFilePath(fileName);
|
|
|
|
|
if (fs.existsSync(filePath)) {
|
|
|
|
|
const data = fs.readFileSync(filePath, 'utf-8');
|
2024-09-19 17:50:59 +05:30
|
|
|
return JSON.parse(data);
|
|
|
|
|
}
|
|
|
|
|
return {};
|
|
|
|
|
}
|
|
|
|
|
|
2024-09-19 18:40:45 +05:30
|
|
|
export function saveIntegrations(fileName: string, integrations: any) {
|
|
|
|
|
const filePath = getIntegrationsFilePath(fileName);
|
|
|
|
|
fs.writeFileSync(filePath, JSON.stringify(integrations, null, 2));
|
2024-09-19 17:50:59 +05:30
|
|
|
}
|
|
|
|
|
|
|
|
|
|
export async function updateGoogleSheet(fileName: string, runId: string) {
|
|
|
|
|
try {
|
|
|
|
|
const run = await readFile(`./../storage/runs/${fileName}_${runId}.json`);
|
|
|
|
|
const parsedRun = JSON.parse(run);
|
|
|
|
|
|
|
|
|
|
if (parsedRun.status === 'success' && parsedRun.serializableOutput) {
|
|
|
|
|
const data = parsedRun.serializableOutput['item-0'] as { [key: string]: any }[];
|
2024-09-19 18:40:45 +05:30
|
|
|
const integrationConfig = await loadIntegrations(fileName);
|
2024-09-19 17:50:59 +05:30
|
|
|
|
|
|
|
|
if (integrationConfig) {
|
2024-09-19 18:56:15 +05:30
|
|
|
const { fileName, spreadsheetId, range, credentials } = integrationConfig;
|
2024-09-19 17:50:59 +05:30
|
|
|
|
2024-09-19 18:56:15 +05:30
|
|
|
if (fileName && spreadsheetId && range && credentials) {
|
2024-09-19 17:50:59 +05:30
|
|
|
// Convert data to Google Sheets format (headers and rows)
|
|
|
|
|
const headers = Object.keys(data[0]);
|
|
|
|
|
const rows = data.map((row: { [key: string]: any }) => Object.values(row));
|
|
|
|
|
const outputData = [headers, ...rows];
|
|
|
|
|
|
2024-09-19 18:56:15 +05:30
|
|
|
await writeDataToSheet(fileName, spreadsheetId, range, outputData);
|
2024-09-19 17:50:59 +05:30
|
|
|
logger.log('info', `Data written to Google Sheet successfully for ${fileName}_${runId}`);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
logger.log('error', `Google Sheet integration not configured for ${fileName}_${runId}`);
|
|
|
|
|
}
|
|
|
|
|
logger.log('error', `Run not successful or no data to update for ${fileName}_${runId}`);
|
|
|
|
|
} catch (error: any) {
|
|
|
|
|
logger.log('error', `Failed to write data to Google Sheet for ${fileName}_${runId}: ${error.message}`);
|
|
|
|
|
}
|
|
|
|
|
};
|
2024-09-18 19:54:31 +05:30
|
|
|
|
2024-09-19 18:56:15 +05:30
|
|
|
export async function writeDataToSheet(fileName: string, spreadsheetId: string, range: string, data: any[]) {
|
2024-09-18 19:54:31 +05:30
|
|
|
try {
|
2024-09-19 18:56:15 +05:30
|
|
|
const integrationCredentialsPath = getIntegrationsFilePath(fileName);
|
|
|
|
|
const integrationCredentials = JSON.parse(fs.readFileSync(integrationCredentialsPath, 'utf-8'));;
|
2024-09-18 19:54:31 +05:30
|
|
|
|
|
|
|
|
const auth = new google.auth.GoogleAuth({
|
|
|
|
|
credentials: {
|
2024-09-19 17:42:20 +05:30
|
|
|
client_email: integrationCredentials.credentials.client_email,
|
|
|
|
|
private_key: integrationCredentials.credentials.private_key,
|
2024-09-18 19:54:31 +05:30
|
|
|
},
|
|
|
|
|
scopes: ['https://www.googleapis.com/auth/spreadsheets'],
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
const authToken = await auth.getClient();
|
|
|
|
|
const sheets = google.sheets({ version: 'v4', auth: authToken as any });
|
|
|
|
|
|
|
|
|
|
const resource = { values: data };
|
|
|
|
|
|
|
|
|
|
await sheets.spreadsheets.values.append({
|
|
|
|
|
spreadsheetId,
|
|
|
|
|
range,
|
|
|
|
|
valueInputOption: 'USER_ENTERED',
|
|
|
|
|
requestBody: resource,
|
|
|
|
|
});
|
|
|
|
|
|
2024-09-19 17:42:20 +05:30
|
|
|
logger.log(`info`, `Data written to Google Sheet: ${spreadsheetId}, Range: ${range}`);
|
2024-09-18 22:05:06 +05:30
|
|
|
} catch (error: any) {
|
2024-09-19 17:42:20 +05:30
|
|
|
logger.log(`error`, `Error writing data to Google Sheet: ${error.message}`);
|
2024-09-18 19:54:31 +05:30
|
|
|
throw error;
|
|
|
|
|
}
|
|
|
|
|
}
|
2024-09-19 18:04:32 +05:30
|
|
|
|
2024-09-19 19:36:08 +05:30
|
|
|
export const processGoogleSheetUpdates = async () => {
|
2024-09-19 18:04:32 +05:30
|
|
|
while (true) {
|
2024-09-19 18:14:56 +05:30
|
|
|
let hasPendingTasks = false;
|
2024-09-19 18:04:32 +05:30
|
|
|
for (const runId in googleSheetUpdateTasks) {
|
|
|
|
|
const task = googleSheetUpdateTasks[runId];
|
|
|
|
|
if (task.status === 'pending') {
|
2024-09-19 18:14:56 +05:30
|
|
|
hasPendingTasks = true;
|
2024-09-19 18:04:32 +05:30
|
|
|
try {
|
|
|
|
|
await updateGoogleSheet(task.name, task.runId);
|
|
|
|
|
delete googleSheetUpdateTasks[runId];
|
|
|
|
|
} catch (error: any) {
|
2024-09-19 18:14:56 +05:30
|
|
|
if (task.retries < MAX_RETRIES) {
|
|
|
|
|
googleSheetUpdateTasks[runId].retries += 1;
|
|
|
|
|
} else {
|
|
|
|
|
// Mark as failed after maximum retries
|
|
|
|
|
googleSheetUpdateTasks[runId].status = 'failed';
|
|
|
|
|
}
|
2024-09-19 18:04:32 +05:30
|
|
|
console.error(`Failed to update Google Sheets for run ${task.runId}:`, error);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (!hasPendingTasks) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
await new Promise(resolve => setTimeout(resolve, 5000));
|
|
|
|
|
}
|
|
|
|
|
};
|