From 109afffb4bef6e35827c99da44addae4703b984b Mon Sep 17 00:00:00 2001 From: Rohit Date: Tue, 29 Apr 2025 20:15:48 +0530 Subject: [PATCH] feat: revamp airtable integration multiple actions --- .../integrations/airtable.ts | 323 +++++++++++++++--- 1 file changed, 278 insertions(+), 45 deletions(-) diff --git a/server/src/workflow-management/integrations/airtable.ts b/server/src/workflow-management/integrations/airtable.ts index 93474721..a78ec72d 100644 --- a/server/src/workflow-management/integrations/airtable.ts +++ b/server/src/workflow-management/integrations/airtable.ts @@ -11,6 +11,12 @@ interface AirtableUpdateTask { retries: number; } +interface SerializableOutput { + scrapeSchema?: any[]; + scrapeList?: any[]; + other?: any[]; +} + const MAX_RETRIES = 3; const BASE_API_DELAY = 2000; @@ -39,38 +45,117 @@ async function refreshAirtableToken(refreshToken: string) { } } +function mergeRelatedData(serializableOutput: SerializableOutput, binaryOutput: Record) { + const mergedRecords: Record[] = []; + + const maxLength = Math.max( + ...(serializableOutput.scrapeSchema?.map(array => array?.length || 0) || [0]), + ...(serializableOutput.scrapeList?.map(array => array?.length || 0) || [0]), + ...(serializableOutput.other?.map(array => array?.length || 0) || [0]) + ); + + for (let i = 0; i < maxLength; i++) { + mergedRecords.push({}); + } + + if (serializableOutput.scrapeSchema) { + for (const schemaArray of serializableOutput.scrapeSchema) { + if (!Array.isArray(schemaArray)) continue; + + for (let i = 0; i < schemaArray.length; i++) { + if (i >= mergedRecords.length) break; + mergedRecords[i] = { ...mergedRecords[i], ...schemaArray[i] }; + } + } + } + + if (serializableOutput.scrapeList) { + for (const listArray of serializableOutput.scrapeList) { + if (!Array.isArray(listArray)) continue; + + for (let i = 0; i < listArray.length; i++) { + if (i >= mergedRecords.length) break; + mergedRecords[i] = { ...mergedRecords[i], ...listArray[i] }; + } + } + } + + if (serializableOutput.other) { + for (const otherArray of serializableOutput.other) { + if (!Array.isArray(otherArray)) continue; + + for (let i = 0; i < otherArray.length; i++) { + if (i >= mergedRecords.length) break; + mergedRecords[i] = { ...mergedRecords[i], ...otherArray[i] }; + } + } + } + + if (binaryOutput && Object.keys(binaryOutput).length > 0) { + for (let i = 0; i < mergedRecords.length; i++) { + const screenshotKey = `item-${i}`; + if (binaryOutput[screenshotKey]) { + mergedRecords[i].Screenshot = binaryOutput[screenshotKey]; + mergedRecords[i].Key = screenshotKey; + } + } + + for (const [key, url] of Object.entries(binaryOutput)) { + if (mergedRecords.some(record => record.Key === key)) { + continue; + } + + mergedRecords.push({ + "Key": key, + "Screenshot": url + }); + } + } + + return mergedRecords; +} + export async function updateAirtable(robotId: string, runId: string) { try { + console.log(`Starting Airtable update for run: ${runId}, robot: ${robotId}`); + const run = await Run.findOne({ where: { runId } }); if (!run) throw new Error(`Run not found for runId: ${runId}`); const plainRun = run.toJSON(); if (plainRun.status !== 'success') { - console.log('Run status is not success'); + console.log('Run status is not success, skipping Airtable update'); return; } - let data: { [key: string]: any }[] = []; - if (plainRun.serializableOutput?.['item-0']) { - data = plainRun.serializableOutput['item-0'] as { [key: string]: any }[]; - } else if (plainRun.binaryOutput?.['item-0']) { - data = [{ "File URL": plainRun.binaryOutput['item-0'] }]; - } - const robot = await Robot.findOne({ where: { 'recording_meta.id': robotId } }); if (!robot) throw new Error(`Robot not found for robotId: ${robotId}`); const plainRobot = robot.toJSON(); - if (plainRobot.airtable_base_id && plainRobot.airtable_table_name && plainRobot.airtable_table_id) { - console.log(`Writing to Airtable base ${plainRobot.airtable_base_id}`); + + if (!plainRobot.airtable_base_id || !plainRobot.airtable_table_name || !plainRobot.airtable_table_id) { + console.log('Airtable integration not configured'); + return; + } + + console.log(`Airtable configuration found - Base: ${plainRobot.airtable_base_id}, Table: ${plainRobot.airtable_table_name}`); + + const serializableOutput = plainRun.serializableOutput as SerializableOutput; + const binaryOutput = plainRun.binaryOutput || {}; + + const mergedData = mergeRelatedData(serializableOutput, binaryOutput); + + if (mergedData.length > 0) { await writeDataToAirtable( robotId, plainRobot.airtable_base_id, plainRobot.airtable_table_name, plainRobot.airtable_table_id, - data + mergedData ); - console.log(`Data written to Airtable for ${robotId}`); + console.log(`All data written to Airtable for ${robotId}`); + } else { + console.log(`No data to write to Airtable for ${robotId}`); } } catch (error: any) { console.error(`Airtable update failed: ${error.message}`); @@ -125,42 +210,142 @@ export async function writeDataToAirtable( tableId: string, data: any[] ) { + if (!data || data.length === 0) { + console.log('No data to write. Skipping.'); + return; + } + try { return await withTokenRefresh(robotId, async (accessToken: string) => { const airtable = new Airtable({ apiKey: accessToken }); const base = airtable.base(baseId); - const existingFields = await getExistingFields(base, tableName); - console.log(`Found ${existingFields.length} existing fields in Airtable`); + const processedData = data.map(item => { + const cleanedItem: Record = {}; + + for (const [key, value] of Object.entries(item)) { + if (value === null || value === undefined) { + cleanedItem[key] = ''; + } else if (typeof value === 'object' && !Array.isArray(value)) { + cleanedItem[key] = JSON.stringify(value); + } else { + cleanedItem[key] = value; + } + } + + return cleanedItem; + }); - const dataFields = [...new Set(data.flatMap(row => Object.keys(row)))]; + const existingFields = await getExistingFields(base, tableName); + console.log(`Found ${existingFields.length} existing fields in Airtable: ${existingFields.join(', ')}`); + + const dataFields = [...new Set(processedData.flatMap(row => Object.keys(row)))]; console.log(`Found ${dataFields.length} fields in data: ${dataFields.join(', ')}`); const missingFields = dataFields.filter(field => !existingFields.includes(field)); - console.log(`Found ${missingFields.length} missing fields: ${missingFields.join(', ')}`); + const hasNewColumns = missingFields.length > 0; + console.log(`Found ${missingFields.length} new fields: ${missingFields.join(', ')}`); for (const field of missingFields) { - const sampleRow = data.find(row => field in row); + const sampleRow = processedData.find(row => field in row); if (sampleRow) { const sampleValue = sampleRow[field]; try { await createAirtableField(baseId, tableName, field, sampleValue, accessToken, tableId); console.log(`Successfully created field: ${field}`); + + await new Promise(resolve => setTimeout(resolve, 200)); } catch (fieldError: any) { console.warn(`Warning: Could not create field "${field}": ${fieldError.message}`); } } } - - await deleteEmptyRecords(base, tableName); - const BATCH_SIZE = 10; - for (let i = 0; i < data.length; i += BATCH_SIZE) { - const batch = data.slice(i, i + BATCH_SIZE); - await retryableAirtableWrite(base, tableName, batch); + let existingRecords: Array<{ id: string, fields: Record }> = []; + + if (hasNewColumns) { + existingRecords = await fetchAllRecords(base, tableName); + console.log(`Found ${existingRecords.length} existing records in Airtable`); } - logger.log('info', `Successfully wrote ${data.length} records to Airtable`); + if (hasNewColumns && existingRecords.length > 0) { + const recordsToUpdate = []; + const recordsToCreate = []; + + const newColumnData = processedData.map(record => { + const newColumnsOnly: Record = {}; + missingFields.forEach(field => { + if (field in record) { + newColumnsOnly[field] = record[field]; + } + }); + return newColumnsOnly; + }); + + for (let i = 0; i < Math.min(existingRecords.length, newColumnData.length); i++) { + if (Object.keys(newColumnData[i]).length > 0) { + recordsToUpdate.push({ + id: existingRecords[i].id, + fields: newColumnData[i] + }); + } + } + + const existingColumnsBeingUpdated = dataFields.filter(field => + existingFields.includes(field) && !missingFields.includes(field) + ); + + if (existingColumnsBeingUpdated.length > 0) { + recordsToCreate.push(...processedData.map(record => ({ fields: record }))); + console.log(`Will append ${recordsToCreate.length} new records with all data`); + } else { + if (processedData.length > existingRecords.length) { + const additionalRecords = processedData.slice(existingRecords.length); + recordsToCreate.push(...additionalRecords.map(record => ({ fields: record }))); + console.log(`Will append ${recordsToCreate.length} additional records`); + } + } + + if (recordsToUpdate.length > 0) { + console.log(`Updating ${recordsToUpdate.length} existing records with new columns`); + const BATCH_SIZE = 10; + for (let i = 0; i < recordsToUpdate.length; i += BATCH_SIZE) { + const batch = recordsToUpdate.slice(i, i + BATCH_SIZE); + console.log(`Updating batch ${Math.floor(i/BATCH_SIZE) + 1} of ${Math.ceil(recordsToUpdate.length/BATCH_SIZE)}`); + + try { + await retryableAirtableUpdate(base, tableName, batch); + } catch (batchError: any) { + console.error(`Error updating batch: ${batchError.message}`); + throw batchError; + } + + await new Promise(resolve => setTimeout(resolve, 500)); + } + } + } else { + console.log(`Appending all ${processedData.length} records to Airtable`); + const recordsToCreate = processedData.map(record => ({ fields: record })); + + const BATCH_SIZE = 10; + for (let i = 0; i < recordsToCreate.length; i += BATCH_SIZE) { + const batch = recordsToCreate.slice(i, i + BATCH_SIZE); + console.log(`Creating batch ${Math.floor(i/BATCH_SIZE) + 1} of ${Math.ceil(recordsToCreate.length/BATCH_SIZE)}`); + + try { + await retryableAirtableCreate(base, tableName, batch); + } catch (batchError: any) { + console.error(`Error creating batch: ${batchError.message}`); + throw batchError; + } + + await new Promise(resolve => setTimeout(resolve, 500)); + } + } + + await deleteEmptyRecords(base, tableName); + + logger.log('info', `Successfully processed ${processedData.length} records in Airtable`); }); } catch (error: any) { logger.log('error', `Airtable write failed: ${error.message}`); @@ -168,6 +353,20 @@ export async function writeDataToAirtable( } } +async function fetchAllRecords(base: Airtable.Base, tableName: string): Promise }>> { + try { + console.log(`Fetching all records from ${tableName}...`); + const records = await base(tableName).select().all(); + return records.map(record => ({ + id: record.id, + fields: record.fields + })); + } catch (error: any) { + console.warn(`Warning: Could not fetch all records: ${error.message}`); + return []; + } +} + async function deleteEmptyRecords(base: Airtable.Base, tableName: string): Promise { console.log('Checking for empty records to clear...'); @@ -183,31 +382,53 @@ async function deleteEmptyRecords(base: Airtable.Base, tableName: string): Promi }); if (emptyRecords.length > 0) { + console.log(`Found ${emptyRecords.length} empty records to delete`); const BATCH_SIZE = 10; for (let i = 0; i < emptyRecords.length; i += BATCH_SIZE) { const batch = emptyRecords.slice(i, i + BATCH_SIZE); const recordIds = batch.map(record => record.id); await base(tableName).destroy(recordIds); + console.log(`Deleted batch ${Math.floor(i/BATCH_SIZE) + 1} of ${Math.ceil(emptyRecords.length/BATCH_SIZE)}`); } - } + console.log(`Successfully deleted ${emptyRecords.length} empty records`); + } else { + console.log('No empty records found to delete'); + } } catch (error: any) { console.warn(`Warning: Could not clear empty records: ${error.message}`); console.warn('Will continue without deleting empty records'); } } -async function retryableAirtableWrite( +async function retryableAirtableCreate( base: Airtable.Base, tableName: string, batch: any[], retries = MAX_RETRIES ): Promise { try { - await base(tableName).create(batch.map(row => ({ fields: row }))); + await base(tableName).create(batch); } catch (error) { if (retries > 0) { await new Promise(resolve => setTimeout(resolve, BASE_API_DELAY)); - return retryableAirtableWrite(base, tableName, batch, retries - 1); + return retryableAirtableCreate(base, tableName, batch, retries - 1); + } + throw error; + } +} + +async function retryableAirtableUpdate( + base: Airtable.Base, + tableName: string, + batch: any[], + retries = MAX_RETRIES +): Promise { + try { + await base(tableName).update(batch); + } catch (error) { + if (retries > 0) { + await new Promise(resolve => setTimeout(resolve, BASE_API_DELAY)); + return retryableAirtableUpdate(base, tableName, batch, retries - 1); } throw error; } @@ -217,18 +438,19 @@ async function retryableAirtableWrite( async function getExistingFields(base: Airtable.Base, tableName: string): Promise { try { const records = await base(tableName).select({ pageSize: 5 }).firstPage(); + const fieldNames = new Set(); + if (records.length > 0) { - const fieldNames = new Set(); records.forEach(record => { Object.keys(record.fields).forEach(field => fieldNames.add(field)); }); - - const headers = Array.from(fieldNames); - console.log(`Found ${headers.length} headers from records: ${headers.join(', ')}`); - return headers; } - return []; + + const headers = Array.from(fieldNames); + console.log(`Found ${headers.length} headers from records: ${headers.join(', ')}`); + return headers; } catch (error) { + console.warn(`Warning: Error fetching existing fields: ${error}`); return []; } } @@ -299,17 +521,27 @@ export const processAirtableUpdates = async () => { for (const runId in airtableUpdateTasks) { const task = airtableUpdateTasks[runId]; - if (task.status !== 'pending') continue; - - hasPendingTasks = true; - try { - await updateAirtable(task.robotId, task.runId); - delete airtableUpdateTasks[runId]; - } catch (error: any) { - task.retries += 1; - if (task.retries >= MAX_RETRIES) { - task.status = 'failed'; - logger.log('error', `Permanent failure for run ${runId}: ${error.message}`); + + if (task.status === 'pending') { + hasPendingTasks = true; + console.log(`Processing Airtable update for run: ${runId}`); + + try { + await updateAirtable(task.robotId, task.runId); + console.log(`Successfully updated Airtable for runId: ${runId}`); + airtableUpdateTasks[runId].status = 'completed'; + delete airtableUpdateTasks[runId]; + } catch (error: any) { + console.error(`Failed to update Airtable for run ${task.runId}:`, error); + + if (task.retries < MAX_RETRIES) { + airtableUpdateTasks[runId].retries += 1; + console.log(`Retrying task for runId: ${runId}, attempt: ${task.retries + 1}`); + } else { + airtableUpdateTasks[runId].status = 'failed'; + console.log(`Max retries reached for runId: ${runId}. Marking task as failed.`); + logger.log('error', `Permanent failure for run ${runId}: ${error.message}`); + } } } } @@ -319,6 +551,7 @@ export const processAirtableUpdates = async () => { break; } + console.log('Waiting for 5 seconds before checking again...'); await new Promise(resolve => setTimeout(resolve, 5000)); } }; \ No newline at end of file