diff --git a/server/src/workflow-management/integrations/airtable.ts b/server/src/workflow-management/integrations/airtable.ts index 7ef2f2fb..6fc63f46 100644 --- a/server/src/workflow-management/integrations/airtable.ts +++ b/server/src/workflow-management/integrations/airtable.ts @@ -11,11 +11,34 @@ interface AirtableUpdateTask { retries: number; } -const MAX_RETRIES = 5; +const MAX_RETRIES = 3; const BASE_API_DELAY = 2000; export let airtableUpdateTasks: { [runId: string]: AirtableUpdateTask } = {}; +async function refreshAirtableToken(refreshToken: string) { + try { + const response = await axios.post( + "https://airtable.com/oauth2/v1/token", + new URLSearchParams({ + grant_type: "refresh_token", + refresh_token: refreshToken, + client_id: process.env.AIRTABLE_CLIENT_ID!, + }), + { + headers: { + "Content-Type": "application/x-www-form-urlencoded", + }, + } + ); + + return response.data; + } catch (error: any) { + logger.log("error", `Failed to refresh Airtable token: ${error.message}`); + throw new Error(`Token refresh failed: ${error.response?.data?.error_description || error.message}`); + } +} + export async function updateAirtable(robotId: string, runId: string) { try { const run = await Run.findOne({ where: { runId } }); @@ -44,7 +67,7 @@ export async function updateAirtable(robotId: string, runId: string) { robotId, plainRobot.airtable_base_id, plainRobot.airtable_table_name, - plainRobot.airtable_table_id , + plainRobot.airtable_table_id, data ); console.log(`Data written to Airtable for ${robotId}`); @@ -55,6 +78,46 @@ export async function updateAirtable(robotId: string, runId: string) { } } +async function withTokenRefresh(robotId: string, apiCall: (accessToken: string) => Promise): Promise { + const robot = await Robot.findOne({ where: { 'recording_meta.id': robotId } }); + if (!robot) throw new Error(`Robot not found for robotId: ${robotId}`); + + let accessToken = robot.get('airtable_access_token') as string; + let refreshToken = robot.get('airtable_refresh_token') as string; + + if (!accessToken || !refreshToken) { + throw new Error('Airtable credentials not configured'); + } + + try { + return await apiCall(accessToken); + } catch (error: any) { + if (error.response?.status === 401 || + (error.statusCode === 401) || + error.message.includes('unauthorized') || + error.message.includes('expired')) { + + logger.log("info", `Refreshing expired Airtable token for robot: ${robotId}`); + + try { + const tokens = await refreshAirtableToken(refreshToken); + + await robot.update({ + airtable_access_token: tokens.access_token, + airtable_refresh_token: tokens.refresh_token || refreshToken + }); + + return await apiCall(tokens.access_token); + } catch (refreshError: any) { + logger.log("error", `Failed to refresh token: ${refreshError.message}`); + throw new Error(`Token refresh failed: ${refreshError.message}`); + } + } + + throw error; + } +} + export async function writeDataToAirtable( robotId: string, baseId: string, @@ -63,49 +126,115 @@ export async function writeDataToAirtable( data: any[] ) { try { - const robot = await Robot.findOne({ where: { 'recording_meta.id': robotId } }); - if (!robot) throw new Error('Robot not found'); - - const accessToken = robot.get('airtable_access_token'); - if (!accessToken) throw new Error('Airtable not connected'); + return await withTokenRefresh(robotId, async (accessToken: string) => { + const airtable = new Airtable({ apiKey: accessToken }); + const base = airtable.base(baseId); - const airtable = new Airtable({ apiKey: accessToken }); - const base = airtable.base(baseId); + const existingFields = await getExistingFields(base, tableName); + const dataFields = [...new Set(data.flatMap(row => Object.keys(row)))]; + const missingFields = dataFields.filter(field => !existingFields.includes(field)); - // Dynamic field creation logic - const existingFields = await getExistingFields(base, tableName); - const dataFields = [...new Set(data.flatMap(row => Object.keys(row)))]; - const missingFields = dataFields.filter(field => !existingFields.includes(field)); - - for (const field of missingFields) { - const sampleRow = data.find(row => field in row); - if (sampleRow) { - const sampleValue = sampleRow[field]; - await createAirtableField(baseId, tableName, field, sampleValue, accessToken, tableId); - console.log("from airtable.ts",tableId); - console.log("from airtable.ts",tableName); + for (const field of missingFields) { + const sampleRow = data.find(row => field in row); + if (sampleRow) { + const sampleValue = sampleRow[field]; + try { + await createAirtableField(baseId, tableName, field, sampleValue, accessToken, tableId); + console.log(`Successfully created field: ${field}`); + } catch (fieldError: any) { + console.warn(`Warning: Could not create field "${field}": ${fieldError.message}`); + } + } } - } - // Batch processing with retries - const batchSize = 10; - for (let i = 0; i < data.length; i += batchSize) { - const batch = data.slice(i, i + batchSize); - await retryableAirtableWrite(base, tableName, batch); - } + await deleteEmptyRecords(base, tableName); - logger.log('info', `Successfully wrote ${data.length} records to Airtable`); + const records = data.map(row => { + const validFields = Object.keys(row).filter(field => + existingFields.includes(field) || missingFields.includes(field) + ); + + const validData: { [key: string]: any } = {}; + validFields.forEach((field: any) => { + validData[field] = row[field]; + }); + + return { fields: validData }; + }); + + await createRecordsWithRetry(base, tableName, records); + + logger.log('info', `Successfully wrote ${data.length} records to Airtable`); + }); } catch (error: any) { logger.log('error', `Airtable write failed: ${error.message}`); throw error; } } +async function deleteEmptyRecords(base: Airtable.Base, tableName: string): Promise { + console.log('Checking for empty records to clear...'); + + try { + const existingRecords = await base(tableName).select().all(); + console.log(`Found ${existingRecords.length} total records`); + + const emptyRecords = existingRecords.filter(record => { + const fields = record.fields; + return !fields || Object.keys(fields).length === 0 || + Object.values(fields).every(value => + value === null || value === undefined || value === ''); + }); + + if (emptyRecords.length > 0) { + const BATCH_SIZE = 10; + for (let i = 0; i < emptyRecords.length; i += BATCH_SIZE) { + const batch = emptyRecords.slice(i, i + BATCH_SIZE); + const recordIds = batch.map(record => record.id); + await base(tableName).destroy(recordIds); + } + } + } catch (error: any) { + console.warn(`Warning: Could not clear empty records: ${error.message}`); + console.warn('Will continue without deleting empty records'); + } +} + +async function createRecordsWithRetry( + base: Airtable.Base, + tableName: string, + records: any[], + retries = MAX_RETRIES +): Promise { + try { + await base(tableName).create(records); + } catch (error: any) { + console.error(`Error creating records: ${error.message}`); + + if (retries > 0) { + await new Promise(resolve => setTimeout(resolve, BASE_API_DELAY)); + console.log(`Retrying create operation, ${retries} attempts remaining`); + return createRecordsWithRetry(base, tableName, records, retries - 1); + } + throw error; + } +} + // Helper functions async function getExistingFields(base: Airtable.Base, tableName: string): Promise { try { - const records = await base(tableName).select({ maxRecords: 1 }).firstPage(); - return records[0] ? Object.keys(records[0].fields) : []; + const records = await base(tableName).select({ pageSize: 5 }).firstPage(); + if (records.length > 0) { + const fieldNames = new Set(); + records.forEach(record => { + Object.keys(record.fields).forEach(field => fieldNames.add(field)); + }); + + const headers = Array.from(fieldNames); + console.log(`Found ${headers.length} headers from records: ${headers.join(', ')}`); + return headers; + } + return []; } catch (error) { return []; } @@ -118,28 +247,36 @@ async function createAirtableField( sampleValue: any, accessToken: string, tableId: string, - retries = MAX_RETRIES ): Promise { try { const sanitizedFieldName = sanitizeFieldName(fieldName); const fieldType = inferFieldType(sampleValue); - await axios.post( + console.log(`Creating field ${sanitizedFieldName} with type ${fieldType}`); + + const response = await axios.post( `https://api.airtable.com/v0/meta/bases/${baseId}/tables/${tableId}/fields`, { name: sanitizedFieldName, type: fieldType }, { headers: { Authorization: `Bearer ${accessToken}` } } ); + logger.log('info', `Created field: ${sanitizedFieldName} (${fieldType})`); + return response.data; } catch (error: any) { if (retries > 0 && error.response?.status === 429) { - await delay(BASE_API_DELAY * (MAX_RETRIES - retries + 1)); + await new Promise(resolve => setTimeout(resolve, BASE_API_DELAY)); return createAirtableField(baseId, tableName, fieldName, sampleValue, accessToken, tableId, retries - 1); } + if (error.response?.status === 422) { + console.log(`Field ${fieldName} may already exist or has validation issues`); + return; + } + const errorMessage = error.response?.data?.error?.message || error.message; const statusCode = error.response?.status || 'No Status Code'; - throw new Error(`Field creation failed (${statusCode}): ${errorMessage}`); + console.warn(`Field creation issue (${statusCode}): ${errorMessage}`); } } @@ -172,27 +309,6 @@ function isValidUrl(str: string): boolean { } } -async function retryableAirtableWrite( - base: Airtable.Base, - tableName: string, - batch: any[], - retries = MAX_RETRIES -): Promise { - try { - await base(tableName).create(batch.map(row => ({ fields: row }))); - } catch (error) { - if (retries > 0) { - await delay(BASE_API_DELAY); - return retryableAirtableWrite(base, tableName, batch, retries - 1); - } - throw error; - } -} - -function delay(ms: number): Promise { - return new Promise(resolve => setTimeout(resolve, ms)); -} - export const processAirtableUpdates = async () => { while (true) { let hasPendingTasks = false; @@ -201,10 +317,10 @@ export const processAirtableUpdates = async () => { const task = airtableUpdateTasks[runId]; if (task.status !== 'pending') continue; - hasPendingTasks = true; + hasPendingTasks = true; try { await updateAirtable(task.robotId, task.runId); - delete airtableUpdateTasks[runId]; + delete airtableUpdateTasks[runId]; } catch (error: any) { task.retries += 1; if (task.retries >= MAX_RETRIES) { @@ -214,7 +330,11 @@ export const processAirtableUpdates = async () => { } } - if (!hasPendingTasks) break; - await delay(5000); + if (!hasPendingTasks) { + console.log('No pending Airtable update tasks, exiting processor'); + break; + } + + await new Promise(resolve => setTimeout(resolve, 5000)); } }; \ No newline at end of file