From 6b7a1f8d26443de4a5d108c7b694648132c3844e Mon Sep 17 00:00:00 2001 From: Rohit Rajan Date: Mon, 29 Sep 2025 18:26:55 +0530 Subject: [PATCH] fix: socket conn handling --- .../classes/Interpreter.ts | 4 +++ src/context/socket.tsx | 33 ++++++++++++++++-- src/pages/MainPage.tsx | 34 +++++++++++++++++-- 3 files changed, 67 insertions(+), 4 deletions(-) diff --git a/server/src/workflow-management/classes/Interpreter.ts b/server/src/workflow-management/classes/Interpreter.ts index 9dc72e67..f46ae160 100644 --- a/server/src/workflow-management/classes/Interpreter.ts +++ b/server/src/workflow-management/classes/Interpreter.ts @@ -689,6 +689,10 @@ export class WorkflowInterpreter { } } finally { this.persistenceInProgress = false; + + if (this.persistenceBuffer.length > 0 && !this.persistenceTimer) { + this.scheduleBatchFlush(); + } } }; } diff --git a/src/context/socket.tsx b/src/context/socket.tsx index 05a8c989..25440f50 100644 --- a/src/context/socket.tsx +++ b/src/context/socket.tsx @@ -9,7 +9,7 @@ interface SocketState { queueSocket: Socket | null; id: string; setId: (id: string) => void; - connectToQueueSocket: (userId: string, onRunCompleted?: (data: any) => void) => void; + connectToQueueSocket: (userId: string, onRunCompleted?: (data: any) => void, onRunStarted?: (data: any) => void, onRunRecovered?: (data: any) => void, onRunScheduled?: (data: any) => void) => void; disconnectQueueSocket: () => void; }; @@ -29,6 +29,9 @@ export const SocketProvider = ({ children }: { children: JSX.Element }) => { const [queueSocket, setQueueSocket] = useState(socketStore.queueSocket); const [id, setActiveId] = useState(socketStore.id); const runCompletedCallbackRef = useRef<((data: any) => void) | null>(null); + const runStartedCallbackRef = useRef<((data: any) => void) | null>(null); + const runRecoveredCallbackRef = useRef<((data: any) => void) | null>(null); + const runScheduledCallbackRef = useRef<((data: any) => void) | null>(null); const setId = useCallback((id: string) => { // the socket client connection is recomputed whenever id changes -> the new browser has been initialized @@ -45,8 +48,11 @@ export const SocketProvider = ({ children }: { children: JSX.Element }) => { setActiveId(id); }, [setSocket]); - const connectToQueueSocket = useCallback((userId: string, onRunCompleted?: (data: any) => void) => { + const connectToQueueSocket = useCallback((userId: string, onRunCompleted?: (data: any) => void, onRunStarted?: (data: any) => void, onRunRecovered?: (data: any) => void, onRunScheduled?: (data: any) => void) => { runCompletedCallbackRef.current = onRunCompleted || null; + runStartedCallbackRef.current = onRunStarted || null; + runRecoveredCallbackRef.current = onRunRecovered || null; + runScheduledCallbackRef.current = onRunScheduled || null; const newQueueSocket = io(`${SERVER_ENDPOINT}/queued-run`, { transports: ["websocket"], @@ -69,6 +75,27 @@ export const SocketProvider = ({ children }: { children: JSX.Element }) => { } }); + newQueueSocket.on('run-started', (startedData) => { + console.log('Run started event received:', startedData); + if (runStartedCallbackRef.current) { + runStartedCallbackRef.current(startedData); + } + }); + + newQueueSocket.on('run-recovered', (recoveredData) => { + console.log('Run recovered event received:', recoveredData); + if (runRecoveredCallbackRef.current) { + runRecoveredCallbackRef.current(recoveredData); + } + }); + + newQueueSocket.on('run-scheduled', (scheduledData) => { + console.log('Run scheduled event received:', scheduledData); + if (runScheduledCallbackRef.current) { + runScheduledCallbackRef.current(scheduledData); + } + }); + setQueueSocket(currentSocket => { if (currentSocket) { currentSocket.disconnect(); @@ -89,6 +116,8 @@ export const SocketProvider = ({ children }: { children: JSX.Element }) => { socketStore.queueSocket = null; runCompletedCallbackRef.current = null; + runRecoveredCallbackRef.current = null; + runScheduledCallbackRef.current = null; }, []); // Cleanup on unmount diff --git a/src/pages/MainPage.tsx b/src/pages/MainPage.tsx index 2067fc88..a0a86707 100644 --- a/src/pages/MainPage.tsx +++ b/src/pages/MainPage.tsx @@ -215,6 +215,14 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps) useEffect(() => { if (user?.id) { + const handleRunStarted = (startedData: any) => { + setRerenderRuns(true); + invalidateRuns(); + + const robotName = startedData.robotName || 'Unknown Robot'; + notify('info', t('main_page.notifications.run_started', { name: robotName })); + }; + const handleRunCompleted = (completionData: any) => { setRerenderRuns(true); invalidateRuns(); @@ -235,14 +243,36 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps) notify('error', t('main_page.notifications.interpretation_failed', { name: robotName })); } }; + + const handleRunRecovered = (recoveredData: any) => { + setRerenderRuns(true); + invalidateRuns(); + + if (queuedRuns.has(recoveredData.runId)) { + setQueuedRuns(prev => { + const newSet = new Set(prev); + newSet.delete(recoveredData.runId); + return newSet; + }); + } + + const robotName = recoveredData.robotName || 'Unknown Robot'; + notify('error', t('main_page.notifications.interpretation_failed', { name: robotName })); + }; + + const handleRunScheduled = (scheduledData: any) => { + setRerenderRuns(true); + invalidateRuns(); + }; - connectToQueueSocket(user.id, handleRunCompleted); + connectToQueueSocket(user.id, handleRunCompleted, handleRunStarted, handleRunRecovered, handleRunScheduled); return () => { + console.log('Disconnecting persistent queue socket for user:', user.id); disconnectQueueSocket(); }; } - }, [user?.id, connectToQueueSocket, disconnectQueueSocket, t, setRerenderRuns, queuedRuns, setQueuedRuns, invalidateRuns]); + }, [user?.id, connectToQueueSocket, disconnectQueueSocket, t, setRerenderRuns, queuedRuns, setQueuedRuns]); const DisplayContent = () => { switch (content) {