diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index b501f6c9..c1511b2f 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -333,6 +333,12 @@ async function processRunExecution(job: Job) { // Schedule updates for Google Sheets and Airtable await triggerIntegrationUpdates(plainRun.runId, plainRun.robotMetaId); + // Flush any remaining persistence buffer before emitting socket event + if (browser && browser.interpreter) { + await browser.interpreter.flushPersistenceBuffer(); + logger.log('debug', `Flushed persistence buffer before emitting run-completed for run ${data.runId}`); + } + const completionData = { runId: data.runId, robotMetaId: plainRun.robotMetaId, diff --git a/server/src/workflow-management/classes/Interpreter.ts b/server/src/workflow-management/classes/Interpreter.ts index f46ae160..5e843a80 100644 --- a/server/src/workflow-management/classes/Interpreter.ts +++ b/server/src/workflow-management/classes/Interpreter.ts @@ -300,6 +300,10 @@ export class WorkflowInterpreter { this.socket.emit('log', `----- The interpretation finished with status: ${status} -----`, false); logger.log('debug', `Interpretation finished`); + + // Flush any remaining data in persistence buffer before completing + await this.flushPersistenceBuffer(); + this.interpreter = null; this.socket.emit('activePairId', -1); this.interpretationIsPaused = false; @@ -606,9 +610,9 @@ export class WorkflowInterpreter { /** * Flushes persistence buffer to database in a single transaction - * @private + * @public - Made public to allow external flush before socket emission */ - private async flushPersistenceBuffer(): Promise { + public async flushPersistenceBuffer(): Promise { if (this.persistenceBuffer.length === 0 || this.persistenceInProgress || !this.currentRunId) { return; } diff --git a/src/components/run/RunsTable.tsx b/src/components/run/RunsTable.tsx index 4bd3a40c..3729b1ca 100644 --- a/src/components/run/RunsTable.tsx +++ b/src/components/run/RunsTable.tsx @@ -276,12 +276,14 @@ export const RunsTable: React.FC = ({ }, [debouncedSearch]); + // Handle rerender requests using cache invalidation useEffect(() => { if (rerenderRuns) { + // Invalidate cache to force refetch refetch(); setRerenderRuns(false); } - }, [rerenderRuns, setRerenderRuns, refetch]); + }, [rerenderRuns, refetch, setRerenderRuns]); const handleDelete = useCallback(() => { notify('success', t('runstable.notifications.delete_success')); @@ -373,7 +375,7 @@ export const RunsTable: React.FC = ({ urlRunId={urlRunId} /> )); - }, [getPaginationState, accordionSortConfigs, expandedRows, handleRowExpand, handleDelete, currentInterpretationLog, abortRunHandler, runningRecordingName, urlRunId]); + }, [paginationStates, runId, runningRecordingName, currentInterpretationLog, abortRunHandler, handleDelete, accordionSortConfigs]); const renderSortIcon = useCallback((column: Column, robotMetaId: string) => { const sortConfig = accordionSortConfigs[robotMetaId]; diff --git a/src/context/socket.tsx b/src/context/socket.tsx index 25440f50..ea5b049a 100644 --- a/src/context/socket.tsx +++ b/src/context/socket.tsx @@ -115,6 +115,7 @@ export const SocketProvider = ({ children }: { children: JSX.Element }) => { }); socketStore.queueSocket = null; + runStartedCallbackRef.current = null; runCompletedCallbackRef.current = null; runRecoveredCallbackRef.current = null; runScheduledCallbackRef.current = null; diff --git a/src/pages/MainPage.tsx b/src/pages/MainPage.tsx index a0a86707..86178117 100644 --- a/src/pages/MainPage.tsx +++ b/src/pages/MainPage.tsx @@ -50,7 +50,7 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps) let aborted = false; const { notify, setRerenderRuns, setRecordingId } = useGlobalInfoStore(); - const { invalidateRuns } = useCacheInvalidation(); + const { invalidateRuns, addOptimisticRun } = useCacheInvalidation(); const navigate = useNavigate(); const { state } = useContext(AuthContext); @@ -132,7 +132,7 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps) setRerenderRuns(true); invalidateRuns(); }) - }, [runningRecordingName, aborted, currentInterpretationLog, notify, setRerenderRuns, invalidateRuns]); + }, [runningRecordingName, aborted, currentInterpretationLog, notify, setRerenderRuns]); const debugMessageHandler = useCallback((msg: string) => { setCurrentInterpretationLog((prevState) => @@ -140,11 +140,26 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps) }, [currentInterpretationLog]) const handleRunRecording = useCallback((settings: RunSettings) => { + // Add optimistic run to cache immediately + const optimisticRun = { + id: runningRecordingId, + runId: `temp-${Date.now()}`, // Temporary ID until we get the real one + status: 'running', + name: runningRecordingName, + startedAt: new Date().toISOString(), + finishedAt: '', + robotMetaId: runningRecordingId, + log: 'Starting...', + isOptimistic: true + }; + + addOptimisticRun(optimisticRun); + createAndRunRecording(runningRecordingId, settings).then((response: CreateRunResponseWithQueue) => { + invalidateRuns(); const { browserId, runId, robotMetaId, queued } = response; setIds({ browserId, runId, robotMetaId }); - invalidateRuns(); navigate(`/runs/${robotMetaId}/run/${runId}`); if (queued) { @@ -160,8 +175,6 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps) socket.on('debugMessage', debugMessageHandler); socket.on('run-completed', (data) => { - setRunningRecordingName(''); - setCurrentInterpretationLog(''); setRerenderRuns(true); invalidateRuns(); @@ -201,7 +214,13 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps) socket.off('connect_error'); socket.off('disconnect'); } - }, [runningRecordingName, sockets, ids, debugMessageHandler, user?.id, t, notify, setRerenderRuns, setQueuedRuns, navigate, setContent, setIds, invalidateRuns]); + }, [runningRecordingName, sockets, ids, debugMessageHandler, user?.id, t, notify, setRerenderRuns, setQueuedRuns, navigate, setContent, setIds, invalidateRuns, addOptimisticRun, runningRecordingId]); + + useEffect(() => { + return () => { + queuedRuns.clear(); + }; + }, []); const handleScheduleRecording = async (settings: ScheduleSettings) => { const { message, runId }: ScheduleRunResponse = await scheduleStoredRecording(runningRecordingId, settings); @@ -225,7 +244,7 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps) const handleRunCompleted = (completionData: any) => { setRerenderRuns(true); - invalidateRuns(); + invalidateRuns(); // Invalidate cache to show completed run status if (queuedRuns.has(completionData.runId)) { setQueuedRuns(prev => {