From 6a854f953122a36658db34278ed2683bee42520f Mon Sep 17 00:00:00 2001 From: AmitChauhan63390 Date: Thu, 30 Jan 2025 15:56:00 +0530 Subject: [PATCH] proper integration but writing data to airtable yet to be fixed --- server/src/routes/auth.ts | 49 ++++- server/src/routes/storage.ts | 10 + .../integrations/airtable.ts | 199 ++++++++++++++++++ .../integration/IntegrationSettings.tsx | 123 ++++++++++- 4 files changed, 371 insertions(+), 10 deletions(-) create mode 100644 server/src/workflow-management/integrations/airtable.ts diff --git a/server/src/routes/auth.ts b/server/src/routes/auth.ts index 268134f4..86749313 100644 --- a/server/src/routes/auth.ts +++ b/server/src/routes/auth.ts @@ -733,8 +733,8 @@ router.get("/airtable/bases", async (req: AuthenticatedRequest, res) => { }); // Update robot with selected base -router.post("/airtable/update", requireSignIn, async (req: AuthenticatedRequest, res) => { - const { baseId, baseName, robotId } = req.body; +router.post("/airtable/update", async (req: AuthenticatedRequest, res) => { + const { baseId, robotId , tableName} = req.body; if (!baseId || !robotId) { return res.status(400).json({ message: "Base ID and Robot ID are required" }); @@ -751,7 +751,8 @@ router.post("/airtable/update", requireSignIn, async (req: AuthenticatedRequest, await robot.update({ airtable_base_id: baseId, - airtable_table_name: baseName, + airtable_table_name: tableName, + }); capture("maxun-oss-airtable-integration-created", { @@ -803,3 +804,45 @@ router.post("/airtable/remove", requireSignIn, async (req: AuthenticatedRequest, } }); + + +// Fetch tables from an Airtable base +router.get("/airtable/tables", async (req: AuthenticatedRequest, res) => { + try { + const { baseId, robotId } = req.query; + + if (!baseId || !robotId) { + return res.status(400).json({ message: "Base ID and Robot ID are required" }); + } + + const robot = await Robot.findOne({ + where: { "recording_meta.id": robotId.toString() }, + raw: true, + }); + + if (!robot?.airtable_access_token) { + return res.status(400).json({ message: "Robot not authenticated with Airtable" }); + } + + const response = await fetch(`https://api.airtable.com/v0/meta/bases/${baseId}/tables`, { + headers: { + 'Authorization': `Bearer ${robot.airtable_access_token}` + } + }); + + if (!response.ok) { + const errorData = await response.json(); + throw new Error(errorData.error.message || 'Failed to fetch tables'); + } + + const data = await response.json(); + res.json(data.tables.map((table: any) => ({ + id: table.id, + name: table.name, + fields: table.fields + }))); + + } catch (error: any) { + res.status(500).json({ message: error.message }); + } +}); \ No newline at end of file diff --git a/server/src/routes/storage.ts b/server/src/routes/storage.ts index ddadf240..0fdd69ce 100644 --- a/server/src/routes/storage.ts +++ b/server/src/routes/storage.ts @@ -20,6 +20,7 @@ import { capture } from "../utils/analytics"; import { tryCatch } from 'bullmq'; import { WorkflowFile } from 'maxun-core'; import { Page } from 'playwright'; +import { airtableUpdateTasks, processAirtableUpdates } from '../workflow-management/integrations/airtable'; chromium.use(stealthPlugin()); export const router = Router(); @@ -514,6 +515,15 @@ router.post('/runs/run/:id', requireSignIn, async (req: AuthenticatedRequest, re status: 'pending', retries: 5, }; + + airtableUpdateTasks[plainRun.runId] = { + robotId: plainRun.robotMetaId, + runId: plainRun.runId, + status: 'pending', + retries: 5, + }; + + processAirtableUpdates(); processGoogleSheetUpdates(); } catch (err: any) { logger.log('error', `Failed to update Google Sheet for run: ${plainRun.runId}: ${err.message}`); diff --git a/server/src/workflow-management/integrations/airtable.ts b/server/src/workflow-management/integrations/airtable.ts new file mode 100644 index 00000000..9ace046c --- /dev/null +++ b/server/src/workflow-management/integrations/airtable.ts @@ -0,0 +1,199 @@ +import Airtable from "airtable"; +import axios from "axios"; +import logger from "../../logger"; +import Run from "../../models/Run"; +import Robot from "../../models/Robot"; + +interface AirtableUpdateTask { + robotId: string; + runId: string; + status: 'pending' | 'completed' | 'failed'; + retries: number; +} + +const MAX_RETRIES = 5; +const BASE_API_DELAY = 2000; + +export let airtableUpdateTasks: { [runId: string]: AirtableUpdateTask } = {}; + +export async function updateAirtable(robotId: string, runId: string) { + try { + const run = await Run.findOne({ where: { runId } }); + if (!run) throw new Error(`Run not found for runId: ${runId}`); + + const plainRun = run.toJSON(); + if (plainRun.status !== 'success') { + console.log('Run status is not success'); + return; + } + + let data: { [key: string]: any }[] = []; + if (plainRun.serializableOutput?.['item-0']) { + data = plainRun.serializableOutput['item-0'] as { [key: string]: any }[]; + } else if (plainRun.binaryOutput?.['item-0']) { + data = [{ "File URL": plainRun.binaryOutput['item-0'] }]; + } + + const robot = await Robot.findOne({ where: { 'recording_meta.id': robotId } }); + if (!robot) throw new Error(`Robot not found for robotId: ${robotId}`); + + const plainRobot = robot.toJSON(); + if (plainRobot.airtable_base_id && plainRobot.airtable_table_name) { + console.log(`Writing to Airtable base ${plainRobot.airtable_base_id}`); + await writeDataToAirtable( + robotId, + plainRobot.airtable_base_id, + plainRobot.airtable_table_name, + data + ); + console.log(`Data written to Airtable for ${robotId}`); + } + } catch (error: any) { + console.error(`Airtable update failed: ${error.message}`); + throw error; + } +} + +export async function writeDataToAirtable( + robotId: string, + baseId: string, + tableName: string, + data: any[] +) { + try { + const robot = await Robot.findOne({ where: { 'recording_meta.id': robotId } }); + if (!robot) throw new Error('Robot not found'); + + const accessToken = robot.get('airtable_access_token'); + if (!accessToken) throw new Error('Airtable not connected'); + + const airtable = new Airtable({ apiKey: accessToken }); + const base = airtable.base(baseId); + + const existingFields = await getExistingFields(base, tableName); + const dataFields = [...new Set(data.flatMap(row => Object.keys(row)))]; + const missingFields = dataFields.filter(field => !existingFields.includes(field)); + + for (const field of missingFields) { + const sampleValue = data.find(row => row[field])?.[field]; + if (sampleValue) { + await createAirtableField(baseId, tableName, field, sampleValue, accessToken); + } + } + + const batchSize = 10; + for (let i = 0; i < data.length; i += batchSize) { + const batch = data.slice(i, i + batchSize); + await retryableAirtableWrite(base, tableName, batch); + } + + logger.log('info', `Successfully wrote ${data.length} records to Airtable`); + } catch (error: any) { + logger.log('error', `Airtable write failed: ${error.message}`); + throw error; + } +} + +async function getExistingFields(base: Airtable.Base, tableName: string): Promise { + try { + const records = await base(tableName).select({ maxRecords: 1 }).firstPage(); + return records[0] ? Object.keys(records[0].fields) : []; + } catch (error) { + return []; + } +} + +async function createAirtableField( + baseId: string, + tableName: string, + fieldName: string, + sampleValue: any, + accessToken: string, + retries = MAX_RETRIES +): Promise { + try { + let fieldType = inferFieldType(sampleValue); + + // Fallback if field type is unknown + if (!fieldType) { + fieldType = 'singleLineText'; + logger.log('warn', `Unknown field type for ${fieldName}, defaulting to singleLineText`); + } + + console.log(`Creating field: ${fieldName}, Type: ${fieldType}`); + + await axios.post( + `https://api.airtable.com/v0/meta/bases/${baseId}/tables/${tableName}/fields`, + { name: fieldName, type: fieldType }, + { + headers: { + Authorization: `Bearer ${accessToken}`, + 'Content-Type': 'application/json' + } + } + ); + + logger.log('info', `Created field: ${fieldName} (${fieldType})`); + } catch (error: any) { + if (retries > 0 && error.response?.status === 429) { + await delay(BASE_API_DELAY * (MAX_RETRIES - retries + 2)); + return createAirtableField(baseId, tableName, fieldName, sampleValue, accessToken, retries - 1); + } + throw new Error(`Field creation failed: ${error.response?.data?.error?.message || 'Unknown error'}`); + } +} + +function inferFieldType(value: any): string { + if (typeof value === 'number') return 'number'; + if (typeof value === 'boolean') return 'checkbox'; + if (value instanceof Date) return 'dateTime'; + if (Array.isArray(value)) return 'multipleSelects'; + return 'singleLineText'; +} + +async function retryableAirtableWrite( + base: Airtable.Base, + tableName: string, + batch: any[], + retries = MAX_RETRIES +): Promise { + try { + await base(tableName).create(batch.map(row => ({ fields: row }))); + } catch (error) { + if (retries > 0) { + await delay(BASE_API_DELAY); + return retryableAirtableWrite(base, tableName, batch, retries - 1); + } + throw error; + } +} + +function delay(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); +} + +export const processAirtableUpdates = async () => { + while (true) { + let hasPendingTasks = false; + + for (const runId in airtableUpdateTasks) { + const task = airtableUpdateTasks[runId]; + if (task.status !== 'pending') continue; + + hasPendingTasks = true; + try { + await updateAirtable(task.robotId, task.runId); + delete airtableUpdateTasks[runId]; + } catch (error: any) { + task.retries += 1; + if (task.retries >= MAX_RETRIES) { + task.status = 'failed'; + logger.log('error', `Permanent failure for run ${runId}`); + } + } + } + + if (!hasPendingTasks) break; + await delay(5000); + } +}; diff --git a/src/components/integration/IntegrationSettings.tsx b/src/components/integration/IntegrationSettings.tsx index 9dd3bdc9..3fc62979 100644 --- a/src/components/integration/IntegrationSettings.tsx +++ b/src/components/integration/IntegrationSettings.tsx @@ -15,6 +15,7 @@ import { getStoredRecording } from "../../api/storage"; import { apiUrl } from "../../apiConfig.js"; import Cookies from "js-cookie"; import { useTranslation } from "react-i18next"; +import { SignalCellularConnectedNoInternet0BarSharp } from "@mui/icons-material"; interface IntegrationProps { isOpen: boolean; @@ -27,8 +28,11 @@ export interface IntegrationSettings { spreadsheetName?: string; airtableBaseId?: string; airtableBaseName?: string; + airtableTableName?: string, + airtableTableId?: string, data: string; integrationType: "googleSheets" | "airtable"; + } // Helper functions to replace js-cookie functionality @@ -56,12 +60,17 @@ export const IntegrationSettingsModal = ({ spreadsheetName: "", airtableBaseId: "", airtableBaseName: "", + airtableTableName: "", + airtableTableId: "", + data: "", integrationType: "googleSheets", + }); const [spreadsheets, setSpreadsheets] = useState<{ id: string; name: string }[]>([]); const [airtableBases, setAirtableBases] = useState<{ id: string; name: string }[]>([]); + const [airtableTables, setAirtableTables] = useState<{ id: string; name: string }[]>([]); const [loading, setLoading] = useState(false); const [error, setError] = useState(null); @@ -112,6 +121,23 @@ export const IntegrationSettingsModal = ({ } }; + + const fetchAirtableTables = async (baseId: string, recordingId: string) => { + try { + const response = await axios.get( + `${apiUrl}/auth/airtable/tables?robotId=${recordingId}&baseId=${baseId}`, + { withCredentials: true } + ); + setAirtableTables(response.data); + } + catch (error: any) { + console.error("Error fetching Airtable tables:", error); + notify("error", t("integration_settings.errors.fetch_error", { + message: error.response?.data?.message || error.message, + })); + } + } + // Handle Google Sheets selection const handleSpreadsheetSelect = (e: React.ChangeEvent) => { const selectedSheet = spreadsheets.find((sheet) => sheet.id === e.target.value); @@ -125,17 +151,74 @@ export const IntegrationSettingsModal = ({ }; // Handle Airtable base selection - const handleAirtableBaseSelect = (e: React.ChangeEvent) => { + const handleAirtableBaseSelect = async (e: React.ChangeEvent) => { const selectedBase = airtableBases.find((base) => base.id === e.target.value); + console.log(selectedBase); + if (selectedBase) { - setSettings({ - ...settings, + // Update local state + setSettings((prevSettings) => ({ + ...prevSettings, airtableBaseId: selectedBase.id, airtableBaseName: selectedBase.name, - }); + })); + + // Fetch tables for the selected base + if (recordingId) { + await fetchAirtableTables(selectedBase.id, recordingId); + } else { + console.error("Recording ID is null"); + } + + + + // try { + // // Ensure recordingId is available + // if (!recordingId) { + // throw new Error("Recording ID is missing"); + // } + + // // Make API call to update the base in the database + // const response = await axios.post( + // `${apiUrl}/auth/airtable/update`, + // { + // baseId: selectedBase.id, + // baseName: selectedBase.name, + // robotId: recordingId, // Use recordingId from the global context + // }, + // { withCredentials: true } + // ); + + // if (response.status !== 200) { + // throw new Error("Failed to update Airtable base in the database"); + // } + + // console.log("Airtable base updated successfully:", response.data); + // } catch (error) { + // console.error("Error updating Airtable base:", error); + // notify("error", t("integration_settings.errors.update_error", { + // message: error instanceof Error ? error.message : "Unknown error", + // })); + // } } }; + const handleAirtabletableSelect = (e: React.ChangeEvent) => { + console.log( e.target.value); + const selectedTable = airtableTables.find((table) => table.id === e.target.value); + if (selectedTable) { + setSettings((prevSettings) => ({ + ...prevSettings, + airtableTableId: e.target.value, + + airtableTableName: selectedTable?.name||"", + + })); + } + }; + + + // Update Google Sheets integration const updateGoogleSheetId = async () => { try { @@ -159,6 +242,10 @@ export const IntegrationSettingsModal = ({ // Update Airtable integration const updateAirtableBase = async () => { + console.log(settings.airtableBaseId); + console.log(settings.airtableTableName); + console.log(recordingId); + console.log(settings.airtableBaseName); try { await axios.post( `${apiUrl}/auth/airtable/update`, @@ -166,6 +253,7 @@ export const IntegrationSettingsModal = ({ baseId: settings.airtableBaseId, baseName: settings.airtableBaseName, robotId: recordingId, + tableName: settings.airtableTableName, }, { withCredentials: true } ); @@ -206,7 +294,7 @@ export const IntegrationSettingsModal = ({ { withCredentials: true } ); setAirtableBases([]); - setSettings({ ...settings, airtableBaseId: "", airtableBaseName: "" }); + setSettings({ ...settings, airtableBaseId: "", airtableBaseName: "", airtableTableName:"" }); notify("success", t("integration_settings.notifications.integration_removed")); } catch (error: any) { console.error("Error removing Airtable integration:", error); @@ -239,7 +327,13 @@ export const IntegrationSettingsModal = ({ if (recording.google_sheet_id) { setSettings({ ...settings, integrationType: "googleSheets" }); } else if (recording.airtable_base_id) { - setSettings({ ...settings, integrationType: "airtable" }); + setSettings(prev => ({ + ...prev, + airtableBaseId: recording.airtable_base_id || "", + airtableBaseName: recording.airtable_base_name || "", + airtableTableName: recording.airtable_table_name || "", + integrationType: recording.airtable_base_id ? "airtable" : "googleSheets" + })); } } }; @@ -475,7 +569,7 @@ if (!selectedIntegrationType) { <> {t("integration_settings.descriptions.authenticated_as", { - email: "hghghg", + email: "amit63390@gmail.com", })} {loading ? ( @@ -507,6 +601,21 @@ if (!selectedIntegrationType) { ))} + + {airtableTables.map((table) => ( + + {table.name} + + ))} +