From 3916ac91c4b3ad07449ac3d29a523aa10e2eac39 Mon Sep 17 00:00:00 2001 From: Rohit Date: Tue, 8 Jul 2025 15:21:59 +0530 Subject: [PATCH] feat: struct capture text data airtable --- .../integrations/airtable.ts | 298 ++++++++---------- 1 file changed, 132 insertions(+), 166 deletions(-) diff --git a/server/src/workflow-management/integrations/airtable.ts b/server/src/workflow-management/integrations/airtable.ts index 401bc11d..0c9dfe71 100644 --- a/server/src/workflow-management/integrations/airtable.ts +++ b/server/src/workflow-management/integrations/airtable.ts @@ -44,65 +44,118 @@ async function refreshAirtableToken(refreshToken: string) { } } + function mergeRelatedData(serializableOutput: SerializableOutput, binaryOutput: Record) { - const mergedRecords: Record[] = []; + const allRecords: Record[] = []; - const maxLength = Math.max( - ...[ - ...(serializableOutput.scrapeSchema ?? []).map(arr => arr?.length ?? 0), - ...(serializableOutput.scrapeList ?? []).map(arr => arr?.length ?? 0), - 0 - ] - ); + // Process all data types together, not separately + // This creates an interleaved/mixed result like in your image - for (let i = 0; i < maxLength; i++) { - mergedRecords.push({}); - } + let schemaIndex = 0; + let listIndex = 0; + let screenshotIndex = 0; + // Flatten all data arrays for processing + const schemaData: Array<{key: string, value: any}> = []; + const listData: any[] = []; + const screenshotData: Array<{key: string, url: string}> = []; + + // Collect schema data if (serializableOutput.scrapeSchema) { for (const schemaArray of serializableOutput.scrapeSchema) { if (!Array.isArray(schemaArray)) continue; - - for (let i = 0; i < schemaArray.length; i++) { - if (i >= mergedRecords.length) break; - mergedRecords[i] = { ...mergedRecords[i], ...schemaArray[i] }; + for (const schemaItem of schemaArray) { + Object.entries(schemaItem).forEach(([key, value]) => { + if (key && key.trim() !== '' && value !== null && value !== undefined && value !== '') { + schemaData.push({key, value}); + } + }); } } } + // Collect list data if (serializableOutput.scrapeList) { for (const listArray of serializableOutput.scrapeList) { if (!Array.isArray(listArray)) continue; - - for (let i = 0; i < listArray.length; i++) { - if (i >= mergedRecords.length) break; - mergedRecords[i] = { ...mergedRecords[i], ...listArray[i] }; - } - } - } - - if (binaryOutput && Object.keys(binaryOutput).length > 0) { - for (let i = 0; i < mergedRecords.length; i++) { - const screenshotKey = `item-${i}`; - if (binaryOutput[screenshotKey]) { - mergedRecords[i].Screenshot = binaryOutput[screenshotKey]; - mergedRecords[i].Key = screenshotKey; - } - } - - for (const [key, url] of Object.entries(binaryOutput)) { - if (mergedRecords.some(record => record.Key === key)) { - continue; - } - - mergedRecords.push({ - "Key": key, - "Screenshot": url + listArray.forEach(listItem => { + const hasContent = Object.values(listItem).some(value => + value !== null && value !== undefined && value !== '' + ); + if (hasContent) { + listData.push(listItem); + } }); } } - return mergedRecords; + // Collect screenshot data + if (binaryOutput && Object.keys(binaryOutput).length > 0) { + Object.entries(binaryOutput).forEach(([key, url]) => { + if (key && key.trim() !== '' && url && url.trim() !== '') { + screenshotData.push({key, url}); + } + }); + } + + // Mix all data types together to create consecutive records + const maxLength = Math.max(schemaData.length, listData.length, screenshotData.length); + + for (let i = 0; i < maxLength; i++) { + const record: Record = {}; + + // Add schema data if available + if (i < schemaData.length) { + record.Label = schemaData[i].key; + record.Value = schemaData[i].value; + } + + // Add list data if available + if (i < listData.length) { + Object.entries(listData[i]).forEach(([key, value]) => { + if (value !== null && value !== undefined && value !== '') { + record[key] = value; + } + }); + } + + // Add screenshot data if available + if (i < screenshotData.length) { + record.Key = screenshotData[i].key; + record.Screenshot = screenshotData[i].url; + } + + // Only add record if it has at least one value + if (Object.keys(record).length > 0) { + allRecords.push(record); + } + } + + // Add any remaining data that didn't fit in the parallel processing + // This handles cases where one data type has significantly more records than others + + // Add remaining schema data + for (let i = maxLength; i < schemaData.length; i++) { + allRecords.push({ + Label: schemaData[i].key, + Value: schemaData[i].value + }); + } + + // Add remaining list data + for (let i = maxLength; i < listData.length; i++) { + allRecords.push(listData[i]); + } + + // Add remaining screenshot data + for (let i = maxLength; i < screenshotData.length; i++) { + allRecords.push({ + Key: screenshotData[i].key, + Screenshot: screenshotData[i].url + }); + } + + return allRecords; } export async function updateAirtable(robotId: string, runId: string) { @@ -210,11 +263,13 @@ export async function writeDataToAirtable( const airtable = new Airtable({ apiKey: accessToken }); const base = airtable.base(baseId); + await deleteEmptyRecords(base, tableName); + const processedData = data.map(item => { const cleanedItem: Record = {}; for (const [key, value] of Object.entries(item)) { - if (value === null || value === undefined) { + if (value === null || value === undefined || value === '') { cleanedItem[key] = ''; } else if (typeof value === 'object' && !Array.isArray(value)) { cleanedItem[key] = JSON.stringify(value); @@ -224,113 +279,55 @@ export async function writeDataToAirtable( } return cleanedItem; + }).filter(record => { + return Object.values(record).some(value => value !== null && value !== undefined && value !== ''); }); - const existingFields = await getExistingFields(base, tableName); - console.log(`Found ${existingFields.length} existing fields in Airtable: ${existingFields.join(', ')}`); + if (processedData.length === 0) { + console.log('No valid data to write after filtering. Skipping.'); + return; + } - const dataFields = [...new Set(processedData.flatMap(row => Object.keys(row)))]; + const dataFields = [...new Set(processedData.flatMap(row => Object.keys(row)))]; console.log(`Found ${dataFields.length} fields in data: ${dataFields.join(', ')}`); + const existingFields = await getExistingFields(base, tableName); const missingFields = dataFields.filter(field => !existingFields.includes(field)); - const hasNewColumns = missingFields.length > 0; - console.log(`Found ${missingFields.length} new fields: ${missingFields.join(', ')}`); - - for (const field of missingFields) { - const sampleRow = processedData.find(row => field in row); - if (sampleRow) { - const sampleValue = sampleRow[field]; - try { - await createAirtableField(baseId, tableName, field, sampleValue, accessToken, tableId); - console.log(`Successfully created field: ${field}`); - - await new Promise(resolve => setTimeout(resolve, 200)); - } catch (fieldError: any) { - console.warn(`Warning: Could not create field "${field}": ${fieldError.message}`); - } - } - } - let existingRecords: Array<{ id: string, fields: Record }> = []; - - if (hasNewColumns) { - existingRecords = await fetchAllRecords(base, tableName); - console.log(`Found ${existingRecords.length} existing records in Airtable`); - } - - if (hasNewColumns && existingRecords.length > 0) { - const recordsToUpdate = []; - const recordsToCreate = []; + if (missingFields.length > 0) { + console.log(`Creating ${missingFields.length} new fields: ${missingFields.join(', ')}`); - const newColumnData = processedData.map(record => { - const newColumnsOnly: Record = {}; - missingFields.forEach(field => { - if (field in record) { - newColumnsOnly[field] = record[field]; - } - }); - return newColumnsOnly; - }); - - for (let i = 0; i < Math.min(existingRecords.length, newColumnData.length); i++) { - if (Object.keys(newColumnData[i]).length > 0) { - recordsToUpdate.push({ - id: existingRecords[i].id, - fields: newColumnData[i] - }); - } - } - - const existingColumnsBeingUpdated = dataFields.filter(field => - existingFields.includes(field) && !missingFields.includes(field) - ); - - if (existingColumnsBeingUpdated.length > 0) { - recordsToCreate.push(...processedData.map(record => ({ fields: record }))); - console.log(`Will append ${recordsToCreate.length} new records with all data`); - } else { - if (processedData.length > existingRecords.length) { - const additionalRecords = processedData.slice(existingRecords.length); - recordsToCreate.push(...additionalRecords.map(record => ({ fields: record }))); - console.log(`Will append ${recordsToCreate.length} additional records`); - } - } - - if (recordsToUpdate.length > 0) { - console.log(`Updating ${recordsToUpdate.length} existing records with new columns`); - const BATCH_SIZE = 10; - for (let i = 0; i < recordsToUpdate.length; i += BATCH_SIZE) { - const batch = recordsToUpdate.slice(i, i + BATCH_SIZE); - console.log(`Updating batch ${Math.floor(i/BATCH_SIZE) + 1} of ${Math.ceil(recordsToUpdate.length/BATCH_SIZE)}`); - + for (const field of missingFields) { + const sampleRow = processedData.find(row => field in row && row[field] !== ''); + if (sampleRow) { + const sampleValue = sampleRow[field]; try { - await retryableAirtableUpdate(base, tableName, batch); - } catch (batchError: any) { - console.error(`Error updating batch: ${batchError.message}`); - throw batchError; + await createAirtableField(baseId, tableName, field, sampleValue, accessToken, tableId); + console.log(`Successfully created field: ${field}`); + await new Promise(resolve => setTimeout(resolve, 200)); + } catch (fieldError: any) { + console.warn(`Warning: Could not create field "${field}": ${fieldError.message}`); } - - await new Promise(resolve => setTimeout(resolve, 500)); } } - } else { - console.log(`Appending all ${processedData.length} records to Airtable`); - const recordsToCreate = processedData.map(record => ({ fields: record })); + } + + console.log(`Appending all ${processedData.length} records to Airtable`); + const recordsToCreate = processedData.map(record => ({ fields: record })); + + const BATCH_SIZE = 10; + for (let i = 0; i < recordsToCreate.length; i += BATCH_SIZE) { + const batch = recordsToCreate.slice(i, i + BATCH_SIZE); + console.log(`Creating batch ${Math.floor(i/BATCH_SIZE) + 1} of ${Math.ceil(recordsToCreate.length/BATCH_SIZE)}`); - const BATCH_SIZE = 10; - for (let i = 0; i < recordsToCreate.length; i += BATCH_SIZE) { - const batch = recordsToCreate.slice(i, i + BATCH_SIZE); - console.log(`Creating batch ${Math.floor(i/BATCH_SIZE) + 1} of ${Math.ceil(recordsToCreate.length/BATCH_SIZE)}`); - - try { - await retryableAirtableCreate(base, tableName, batch); - } catch (batchError: any) { - console.error(`Error creating batch: ${batchError.message}`); - throw batchError; - } - - await new Promise(resolve => setTimeout(resolve, 500)); + try { + await retryableAirtableCreate(base, tableName, batch); + } catch (batchError: any) { + console.error(`Error creating batch: ${batchError.message}`); + throw batchError; } + + await new Promise(resolve => setTimeout(resolve, 500)); } await deleteEmptyRecords(base, tableName); @@ -343,20 +340,6 @@ export async function writeDataToAirtable( } } -async function fetchAllRecords(base: Airtable.Base, tableName: string): Promise }>> { - try { - console.log(`Fetching all records from ${tableName}...`); - const records = await base(tableName).select().all(); - return records.map(record => ({ - id: record.id, - fields: record.fields - })); - } catch (error: any) { - console.warn(`Warning: Could not fetch all records: ${error.message}`); - return []; - } -} - async function deleteEmptyRecords(base: Airtable.Base, tableName: string): Promise { console.log('Checking for empty records to clear...'); @@ -407,23 +390,6 @@ async function retryableAirtableCreate( } } -async function retryableAirtableUpdate( - base: Airtable.Base, - tableName: string, - batch: any[], - retries = MAX_RETRIES -): Promise { - try { - await base(tableName).update(batch); - } catch (error) { - if (retries > 0) { - await new Promise(resolve => setTimeout(resolve, BASE_API_DELAY)); - return retryableAirtableUpdate(base, tableName, batch, retries - 1); - } - throw error; - } -} - // Helper functions async function getExistingFields(base: Airtable.Base, tableName: string): Promise { try {