Files
parcer/server/src/workflow-management/integrations/gsheet.ts

162 lines
5.4 KiB
TypeScript
Raw Normal View History

2024-09-18 19:54:31 +05:30
import { google } from "googleapis";
2024-09-19 17:42:20 +05:30
import logger from "../../logger";
import Run from "../../models/Run";
import Robot from "../../models/Robot";
interface GoogleSheetUpdateTask {
robotId: string;
runId: string;
2024-11-11 04:40:48 +05:30
status: 'pending' | 'completed' | 'failed';
2024-09-19 18:14:56 +05:30
retries: number;
}
2024-09-19 18:14:56 +05:30
const MAX_RETRIES = 5;
2024-11-11 04:40:48 +05:30
export let googleSheetUpdateTasks: { [runId: string]: GoogleSheetUpdateTask } = {};
export async function updateGoogleSheet(robotId: string, runId: string) {
try {
const run = await Run.findOne({ where: { runId } });
if (!run) {
throw new Error(`Run not found for runId: ${runId}`);
}
const plainRun = run.toJSON();
2024-11-11 04:40:48 +05:30
if (plainRun.status === 'success') {
2024-10-29 03:46:13 +05:30
let data: { [key: string]: any }[] = [];
2024-11-11 04:40:48 +05:30
if (plainRun.serializableOutput && Object.keys(plainRun.serializableOutput).length > 0) {
data = plainRun.serializableOutput['item-0'] as { [key: string]: any }[];
} else if (plainRun.binaryOutput && plainRun.binaryOutput['item-0']) {
2024-10-29 03:46:13 +05:30
// Handle binaryOutput by setting the URL as a data entry
2024-11-11 04:40:48 +05:30
const binaryUrl = plainRun.binaryOutput['item-0'] as string;
2024-10-29 03:46:13 +05:30
// Create a placeholder object with the binary URL
data = [{ "Screenshot URL": binaryUrl }];
}
2024-11-11 04:40:48 +05:30
const robot = await Robot.findOne({ where: { 'recording_meta.id': robotId } });
2024-10-18 00:13:43 +05:30
if (!robot) {
throw new Error(`Robot not found for robotId: ${robotId}`);
}
const plainRobot = robot.toJSON();
2024-11-11 04:40:48 +05:30
const spreadsheetId = plainRobot.google_sheet_id;
if (plainRobot.google_sheet_email && spreadsheetId) {
console.log(`Preparing to write data to Google Sheet for robot: ${robotId}, spreadsheetId: ${spreadsheetId}`);
2024-10-18 00:30:58 +05:30
2024-10-17 21:00:35 +05:30
const headers = Object.keys(data[0]);
2024-11-11 04:40:48 +05:30
const rows = data.map((row: { [key: string]: any }) => Object.values(row));
2024-10-17 21:00:35 +05:30
const outputData = [headers, ...rows];
2024-10-17 21:00:35 +05:30
await writeDataToSheet(robotId, spreadsheetId, outputData);
2024-11-11 04:40:48 +05:30
console.log(`Data written to Google Sheet successfully for Robot: ${robotId} and Run: ${runId}`);
2024-10-18 00:13:43 +05:30
} else {
2024-11-11 04:40:48 +05:30
console.log('Google Sheets integration not configured.');
}
2024-10-18 00:13:43 +05:30
} else {
2024-11-11 04:40:48 +05:30
console.log('Run status is not success or serializableOutput is missing.');
2024-10-17 21:00:35 +05:30
}
} catch (error: any) {
2024-11-11 04:40:48 +05:30
console.error(`Failed to write data to Google Sheet for Robot: ${robotId} and Run: ${runId}: ${error.message}`);
}
2024-11-11 04:40:48 +05:30
};
2024-09-18 19:54:31 +05:30
2024-11-11 04:40:48 +05:30
export async function writeDataToSheet(robotId: string, spreadsheetId: string, data: any[]) {
2024-09-18 19:54:31 +05:30
try {
2024-11-11 04:40:48 +05:30
const robot = await Robot.findOne({ where: { 'recording_meta.id': robotId } });
2024-10-17 21:22:34 +05:30
if (!robot) {
throw new Error(`Robot not found for robotId: ${robotId}`);
}
const plainRobot = robot.toJSON();
2024-11-11 04:40:48 +05:30
if (!plainRobot.google_access_token || !plainRobot.google_refresh_token) {
throw new Error('Google Sheets access not configured for user');
2024-10-17 21:22:34 +05:30
}
const oauth2Client = new google.auth.OAuth2(
process.env.GOOGLE_CLIENT_ID,
process.env.GOOGLE_CLIENT_SECRET,
process.env.GOOGLE_REDIRECT_URI
2024-10-17 21:22:34 +05:30
);
oauth2Client.setCredentials({
2024-11-11 04:40:48 +05:30
access_token: plainRobot.google_access_token,
refresh_token: plainRobot.google_refresh_token,
2024-10-17 21:22:34 +05:30
});
2024-11-11 04:40:48 +05:30
oauth2Client.on('tokens', async (tokens) => {
2024-10-17 21:22:34 +05:30
if (tokens.refresh_token) {
await robot.update({ google_refresh_token: tokens.refresh_token });
}
if (tokens.access_token) {
await robot.update({ google_access_token: tokens.access_token });
}
2024-09-18 19:54:31 +05:30
});
2024-11-11 04:40:48 +05:30
const sheets = google.sheets({ version: 'v4', auth: oauth2Client });
2024-09-18 19:54:31 +05:30
const resource = { values: data };
2024-11-11 04:40:48 +05:30
console.log('Attempting to write to spreadsheet:', spreadsheetId);
2024-09-18 19:54:31 +05:30
const response = await sheets.spreadsheets.values.append({
2024-09-18 19:54:31 +05:30
spreadsheetId,
2024-11-11 04:40:48 +05:30
range: 'Sheet1!A1',
valueInputOption: 'USER_ENTERED',
2024-09-18 19:54:31 +05:30
requestBody: resource,
});
2024-10-18 00:13:43 +05:30
if (response.status === 200) {
2024-11-11 04:40:48 +05:30
console.log('Data successfully appended to Google Sheet.');
2024-10-18 00:13:43 +05:30
} else {
2024-11-11 04:40:48 +05:30
console.error('Google Sheets append failed:', response);
2024-10-18 00:13:43 +05:30
}
2024-10-17 21:22:34 +05:30
logger.log(`info`, `Data written to Google Sheet: ${spreadsheetId}`);
2024-09-18 22:05:06 +05:30
} catch (error: any) {
2024-09-19 17:42:20 +05:30
logger.log(`error`, `Error writing data to Google Sheet: ${error.message}`);
2024-09-18 19:54:31 +05:30
throw error;
}
}
export const processGoogleSheetUpdates = async () => {
while (true) {
2024-09-19 18:14:56 +05:30
let hasPendingTasks = false;
for (const runId in googleSheetUpdateTasks) {
const task = googleSheetUpdateTasks[runId];
2024-11-11 04:40:48 +05:30
console.log(`Processing task for runId: ${runId}, status: ${task.status}`);
2024-11-11 04:40:48 +05:30
if (task.status === 'pending') {
2024-09-19 18:14:56 +05:30
hasPendingTasks = true;
try {
await updateGoogleSheet(task.robotId, task.runId);
console.log(`Successfully updated Google Sheet for runId: ${runId}`);
delete googleSheetUpdateTasks[runId];
} catch (error: any) {
2024-11-11 04:40:48 +05:30
console.error(`Failed to update Google Sheets for run ${task.runId}:`, error);
2024-09-19 18:14:56 +05:30
if (task.retries < MAX_RETRIES) {
googleSheetUpdateTasks[runId].retries += 1;
2024-11-11 04:40:48 +05:30
console.log(`Retrying task for runId: ${runId}, attempt: ${task.retries}`);
2024-09-19 18:14:56 +05:30
} else {
2024-11-11 04:40:48 +05:30
googleSheetUpdateTasks[runId].status = 'failed';
console.log(`Max retries reached for runId: ${runId}. Marking task as failed.`);
2024-09-19 18:14:56 +05:30
}
}
}
}
if (!hasPendingTasks) {
2024-11-11 04:40:48 +05:30
console.log('No pending tasks. Exiting loop.');
break;
}
2024-11-11 04:40:48 +05:30
console.log('Waiting for 5 seconds before checking again...');
await new Promise(resolve => setTimeout(resolve, 5000));
}
2024-11-11 04:40:48 +05:30
};