fix: socket conn handling
This commit is contained in:
@@ -689,6 +689,10 @@ export class WorkflowInterpreter {
|
||||
}
|
||||
} finally {
|
||||
this.persistenceInProgress = false;
|
||||
|
||||
if (this.persistenceBuffer.length > 0 && !this.persistenceTimer) {
|
||||
this.scheduleBatchFlush();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ interface SocketState {
|
||||
queueSocket: Socket | null;
|
||||
id: string;
|
||||
setId: (id: string) => void;
|
||||
connectToQueueSocket: (userId: string, onRunCompleted?: (data: any) => void) => void;
|
||||
connectToQueueSocket: (userId: string, onRunCompleted?: (data: any) => void, onRunStarted?: (data: any) => void, onRunRecovered?: (data: any) => void, onRunScheduled?: (data: any) => void) => void;
|
||||
disconnectQueueSocket: () => void;
|
||||
};
|
||||
|
||||
@@ -29,6 +29,9 @@ export const SocketProvider = ({ children }: { children: JSX.Element }) => {
|
||||
const [queueSocket, setQueueSocket] = useState<Socket | null>(socketStore.queueSocket);
|
||||
const [id, setActiveId] = useState<string>(socketStore.id);
|
||||
const runCompletedCallbackRef = useRef<((data: any) => void) | null>(null);
|
||||
const runStartedCallbackRef = useRef<((data: any) => void) | null>(null);
|
||||
const runRecoveredCallbackRef = useRef<((data: any) => void) | null>(null);
|
||||
const runScheduledCallbackRef = useRef<((data: any) => void) | null>(null);
|
||||
|
||||
const setId = useCallback((id: string) => {
|
||||
// the socket client connection is recomputed whenever id changes -> the new browser has been initialized
|
||||
@@ -45,8 +48,11 @@ export const SocketProvider = ({ children }: { children: JSX.Element }) => {
|
||||
setActiveId(id);
|
||||
}, [setSocket]);
|
||||
|
||||
const connectToQueueSocket = useCallback((userId: string, onRunCompleted?: (data: any) => void) => {
|
||||
const connectToQueueSocket = useCallback((userId: string, onRunCompleted?: (data: any) => void, onRunStarted?: (data: any) => void, onRunRecovered?: (data: any) => void, onRunScheduled?: (data: any) => void) => {
|
||||
runCompletedCallbackRef.current = onRunCompleted || null;
|
||||
runStartedCallbackRef.current = onRunStarted || null;
|
||||
runRecoveredCallbackRef.current = onRunRecovered || null;
|
||||
runScheduledCallbackRef.current = onRunScheduled || null;
|
||||
|
||||
const newQueueSocket = io(`${SERVER_ENDPOINT}/queued-run`, {
|
||||
transports: ["websocket"],
|
||||
@@ -69,6 +75,27 @@ export const SocketProvider = ({ children }: { children: JSX.Element }) => {
|
||||
}
|
||||
});
|
||||
|
||||
newQueueSocket.on('run-started', (startedData) => {
|
||||
console.log('Run started event received:', startedData);
|
||||
if (runStartedCallbackRef.current) {
|
||||
runStartedCallbackRef.current(startedData);
|
||||
}
|
||||
});
|
||||
|
||||
newQueueSocket.on('run-recovered', (recoveredData) => {
|
||||
console.log('Run recovered event received:', recoveredData);
|
||||
if (runRecoveredCallbackRef.current) {
|
||||
runRecoveredCallbackRef.current(recoveredData);
|
||||
}
|
||||
});
|
||||
|
||||
newQueueSocket.on('run-scheduled', (scheduledData) => {
|
||||
console.log('Run scheduled event received:', scheduledData);
|
||||
if (runScheduledCallbackRef.current) {
|
||||
runScheduledCallbackRef.current(scheduledData);
|
||||
}
|
||||
});
|
||||
|
||||
setQueueSocket(currentSocket => {
|
||||
if (currentSocket) {
|
||||
currentSocket.disconnect();
|
||||
@@ -89,6 +116,8 @@ export const SocketProvider = ({ children }: { children: JSX.Element }) => {
|
||||
|
||||
socketStore.queueSocket = null;
|
||||
runCompletedCallbackRef.current = null;
|
||||
runRecoveredCallbackRef.current = null;
|
||||
runScheduledCallbackRef.current = null;
|
||||
}, []);
|
||||
|
||||
// Cleanup on unmount
|
||||
|
||||
@@ -215,6 +215,14 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps)
|
||||
|
||||
useEffect(() => {
|
||||
if (user?.id) {
|
||||
const handleRunStarted = (startedData: any) => {
|
||||
setRerenderRuns(true);
|
||||
invalidateRuns();
|
||||
|
||||
const robotName = startedData.robotName || 'Unknown Robot';
|
||||
notify('info', t('main_page.notifications.run_started', { name: robotName }));
|
||||
};
|
||||
|
||||
const handleRunCompleted = (completionData: any) => {
|
||||
setRerenderRuns(true);
|
||||
invalidateRuns();
|
||||
@@ -235,14 +243,36 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps)
|
||||
notify('error', t('main_page.notifications.interpretation_failed', { name: robotName }));
|
||||
}
|
||||
};
|
||||
|
||||
const handleRunRecovered = (recoveredData: any) => {
|
||||
setRerenderRuns(true);
|
||||
invalidateRuns();
|
||||
|
||||
if (queuedRuns.has(recoveredData.runId)) {
|
||||
setQueuedRuns(prev => {
|
||||
const newSet = new Set(prev);
|
||||
newSet.delete(recoveredData.runId);
|
||||
return newSet;
|
||||
});
|
||||
}
|
||||
|
||||
const robotName = recoveredData.robotName || 'Unknown Robot';
|
||||
notify('error', t('main_page.notifications.interpretation_failed', { name: robotName }));
|
||||
};
|
||||
|
||||
const handleRunScheduled = (scheduledData: any) => {
|
||||
setRerenderRuns(true);
|
||||
invalidateRuns();
|
||||
};
|
||||
|
||||
connectToQueueSocket(user.id, handleRunCompleted);
|
||||
connectToQueueSocket(user.id, handleRunCompleted, handleRunStarted, handleRunRecovered, handleRunScheduled);
|
||||
|
||||
return () => {
|
||||
console.log('Disconnecting persistent queue socket for user:', user.id);
|
||||
disconnectQueueSocket();
|
||||
};
|
||||
}
|
||||
}, [user?.id, connectToQueueSocket, disconnectQueueSocket, t, setRerenderRuns, queuedRuns, setQueuedRuns, invalidateRuns]);
|
||||
}, [user?.id, connectToQueueSocket, disconnectQueueSocket, t, setRerenderRuns, queuedRuns, setQueuedRuns]);
|
||||
|
||||
const DisplayContent = () => {
|
||||
switch (content) {
|
||||
|
||||
Reference in New Issue
Block a user