feat: handle empty rows and refresh token logic
This commit is contained in:
@@ -11,11 +11,34 @@ interface AirtableUpdateTask {
|
|||||||
retries: number;
|
retries: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
const MAX_RETRIES = 5;
|
const MAX_RETRIES = 3;
|
||||||
const BASE_API_DELAY = 2000;
|
const BASE_API_DELAY = 2000;
|
||||||
|
|
||||||
export let airtableUpdateTasks: { [runId: string]: AirtableUpdateTask } = {};
|
export let airtableUpdateTasks: { [runId: string]: AirtableUpdateTask } = {};
|
||||||
|
|
||||||
|
async function refreshAirtableToken(refreshToken: string) {
|
||||||
|
try {
|
||||||
|
const response = await axios.post(
|
||||||
|
"https://airtable.com/oauth2/v1/token",
|
||||||
|
new URLSearchParams({
|
||||||
|
grant_type: "refresh_token",
|
||||||
|
refresh_token: refreshToken,
|
||||||
|
client_id: process.env.AIRTABLE_CLIENT_ID!,
|
||||||
|
}),
|
||||||
|
{
|
||||||
|
headers: {
|
||||||
|
"Content-Type": "application/x-www-form-urlencoded",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
return response.data;
|
||||||
|
} catch (error: any) {
|
||||||
|
logger.log("error", `Failed to refresh Airtable token: ${error.message}`);
|
||||||
|
throw new Error(`Token refresh failed: ${error.response?.data?.error_description || error.message}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export async function updateAirtable(robotId: string, runId: string) {
|
export async function updateAirtable(robotId: string, runId: string) {
|
||||||
try {
|
try {
|
||||||
const run = await Run.findOne({ where: { runId } });
|
const run = await Run.findOne({ where: { runId } });
|
||||||
@@ -44,7 +67,7 @@ export async function updateAirtable(robotId: string, runId: string) {
|
|||||||
robotId,
|
robotId,
|
||||||
plainRobot.airtable_base_id,
|
plainRobot.airtable_base_id,
|
||||||
plainRobot.airtable_table_name,
|
plainRobot.airtable_table_name,
|
||||||
plainRobot.airtable_table_id ,
|
plainRobot.airtable_table_id,
|
||||||
data
|
data
|
||||||
);
|
);
|
||||||
console.log(`Data written to Airtable for ${robotId}`);
|
console.log(`Data written to Airtable for ${robotId}`);
|
||||||
@@ -55,6 +78,46 @@ export async function updateAirtable(robotId: string, runId: string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function withTokenRefresh<T>(robotId: string, apiCall: (accessToken: string) => Promise<T>): Promise<T> {
|
||||||
|
const robot = await Robot.findOne({ where: { 'recording_meta.id': robotId } });
|
||||||
|
if (!robot) throw new Error(`Robot not found for robotId: ${robotId}`);
|
||||||
|
|
||||||
|
let accessToken = robot.get('airtable_access_token') as string;
|
||||||
|
let refreshToken = robot.get('airtable_refresh_token') as string;
|
||||||
|
|
||||||
|
if (!accessToken || !refreshToken) {
|
||||||
|
throw new Error('Airtable credentials not configured');
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
return await apiCall(accessToken);
|
||||||
|
} catch (error: any) {
|
||||||
|
if (error.response?.status === 401 ||
|
||||||
|
(error.statusCode === 401) ||
|
||||||
|
error.message.includes('unauthorized') ||
|
||||||
|
error.message.includes('expired')) {
|
||||||
|
|
||||||
|
logger.log("info", `Refreshing expired Airtable token for robot: ${robotId}`);
|
||||||
|
|
||||||
|
try {
|
||||||
|
const tokens = await refreshAirtableToken(refreshToken);
|
||||||
|
|
||||||
|
await robot.update({
|
||||||
|
airtable_access_token: tokens.access_token,
|
||||||
|
airtable_refresh_token: tokens.refresh_token || refreshToken
|
||||||
|
});
|
||||||
|
|
||||||
|
return await apiCall(tokens.access_token);
|
||||||
|
} catch (refreshError: any) {
|
||||||
|
logger.log("error", `Failed to refresh token: ${refreshError.message}`);
|
||||||
|
throw new Error(`Token refresh failed: ${refreshError.message}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export async function writeDataToAirtable(
|
export async function writeDataToAirtable(
|
||||||
robotId: string,
|
robotId: string,
|
||||||
baseId: string,
|
baseId: string,
|
||||||
@@ -63,49 +126,115 @@ export async function writeDataToAirtable(
|
|||||||
data: any[]
|
data: any[]
|
||||||
) {
|
) {
|
||||||
try {
|
try {
|
||||||
const robot = await Robot.findOne({ where: { 'recording_meta.id': robotId } });
|
return await withTokenRefresh(robotId, async (accessToken: string) => {
|
||||||
if (!robot) throw new Error('Robot not found');
|
const airtable = new Airtable({ apiKey: accessToken });
|
||||||
|
const base = airtable.base(baseId);
|
||||||
|
|
||||||
const accessToken = robot.get('airtable_access_token');
|
const existingFields = await getExistingFields(base, tableName);
|
||||||
if (!accessToken) throw new Error('Airtable not connected');
|
const dataFields = [...new Set(data.flatMap(row => Object.keys(row)))];
|
||||||
|
const missingFields = dataFields.filter(field => !existingFields.includes(field));
|
||||||
|
|
||||||
const airtable = new Airtable({ apiKey: accessToken });
|
for (const field of missingFields) {
|
||||||
const base = airtable.base(baseId);
|
const sampleRow = data.find(row => field in row);
|
||||||
|
if (sampleRow) {
|
||||||
// Dynamic field creation logic
|
const sampleValue = sampleRow[field];
|
||||||
const existingFields = await getExistingFields(base, tableName);
|
try {
|
||||||
const dataFields = [...new Set(data.flatMap(row => Object.keys(row)))];
|
await createAirtableField(baseId, tableName, field, sampleValue, accessToken, tableId);
|
||||||
const missingFields = dataFields.filter(field => !existingFields.includes(field));
|
console.log(`Successfully created field: ${field}`);
|
||||||
|
} catch (fieldError: any) {
|
||||||
for (const field of missingFields) {
|
console.warn(`Warning: Could not create field "${field}": ${fieldError.message}`);
|
||||||
const sampleRow = data.find(row => field in row);
|
}
|
||||||
if (sampleRow) {
|
}
|
||||||
const sampleValue = sampleRow[field];
|
|
||||||
await createAirtableField(baseId, tableName, field, sampleValue, accessToken, tableId);
|
|
||||||
console.log("from airtable.ts",tableId);
|
|
||||||
console.log("from airtable.ts",tableName);
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Batch processing with retries
|
await deleteEmptyRecords(base, tableName);
|
||||||
const batchSize = 10;
|
|
||||||
for (let i = 0; i < data.length; i += batchSize) {
|
|
||||||
const batch = data.slice(i, i + batchSize);
|
|
||||||
await retryableAirtableWrite(base, tableName, batch);
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.log('info', `Successfully wrote ${data.length} records to Airtable`);
|
const records = data.map(row => {
|
||||||
|
const validFields = Object.keys(row).filter(field =>
|
||||||
|
existingFields.includes(field) || missingFields.includes(field)
|
||||||
|
);
|
||||||
|
|
||||||
|
const validData: { [key: string]: any } = {};
|
||||||
|
validFields.forEach((field: any) => {
|
||||||
|
validData[field] = row[field];
|
||||||
|
});
|
||||||
|
|
||||||
|
return { fields: validData };
|
||||||
|
});
|
||||||
|
|
||||||
|
await createRecordsWithRetry(base, tableName, records);
|
||||||
|
|
||||||
|
logger.log('info', `Successfully wrote ${data.length} records to Airtable`);
|
||||||
|
});
|
||||||
} catch (error: any) {
|
} catch (error: any) {
|
||||||
logger.log('error', `Airtable write failed: ${error.message}`);
|
logger.log('error', `Airtable write failed: ${error.message}`);
|
||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function deleteEmptyRecords(base: Airtable.Base, tableName: string): Promise<void> {
|
||||||
|
console.log('Checking for empty records to clear...');
|
||||||
|
|
||||||
|
try {
|
||||||
|
const existingRecords = await base(tableName).select().all();
|
||||||
|
console.log(`Found ${existingRecords.length} total records`);
|
||||||
|
|
||||||
|
const emptyRecords = existingRecords.filter(record => {
|
||||||
|
const fields = record.fields;
|
||||||
|
return !fields || Object.keys(fields).length === 0 ||
|
||||||
|
Object.values(fields).every(value =>
|
||||||
|
value === null || value === undefined || value === '');
|
||||||
|
});
|
||||||
|
|
||||||
|
if (emptyRecords.length > 0) {
|
||||||
|
const BATCH_SIZE = 10;
|
||||||
|
for (let i = 0; i < emptyRecords.length; i += BATCH_SIZE) {
|
||||||
|
const batch = emptyRecords.slice(i, i + BATCH_SIZE);
|
||||||
|
const recordIds = batch.map(record => record.id);
|
||||||
|
await base(tableName).destroy(recordIds);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (error: any) {
|
||||||
|
console.warn(`Warning: Could not clear empty records: ${error.message}`);
|
||||||
|
console.warn('Will continue without deleting empty records');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function createRecordsWithRetry(
|
||||||
|
base: Airtable.Base,
|
||||||
|
tableName: string,
|
||||||
|
records: any[],
|
||||||
|
retries = MAX_RETRIES
|
||||||
|
): Promise<void> {
|
||||||
|
try {
|
||||||
|
await base(tableName).create(records);
|
||||||
|
} catch (error: any) {
|
||||||
|
console.error(`Error creating records: ${error.message}`);
|
||||||
|
|
||||||
|
if (retries > 0) {
|
||||||
|
await new Promise(resolve => setTimeout(resolve, BASE_API_DELAY));
|
||||||
|
console.log(`Retrying create operation, ${retries} attempts remaining`);
|
||||||
|
return createRecordsWithRetry(base, tableName, records, retries - 1);
|
||||||
|
}
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Helper functions
|
// Helper functions
|
||||||
async function getExistingFields(base: Airtable.Base, tableName: string): Promise<string[]> {
|
async function getExistingFields(base: Airtable.Base, tableName: string): Promise<string[]> {
|
||||||
try {
|
try {
|
||||||
const records = await base(tableName).select({ maxRecords: 1 }).firstPage();
|
const records = await base(tableName).select({ pageSize: 5 }).firstPage();
|
||||||
return records[0] ? Object.keys(records[0].fields) : [];
|
if (records.length > 0) {
|
||||||
|
const fieldNames = new Set<string>();
|
||||||
|
records.forEach(record => {
|
||||||
|
Object.keys(record.fields).forEach(field => fieldNames.add(field));
|
||||||
|
});
|
||||||
|
|
||||||
|
const headers = Array.from(fieldNames);
|
||||||
|
console.log(`Found ${headers.length} headers from records: ${headers.join(', ')}`);
|
||||||
|
return headers;
|
||||||
|
}
|
||||||
|
return [];
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
return [];
|
return [];
|
||||||
}
|
}
|
||||||
@@ -118,28 +247,36 @@ async function createAirtableField(
|
|||||||
sampleValue: any,
|
sampleValue: any,
|
||||||
accessToken: string,
|
accessToken: string,
|
||||||
tableId: string,
|
tableId: string,
|
||||||
|
|
||||||
retries = MAX_RETRIES
|
retries = MAX_RETRIES
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
try {
|
try {
|
||||||
const sanitizedFieldName = sanitizeFieldName(fieldName);
|
const sanitizedFieldName = sanitizeFieldName(fieldName);
|
||||||
const fieldType = inferFieldType(sampleValue);
|
const fieldType = inferFieldType(sampleValue);
|
||||||
|
|
||||||
await axios.post(
|
console.log(`Creating field ${sanitizedFieldName} with type ${fieldType}`);
|
||||||
|
|
||||||
|
const response = await axios.post(
|
||||||
`https://api.airtable.com/v0/meta/bases/${baseId}/tables/${tableId}/fields`,
|
`https://api.airtable.com/v0/meta/bases/${baseId}/tables/${tableId}/fields`,
|
||||||
{ name: sanitizedFieldName, type: fieldType },
|
{ name: sanitizedFieldName, type: fieldType },
|
||||||
{ headers: { Authorization: `Bearer ${accessToken}` } }
|
{ headers: { Authorization: `Bearer ${accessToken}` } }
|
||||||
);
|
);
|
||||||
|
|
||||||
logger.log('info', `Created field: ${sanitizedFieldName} (${fieldType})`);
|
logger.log('info', `Created field: ${sanitizedFieldName} (${fieldType})`);
|
||||||
|
return response.data;
|
||||||
} catch (error: any) {
|
} catch (error: any) {
|
||||||
if (retries > 0 && error.response?.status === 429) {
|
if (retries > 0 && error.response?.status === 429) {
|
||||||
await delay(BASE_API_DELAY * (MAX_RETRIES - retries + 1));
|
await new Promise(resolve => setTimeout(resolve, BASE_API_DELAY));
|
||||||
return createAirtableField(baseId, tableName, fieldName, sampleValue, accessToken, tableId, retries - 1);
|
return createAirtableField(baseId, tableName, fieldName, sampleValue, accessToken, tableId, retries - 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (error.response?.status === 422) {
|
||||||
|
console.log(`Field ${fieldName} may already exist or has validation issues`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
const errorMessage = error.response?.data?.error?.message || error.message;
|
const errorMessage = error.response?.data?.error?.message || error.message;
|
||||||
const statusCode = error.response?.status || 'No Status Code';
|
const statusCode = error.response?.status || 'No Status Code';
|
||||||
throw new Error(`Field creation failed (${statusCode}): ${errorMessage}`);
|
console.warn(`Field creation issue (${statusCode}): ${errorMessage}`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -172,27 +309,6 @@ function isValidUrl(str: string): boolean {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async function retryableAirtableWrite(
|
|
||||||
base: Airtable.Base,
|
|
||||||
tableName: string,
|
|
||||||
batch: any[],
|
|
||||||
retries = MAX_RETRIES
|
|
||||||
): Promise<void> {
|
|
||||||
try {
|
|
||||||
await base(tableName).create(batch.map(row => ({ fields: row })));
|
|
||||||
} catch (error) {
|
|
||||||
if (retries > 0) {
|
|
||||||
await delay(BASE_API_DELAY);
|
|
||||||
return retryableAirtableWrite(base, tableName, batch, retries - 1);
|
|
||||||
}
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function delay(ms: number): Promise<void> {
|
|
||||||
return new Promise(resolve => setTimeout(resolve, ms));
|
|
||||||
}
|
|
||||||
|
|
||||||
export const processAirtableUpdates = async () => {
|
export const processAirtableUpdates = async () => {
|
||||||
while (true) {
|
while (true) {
|
||||||
let hasPendingTasks = false;
|
let hasPendingTasks = false;
|
||||||
@@ -214,7 +330,11 @@ export const processAirtableUpdates = async () => {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!hasPendingTasks) break;
|
if (!hasPendingTasks) {
|
||||||
await delay(5000);
|
console.log('No pending Airtable update tasks, exiting processor');
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
await new Promise(resolve => setTimeout(resolve, 5000));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
Reference in New Issue
Block a user