From 0e6bc22dce0bc397dbf1c5f560d1c21ee880bb39 Mon Sep 17 00:00:00 2001 From: Rohit Rajan Date: Sat, 29 Nov 2025 11:11:02 +0530 Subject: [PATCH] feat: add max queue integration tasks --- .../integrations/airtable.ts | 80 ++++++++++++------- .../integrations/gsheet.ts | 51 +++++++++--- 2 files changed, 93 insertions(+), 38 deletions(-) diff --git a/server/src/workflow-management/integrations/airtable.ts b/server/src/workflow-management/integrations/airtable.ts index 0437afc3..788cb60b 100644 --- a/server/src/workflow-management/integrations/airtable.ts +++ b/server/src/workflow-management/integrations/airtable.ts @@ -18,8 +18,25 @@ interface SerializableOutput { const MAX_RETRIES = 3; const BASE_API_DELAY = 2000; +const MAX_QUEUE_SIZE = 1000; export let airtableUpdateTasks: { [runId: string]: AirtableUpdateTask } = {}; +let isProcessingAirtable = false; + +export function addAirtableUpdateTask(runId: string, task: AirtableUpdateTask): boolean { + const currentSize = Object.keys(airtableUpdateTasks).length; + + if (currentSize >= MAX_QUEUE_SIZE) { + logger.log('warn', `Airtable task queue full (${currentSize}/${MAX_QUEUE_SIZE}), dropping oldest task`); + const oldestKey = Object.keys(airtableUpdateTasks)[0]; + if (oldestKey) { + delete airtableUpdateTasks[oldestKey]; + } + } + + airtableUpdateTasks[runId] = task; + return true; +} async function refreshAirtableToken(refreshToken: string) { try { @@ -44,15 +61,13 @@ async function refreshAirtableToken(refreshToken: string) { } } - function mergeRelatedData(serializableOutput: SerializableOutput, binaryOutput: Record) { const allRecords: Record[] = []; - + const schemaData: Array<{ Group: string; Field: string; Value: any }> = []; const listData: any[] = []; - const screenshotData: Array<{key: string, url: string}> = []; - - // Collect schema data + const screenshotData: Array<{ key: string; url: string }> = []; + if (serializableOutput.scrapeSchema) { if (Array.isArray(serializableOutput.scrapeSchema)) { for (const schemaArray of serializableOutput.scrapeSchema) { @@ -82,8 +97,7 @@ function mergeRelatedData(serializableOutput: SerializableOutput, binaryOutput: } } } - - // Collect list data + if (serializableOutput.scrapeList) { if (Array.isArray(serializableOutput.scrapeList)) { for (const listArray of serializableOutput.scrapeList) { @@ -107,8 +121,8 @@ function mergeRelatedData(serializableOutput: SerializableOutput, binaryOutput: } } } - - // Collect screenshot data + + // Collect screenshot data (handles both string and object forms safely) // if (binaryOutput && Object.keys(binaryOutput).length > 0) { // Object.entries(binaryOutput).forEach(([key, rawValue]: [string, any]) => { // if (!key || key.trim() === "") return; @@ -136,37 +150,38 @@ function mergeRelatedData(serializableOutput: SerializableOutput, binaryOutput: // } // }); // } - - // Mix all data types together to create consecutive records + + // --- Merge all types into Airtable rows --- const maxLength = Math.max(schemaData.length, listData.length, screenshotData.length); - + for (let i = 0; i < maxLength; i++) { const record: Record = {}; - + if (i < schemaData.length) { record.Group = schemaData[i].Group; record.Label = schemaData[i].Field; record.Value = schemaData[i].Value; } - + if (i < listData.length) { - Object.entries(listData[i]).forEach(([key, value]) => { - if (value !== null && value !== undefined && value !== '') { + Object.entries(listData[i] || {}).forEach(([key, value]) => { + if (value !== null && value !== undefined && value !== "") { record[key] = value; } }); } - + if (i < screenshotData.length) { record.Key = screenshotData[i].key; record.Screenshot = screenshotData[i].url; } - + if (Object.keys(record).length > 0) { allRecords.push(record); } } - + + // Push leftovers for (let i = maxLength; i < schemaData.length; i++) { allRecords.push({ Label: schemaData[i].Field, Value: schemaData[i].Value }); } @@ -179,7 +194,7 @@ function mergeRelatedData(serializableOutput: SerializableOutput, binaryOutput: Screenshot: screenshotData[i].url, }); } - + return allRecords; } @@ -497,10 +512,18 @@ function isValidUrl(str: string): boolean { } export const processAirtableUpdates = async () => { - const maxProcessingTime = 60000; - const startTime = Date.now(); + if (isProcessingAirtable) { + logger.log('info', 'Airtable processing already in progress, skipping'); + return; + } - while (Date.now() - startTime < maxProcessingTime) { + isProcessingAirtable = true; + + try { + const maxProcessingTime = 60000; + const startTime = Date.now(); + + while (Date.now() - startTime < maxProcessingTime) { let hasPendingTasks = false; for (const runId in airtableUpdateTasks) { @@ -535,9 +558,12 @@ export const processAirtableUpdates = async () => { break; } - console.log('Waiting for 5 seconds before checking again...'); - await new Promise(resolve => setTimeout(resolve, 5000)); - } + console.log('Waiting for 5 seconds before checking again...'); + await new Promise(resolve => setTimeout(resolve, 5000)); + } - console.log('Airtable processing completed or timed out'); + console.log('Airtable processing completed or timed out'); + } finally { + isProcessingAirtable = false; + } }; \ No newline at end of file diff --git a/server/src/workflow-management/integrations/gsheet.ts b/server/src/workflow-management/integrations/gsheet.ts index c32e4fe0..b0871b75 100644 --- a/server/src/workflow-management/integrations/gsheet.ts +++ b/server/src/workflow-management/integrations/gsheet.ts @@ -11,13 +11,31 @@ interface GoogleSheetUpdateTask { } interface SerializableOutput { - scrapeSchema?: any[]; - scrapeList?: any[]; + scrapeSchema?: Record; + scrapeList?: Record; } + const MAX_RETRIES = 5; +const MAX_QUEUE_SIZE = 1000; export let googleSheetUpdateTasks: { [runId: string]: GoogleSheetUpdateTask } = {}; +let isProcessingGoogleSheets = false; + +export function addGoogleSheetUpdateTask(runId: string, task: GoogleSheetUpdateTask): boolean { + const currentSize = Object.keys(googleSheetUpdateTasks).length; + + if (currentSize >= MAX_QUEUE_SIZE) { + logger.log('warn', `Google Sheets task queue full (${currentSize}/${MAX_QUEUE_SIZE}), dropping oldest task`); + const oldestKey = Object.keys(googleSheetUpdateTasks)[0]; + if (oldestKey) { + delete googleSheetUpdateTasks[oldestKey]; + } + } + + googleSheetUpdateTasks[runId] = task; + return true; +} export async function updateGoogleSheet(robotId: string, runId: string) { try { @@ -144,7 +162,7 @@ async function ensureSheetExists(spreadsheetId: string, sheetName: string, robot fields: 'sheets.properties.title' }); - const existingSheets = response.data.sheets?.map(sheet => sheet.properties?.title) || []; + const existingSheets = response.data.sheets?.map((sheet: any) => sheet.properties?.title) || []; if (!existingSheets.includes(sheetName)) { await sheets.spreadsheets.batchUpdate({ @@ -219,7 +237,7 @@ export async function writeDataToSheet( refresh_token: robot.google_refresh_token, }); - oauth2Client.on('tokens', async (tokens) => { + oauth2Client.once('tokens', async (tokens: any) => { if (tokens.refresh_token || tokens.access_token) { const robotModel = await Robot.findOne({ where: { 'recording_meta.id': robotId } }); if (robotModel) { @@ -292,10 +310,18 @@ export async function writeDataToSheet( } export const processGoogleSheetUpdates = async () => { - const maxProcessingTime = 60000; - const startTime = Date.now(); + if (isProcessingGoogleSheets) { + logger.log('info', 'Google Sheets processing already in progress, skipping'); + return; + } - while (Date.now() - startTime < maxProcessingTime) { + isProcessingGoogleSheets = true; + + try { + const maxProcessingTime = 60000; + const startTime = Date.now(); + + while (Date.now() - startTime < maxProcessingTime) { let hasPendingTasks = false; for (const runId in googleSheetUpdateTasks) { @@ -328,9 +354,12 @@ export const processGoogleSheetUpdates = async () => { break; } - console.log('Waiting for 5 seconds before checking again...'); - await new Promise(resolve => setTimeout(resolve, 5000)); - } + console.log('Waiting for 5 seconds before checking again...'); + await new Promise(resolve => setTimeout(resolve, 5000)); + } - console.log('Google Sheets processing completed or timed out'); + console.log('Google Sheets processing completed or timed out'); + } finally { + isProcessingGoogleSheets = false; + } }; \ No newline at end of file