feat: revamp airtable integration multiple actions
This commit is contained in:
@@ -11,6 +11,12 @@ interface AirtableUpdateTask {
|
|||||||
retries: number;
|
retries: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
interface SerializableOutput {
|
||||||
|
scrapeSchema?: any[];
|
||||||
|
scrapeList?: any[];
|
||||||
|
other?: any[];
|
||||||
|
}
|
||||||
|
|
||||||
const MAX_RETRIES = 3;
|
const MAX_RETRIES = 3;
|
||||||
const BASE_API_DELAY = 2000;
|
const BASE_API_DELAY = 2000;
|
||||||
|
|
||||||
@@ -39,38 +45,117 @@ async function refreshAirtableToken(refreshToken: string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function mergeRelatedData(serializableOutput: SerializableOutput, binaryOutput: Record<string, string>) {
|
||||||
|
const mergedRecords: Record<string, any>[] = [];
|
||||||
|
|
||||||
|
const maxLength = Math.max(
|
||||||
|
...(serializableOutput.scrapeSchema?.map(array => array?.length || 0) || [0]),
|
||||||
|
...(serializableOutput.scrapeList?.map(array => array?.length || 0) || [0]),
|
||||||
|
...(serializableOutput.other?.map(array => array?.length || 0) || [0])
|
||||||
|
);
|
||||||
|
|
||||||
|
for (let i = 0; i < maxLength; i++) {
|
||||||
|
mergedRecords.push({});
|
||||||
|
}
|
||||||
|
|
||||||
|
if (serializableOutput.scrapeSchema) {
|
||||||
|
for (const schemaArray of serializableOutput.scrapeSchema) {
|
||||||
|
if (!Array.isArray(schemaArray)) continue;
|
||||||
|
|
||||||
|
for (let i = 0; i < schemaArray.length; i++) {
|
||||||
|
if (i >= mergedRecords.length) break;
|
||||||
|
mergedRecords[i] = { ...mergedRecords[i], ...schemaArray[i] };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (serializableOutput.scrapeList) {
|
||||||
|
for (const listArray of serializableOutput.scrapeList) {
|
||||||
|
if (!Array.isArray(listArray)) continue;
|
||||||
|
|
||||||
|
for (let i = 0; i < listArray.length; i++) {
|
||||||
|
if (i >= mergedRecords.length) break;
|
||||||
|
mergedRecords[i] = { ...mergedRecords[i], ...listArray[i] };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (serializableOutput.other) {
|
||||||
|
for (const otherArray of serializableOutput.other) {
|
||||||
|
if (!Array.isArray(otherArray)) continue;
|
||||||
|
|
||||||
|
for (let i = 0; i < otherArray.length; i++) {
|
||||||
|
if (i >= mergedRecords.length) break;
|
||||||
|
mergedRecords[i] = { ...mergedRecords[i], ...otherArray[i] };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (binaryOutput && Object.keys(binaryOutput).length > 0) {
|
||||||
|
for (let i = 0; i < mergedRecords.length; i++) {
|
||||||
|
const screenshotKey = `item-${i}`;
|
||||||
|
if (binaryOutput[screenshotKey]) {
|
||||||
|
mergedRecords[i].Screenshot = binaryOutput[screenshotKey];
|
||||||
|
mergedRecords[i].Key = screenshotKey;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const [key, url] of Object.entries(binaryOutput)) {
|
||||||
|
if (mergedRecords.some(record => record.Key === key)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
mergedRecords.push({
|
||||||
|
"Key": key,
|
||||||
|
"Screenshot": url
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return mergedRecords;
|
||||||
|
}
|
||||||
|
|
||||||
export async function updateAirtable(robotId: string, runId: string) {
|
export async function updateAirtable(robotId: string, runId: string) {
|
||||||
try {
|
try {
|
||||||
|
console.log(`Starting Airtable update for run: ${runId}, robot: ${robotId}`);
|
||||||
|
|
||||||
const run = await Run.findOne({ where: { runId } });
|
const run = await Run.findOne({ where: { runId } });
|
||||||
if (!run) throw new Error(`Run not found for runId: ${runId}`);
|
if (!run) throw new Error(`Run not found for runId: ${runId}`);
|
||||||
|
|
||||||
const plainRun = run.toJSON();
|
const plainRun = run.toJSON();
|
||||||
if (plainRun.status !== 'success') {
|
if (plainRun.status !== 'success') {
|
||||||
console.log('Run status is not success');
|
console.log('Run status is not success, skipping Airtable update');
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let data: { [key: string]: any }[] = [];
|
|
||||||
if (plainRun.serializableOutput?.['item-0']) {
|
|
||||||
data = plainRun.serializableOutput['item-0'] as { [key: string]: any }[];
|
|
||||||
} else if (plainRun.binaryOutput?.['item-0']) {
|
|
||||||
data = [{ "File URL": plainRun.binaryOutput['item-0'] }];
|
|
||||||
}
|
|
||||||
|
|
||||||
const robot = await Robot.findOne({ where: { 'recording_meta.id': robotId } });
|
const robot = await Robot.findOne({ where: { 'recording_meta.id': robotId } });
|
||||||
if (!robot) throw new Error(`Robot not found for robotId: ${robotId}`);
|
if (!robot) throw new Error(`Robot not found for robotId: ${robotId}`);
|
||||||
|
|
||||||
const plainRobot = robot.toJSON();
|
const plainRobot = robot.toJSON();
|
||||||
if (plainRobot.airtable_base_id && plainRobot.airtable_table_name && plainRobot.airtable_table_id) {
|
|
||||||
console.log(`Writing to Airtable base ${plainRobot.airtable_base_id}`);
|
if (!plainRobot.airtable_base_id || !plainRobot.airtable_table_name || !plainRobot.airtable_table_id) {
|
||||||
|
console.log('Airtable integration not configured');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log(`Airtable configuration found - Base: ${plainRobot.airtable_base_id}, Table: ${plainRobot.airtable_table_name}`);
|
||||||
|
|
||||||
|
const serializableOutput = plainRun.serializableOutput as SerializableOutput;
|
||||||
|
const binaryOutput = plainRun.binaryOutput || {};
|
||||||
|
|
||||||
|
const mergedData = mergeRelatedData(serializableOutput, binaryOutput);
|
||||||
|
|
||||||
|
if (mergedData.length > 0) {
|
||||||
await writeDataToAirtable(
|
await writeDataToAirtable(
|
||||||
robotId,
|
robotId,
|
||||||
plainRobot.airtable_base_id,
|
plainRobot.airtable_base_id,
|
||||||
plainRobot.airtable_table_name,
|
plainRobot.airtable_table_name,
|
||||||
plainRobot.airtable_table_id,
|
plainRobot.airtable_table_id,
|
||||||
data
|
mergedData
|
||||||
);
|
);
|
||||||
console.log(`Data written to Airtable for ${robotId}`);
|
console.log(`All data written to Airtable for ${robotId}`);
|
||||||
|
} else {
|
||||||
|
console.log(`No data to write to Airtable for ${robotId}`);
|
||||||
}
|
}
|
||||||
} catch (error: any) {
|
} catch (error: any) {
|
||||||
console.error(`Airtable update failed: ${error.message}`);
|
console.error(`Airtable update failed: ${error.message}`);
|
||||||
@@ -125,42 +210,142 @@ export async function writeDataToAirtable(
|
|||||||
tableId: string,
|
tableId: string,
|
||||||
data: any[]
|
data: any[]
|
||||||
) {
|
) {
|
||||||
|
if (!data || data.length === 0) {
|
||||||
|
console.log('No data to write. Skipping.');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return await withTokenRefresh(robotId, async (accessToken: string) => {
|
return await withTokenRefresh(robotId, async (accessToken: string) => {
|
||||||
const airtable = new Airtable({ apiKey: accessToken });
|
const airtable = new Airtable({ apiKey: accessToken });
|
||||||
const base = airtable.base(baseId);
|
const base = airtable.base(baseId);
|
||||||
|
|
||||||
const existingFields = await getExistingFields(base, tableName);
|
const processedData = data.map(item => {
|
||||||
console.log(`Found ${existingFields.length} existing fields in Airtable`);
|
const cleanedItem: Record<string, any> = {};
|
||||||
|
|
||||||
|
for (const [key, value] of Object.entries(item)) {
|
||||||
|
if (value === null || value === undefined) {
|
||||||
|
cleanedItem[key] = '';
|
||||||
|
} else if (typeof value === 'object' && !Array.isArray(value)) {
|
||||||
|
cleanedItem[key] = JSON.stringify(value);
|
||||||
|
} else {
|
||||||
|
cleanedItem[key] = value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return cleanedItem;
|
||||||
|
});
|
||||||
|
|
||||||
const dataFields = [...new Set(data.flatMap(row => Object.keys(row)))];
|
const existingFields = await getExistingFields(base, tableName);
|
||||||
|
console.log(`Found ${existingFields.length} existing fields in Airtable: ${existingFields.join(', ')}`);
|
||||||
|
|
||||||
|
const dataFields = [...new Set(processedData.flatMap(row => Object.keys(row)))];
|
||||||
console.log(`Found ${dataFields.length} fields in data: ${dataFields.join(', ')}`);
|
console.log(`Found ${dataFields.length} fields in data: ${dataFields.join(', ')}`);
|
||||||
|
|
||||||
const missingFields = dataFields.filter(field => !existingFields.includes(field));
|
const missingFields = dataFields.filter(field => !existingFields.includes(field));
|
||||||
console.log(`Found ${missingFields.length} missing fields: ${missingFields.join(', ')}`);
|
const hasNewColumns = missingFields.length > 0;
|
||||||
|
console.log(`Found ${missingFields.length} new fields: ${missingFields.join(', ')}`);
|
||||||
|
|
||||||
for (const field of missingFields) {
|
for (const field of missingFields) {
|
||||||
const sampleRow = data.find(row => field in row);
|
const sampleRow = processedData.find(row => field in row);
|
||||||
if (sampleRow) {
|
if (sampleRow) {
|
||||||
const sampleValue = sampleRow[field];
|
const sampleValue = sampleRow[field];
|
||||||
try {
|
try {
|
||||||
await createAirtableField(baseId, tableName, field, sampleValue, accessToken, tableId);
|
await createAirtableField(baseId, tableName, field, sampleValue, accessToken, tableId);
|
||||||
console.log(`Successfully created field: ${field}`);
|
console.log(`Successfully created field: ${field}`);
|
||||||
|
|
||||||
|
await new Promise(resolve => setTimeout(resolve, 200));
|
||||||
} catch (fieldError: any) {
|
} catch (fieldError: any) {
|
||||||
console.warn(`Warning: Could not create field "${field}": ${fieldError.message}`);
|
console.warn(`Warning: Could not create field "${field}": ${fieldError.message}`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
await deleteEmptyRecords(base, tableName);
|
|
||||||
|
|
||||||
const BATCH_SIZE = 10;
|
let existingRecords: Array<{ id: string, fields: Record<string, any> }> = [];
|
||||||
for (let i = 0; i < data.length; i += BATCH_SIZE) {
|
|
||||||
const batch = data.slice(i, i + BATCH_SIZE);
|
if (hasNewColumns) {
|
||||||
await retryableAirtableWrite(base, tableName, batch);
|
existingRecords = await fetchAllRecords(base, tableName);
|
||||||
|
console.log(`Found ${existingRecords.length} existing records in Airtable`);
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.log('info', `Successfully wrote ${data.length} records to Airtable`);
|
if (hasNewColumns && existingRecords.length > 0) {
|
||||||
|
const recordsToUpdate = [];
|
||||||
|
const recordsToCreate = [];
|
||||||
|
|
||||||
|
const newColumnData = processedData.map(record => {
|
||||||
|
const newColumnsOnly: Record<string, any> = {};
|
||||||
|
missingFields.forEach(field => {
|
||||||
|
if (field in record) {
|
||||||
|
newColumnsOnly[field] = record[field];
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return newColumnsOnly;
|
||||||
|
});
|
||||||
|
|
||||||
|
for (let i = 0; i < Math.min(existingRecords.length, newColumnData.length); i++) {
|
||||||
|
if (Object.keys(newColumnData[i]).length > 0) {
|
||||||
|
recordsToUpdate.push({
|
||||||
|
id: existingRecords[i].id,
|
||||||
|
fields: newColumnData[i]
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const existingColumnsBeingUpdated = dataFields.filter(field =>
|
||||||
|
existingFields.includes(field) && !missingFields.includes(field)
|
||||||
|
);
|
||||||
|
|
||||||
|
if (existingColumnsBeingUpdated.length > 0) {
|
||||||
|
recordsToCreate.push(...processedData.map(record => ({ fields: record })));
|
||||||
|
console.log(`Will append ${recordsToCreate.length} new records with all data`);
|
||||||
|
} else {
|
||||||
|
if (processedData.length > existingRecords.length) {
|
||||||
|
const additionalRecords = processedData.slice(existingRecords.length);
|
||||||
|
recordsToCreate.push(...additionalRecords.map(record => ({ fields: record })));
|
||||||
|
console.log(`Will append ${recordsToCreate.length} additional records`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (recordsToUpdate.length > 0) {
|
||||||
|
console.log(`Updating ${recordsToUpdate.length} existing records with new columns`);
|
||||||
|
const BATCH_SIZE = 10;
|
||||||
|
for (let i = 0; i < recordsToUpdate.length; i += BATCH_SIZE) {
|
||||||
|
const batch = recordsToUpdate.slice(i, i + BATCH_SIZE);
|
||||||
|
console.log(`Updating batch ${Math.floor(i/BATCH_SIZE) + 1} of ${Math.ceil(recordsToUpdate.length/BATCH_SIZE)}`);
|
||||||
|
|
||||||
|
try {
|
||||||
|
await retryableAirtableUpdate(base, tableName, batch);
|
||||||
|
} catch (batchError: any) {
|
||||||
|
console.error(`Error updating batch: ${batchError.message}`);
|
||||||
|
throw batchError;
|
||||||
|
}
|
||||||
|
|
||||||
|
await new Promise(resolve => setTimeout(resolve, 500));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
console.log(`Appending all ${processedData.length} records to Airtable`);
|
||||||
|
const recordsToCreate = processedData.map(record => ({ fields: record }));
|
||||||
|
|
||||||
|
const BATCH_SIZE = 10;
|
||||||
|
for (let i = 0; i < recordsToCreate.length; i += BATCH_SIZE) {
|
||||||
|
const batch = recordsToCreate.slice(i, i + BATCH_SIZE);
|
||||||
|
console.log(`Creating batch ${Math.floor(i/BATCH_SIZE) + 1} of ${Math.ceil(recordsToCreate.length/BATCH_SIZE)}`);
|
||||||
|
|
||||||
|
try {
|
||||||
|
await retryableAirtableCreate(base, tableName, batch);
|
||||||
|
} catch (batchError: any) {
|
||||||
|
console.error(`Error creating batch: ${batchError.message}`);
|
||||||
|
throw batchError;
|
||||||
|
}
|
||||||
|
|
||||||
|
await new Promise(resolve => setTimeout(resolve, 500));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
await deleteEmptyRecords(base, tableName);
|
||||||
|
|
||||||
|
logger.log('info', `Successfully processed ${processedData.length} records in Airtable`);
|
||||||
});
|
});
|
||||||
} catch (error: any) {
|
} catch (error: any) {
|
||||||
logger.log('error', `Airtable write failed: ${error.message}`);
|
logger.log('error', `Airtable write failed: ${error.message}`);
|
||||||
@@ -168,6 +353,20 @@ export async function writeDataToAirtable(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function fetchAllRecords(base: Airtable.Base, tableName: string): Promise<Array<{ id: string, fields: Record<string, any> }>> {
|
||||||
|
try {
|
||||||
|
console.log(`Fetching all records from ${tableName}...`);
|
||||||
|
const records = await base(tableName).select().all();
|
||||||
|
return records.map(record => ({
|
||||||
|
id: record.id,
|
||||||
|
fields: record.fields
|
||||||
|
}));
|
||||||
|
} catch (error: any) {
|
||||||
|
console.warn(`Warning: Could not fetch all records: ${error.message}`);
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async function deleteEmptyRecords(base: Airtable.Base, tableName: string): Promise<void> {
|
async function deleteEmptyRecords(base: Airtable.Base, tableName: string): Promise<void> {
|
||||||
console.log('Checking for empty records to clear...');
|
console.log('Checking for empty records to clear...');
|
||||||
|
|
||||||
@@ -183,31 +382,53 @@ async function deleteEmptyRecords(base: Airtable.Base, tableName: string): Promi
|
|||||||
});
|
});
|
||||||
|
|
||||||
if (emptyRecords.length > 0) {
|
if (emptyRecords.length > 0) {
|
||||||
|
console.log(`Found ${emptyRecords.length} empty records to delete`);
|
||||||
const BATCH_SIZE = 10;
|
const BATCH_SIZE = 10;
|
||||||
for (let i = 0; i < emptyRecords.length; i += BATCH_SIZE) {
|
for (let i = 0; i < emptyRecords.length; i += BATCH_SIZE) {
|
||||||
const batch = emptyRecords.slice(i, i + BATCH_SIZE);
|
const batch = emptyRecords.slice(i, i + BATCH_SIZE);
|
||||||
const recordIds = batch.map(record => record.id);
|
const recordIds = batch.map(record => record.id);
|
||||||
await base(tableName).destroy(recordIds);
|
await base(tableName).destroy(recordIds);
|
||||||
|
console.log(`Deleted batch ${Math.floor(i/BATCH_SIZE) + 1} of ${Math.ceil(emptyRecords.length/BATCH_SIZE)}`);
|
||||||
}
|
}
|
||||||
}
|
console.log(`Successfully deleted ${emptyRecords.length} empty records`);
|
||||||
|
} else {
|
||||||
|
console.log('No empty records found to delete');
|
||||||
|
}
|
||||||
} catch (error: any) {
|
} catch (error: any) {
|
||||||
console.warn(`Warning: Could not clear empty records: ${error.message}`);
|
console.warn(`Warning: Could not clear empty records: ${error.message}`);
|
||||||
console.warn('Will continue without deleting empty records');
|
console.warn('Will continue without deleting empty records');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async function retryableAirtableWrite(
|
async function retryableAirtableCreate(
|
||||||
base: Airtable.Base,
|
base: Airtable.Base,
|
||||||
tableName: string,
|
tableName: string,
|
||||||
batch: any[],
|
batch: any[],
|
||||||
retries = MAX_RETRIES
|
retries = MAX_RETRIES
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
try {
|
try {
|
||||||
await base(tableName).create(batch.map(row => ({ fields: row })));
|
await base(tableName).create(batch);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
if (retries > 0) {
|
if (retries > 0) {
|
||||||
await new Promise(resolve => setTimeout(resolve, BASE_API_DELAY));
|
await new Promise(resolve => setTimeout(resolve, BASE_API_DELAY));
|
||||||
return retryableAirtableWrite(base, tableName, batch, retries - 1);
|
return retryableAirtableCreate(base, tableName, batch, retries - 1);
|
||||||
|
}
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function retryableAirtableUpdate(
|
||||||
|
base: Airtable.Base,
|
||||||
|
tableName: string,
|
||||||
|
batch: any[],
|
||||||
|
retries = MAX_RETRIES
|
||||||
|
): Promise<void> {
|
||||||
|
try {
|
||||||
|
await base(tableName).update(batch);
|
||||||
|
} catch (error) {
|
||||||
|
if (retries > 0) {
|
||||||
|
await new Promise(resolve => setTimeout(resolve, BASE_API_DELAY));
|
||||||
|
return retryableAirtableUpdate(base, tableName, batch, retries - 1);
|
||||||
}
|
}
|
||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
@@ -217,18 +438,19 @@ async function retryableAirtableWrite(
|
|||||||
async function getExistingFields(base: Airtable.Base, tableName: string): Promise<string[]> {
|
async function getExistingFields(base: Airtable.Base, tableName: string): Promise<string[]> {
|
||||||
try {
|
try {
|
||||||
const records = await base(tableName).select({ pageSize: 5 }).firstPage();
|
const records = await base(tableName).select({ pageSize: 5 }).firstPage();
|
||||||
|
const fieldNames = new Set<string>();
|
||||||
|
|
||||||
if (records.length > 0) {
|
if (records.length > 0) {
|
||||||
const fieldNames = new Set<string>();
|
|
||||||
records.forEach(record => {
|
records.forEach(record => {
|
||||||
Object.keys(record.fields).forEach(field => fieldNames.add(field));
|
Object.keys(record.fields).forEach(field => fieldNames.add(field));
|
||||||
});
|
});
|
||||||
|
|
||||||
const headers = Array.from(fieldNames);
|
|
||||||
console.log(`Found ${headers.length} headers from records: ${headers.join(', ')}`);
|
|
||||||
return headers;
|
|
||||||
}
|
}
|
||||||
return [];
|
|
||||||
|
const headers = Array.from(fieldNames);
|
||||||
|
console.log(`Found ${headers.length} headers from records: ${headers.join(', ')}`);
|
||||||
|
return headers;
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
console.warn(`Warning: Error fetching existing fields: ${error}`);
|
||||||
return [];
|
return [];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -299,17 +521,27 @@ export const processAirtableUpdates = async () => {
|
|||||||
|
|
||||||
for (const runId in airtableUpdateTasks) {
|
for (const runId in airtableUpdateTasks) {
|
||||||
const task = airtableUpdateTasks[runId];
|
const task = airtableUpdateTasks[runId];
|
||||||
if (task.status !== 'pending') continue;
|
|
||||||
|
if (task.status === 'pending') {
|
||||||
hasPendingTasks = true;
|
hasPendingTasks = true;
|
||||||
try {
|
console.log(`Processing Airtable update for run: ${runId}`);
|
||||||
await updateAirtable(task.robotId, task.runId);
|
|
||||||
delete airtableUpdateTasks[runId];
|
try {
|
||||||
} catch (error: any) {
|
await updateAirtable(task.robotId, task.runId);
|
||||||
task.retries += 1;
|
console.log(`Successfully updated Airtable for runId: ${runId}`);
|
||||||
if (task.retries >= MAX_RETRIES) {
|
airtableUpdateTasks[runId].status = 'completed';
|
||||||
task.status = 'failed';
|
delete airtableUpdateTasks[runId];
|
||||||
logger.log('error', `Permanent failure for run ${runId}: ${error.message}`);
|
} catch (error: any) {
|
||||||
|
console.error(`Failed to update Airtable for run ${task.runId}:`, error);
|
||||||
|
|
||||||
|
if (task.retries < MAX_RETRIES) {
|
||||||
|
airtableUpdateTasks[runId].retries += 1;
|
||||||
|
console.log(`Retrying task for runId: ${runId}, attempt: ${task.retries + 1}`);
|
||||||
|
} else {
|
||||||
|
airtableUpdateTasks[runId].status = 'failed';
|
||||||
|
console.log(`Max retries reached for runId: ${runId}. Marking task as failed.`);
|
||||||
|
logger.log('error', `Permanent failure for run ${runId}: ${error.message}`);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -319,6 +551,7 @@ export const processAirtableUpdates = async () => {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
console.log('Waiting for 5 seconds before checking again...');
|
||||||
await new Promise(resolve => setTimeout(resolve, 5000));
|
await new Promise(resolve => setTimeout(resolve, 5000));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
Reference in New Issue
Block a user