diff --git a/server/src/server.ts b/server/src/server.ts index 1e5c5a03..0a107b88 100644 --- a/server/src/server.ts +++ b/server/src/server.ts @@ -160,10 +160,27 @@ app.use((req, res, next) => { next(); }); +io.of('/queued-run').on('connection', (socket) => { + const userId = socket.handshake.query.userId as string; + + if (userId) { + socket.join(`user-${userId}`); + logger.log('info', `Client joined queued-run namespace for user: ${userId}, socket: ${socket.id}`); + + socket.on('disconnect', () => { + logger.log('info', `Client disconnected from queued-run namespace: ${socket.id}`); + }); + } else { + logger.log('warn', `Client connected to queued-run namespace without userId: ${socket.id}`); + socket.disconnect(); + } +}); + setInterval(() => { processQueuedRuns(); }, 5000); + server.listen(SERVER_PORT, '0.0.0.0', async () => { try { await connectDB(); diff --git a/src/context/socket.tsx b/src/context/socket.tsx index 4d9c95e1..05a8c989 100644 --- a/src/context/socket.tsx +++ b/src/context/socket.tsx @@ -1,4 +1,4 @@ -import React, { createContext, useCallback, useContext, useMemo, useState } from 'react'; +import React, { createContext, useCallback, useContext, useState, useRef, useEffect } from 'react'; import { io, Socket } from 'socket.io-client'; import { apiUrl } from "../apiConfig"; @@ -6,12 +6,16 @@ const SERVER_ENDPOINT = apiUrl; interface SocketState { socket: Socket | null; + queueSocket: Socket | null; id: string; setId: (id: string) => void; + connectToQueueSocket: (userId: string, onRunCompleted?: (data: any) => void) => void; + disconnectQueueSocket: () => void; }; class SocketStore implements Partial { - socket = null; + socket: Socket | null = null; + queueSocket: Socket | null = null; id = ''; }; @@ -22,7 +26,9 @@ export const useSocketStore = () => useContext(socketStoreContext); export const SocketProvider = ({ children }: { children: JSX.Element }) => { const [socket, setSocket] = useState(socketStore.socket); + const [queueSocket, setQueueSocket] = useState(socketStore.queueSocket); const [id, setActiveId] = useState(socketStore.id); + const runCompletedCallbackRef = useRef<((data: any) => void) | null>(null); const setId = useCallback((id: string) => { // the socket client connection is recomputed whenever id changes -> the new browser has been initialized @@ -39,12 +45,70 @@ export const SocketProvider = ({ children }: { children: JSX.Element }) => { setActiveId(id); }, [setSocket]); + const connectToQueueSocket = useCallback((userId: string, onRunCompleted?: (data: any) => void) => { + runCompletedCallbackRef.current = onRunCompleted || null; + + const newQueueSocket = io(`${SERVER_ENDPOINT}/queued-run`, { + transports: ["websocket"], + rejectUnauthorized: false, + query: { userId } + }); + + newQueueSocket.on('connect', () => { + console.log('Queue socket connected for user:', userId); + }); + + newQueueSocket.on('connect_error', (error) => { + console.log('Queue socket connection error:', error); + }); + + newQueueSocket.on('run-completed', (completionData) => { + console.log('Run completed event received:', completionData); + if (runCompletedCallbackRef.current) { + runCompletedCallbackRef.current(completionData); + } + }); + + setQueueSocket(currentSocket => { + if (currentSocket) { + currentSocket.disconnect(); + } + return newQueueSocket; + }); + + socketStore.queueSocket = newQueueSocket; + }, []); + + const disconnectQueueSocket = useCallback(() => { + setQueueSocket(currentSocket => { + if (currentSocket) { + currentSocket.disconnect(); + } + return null; + }); + + socketStore.queueSocket = null; + runCompletedCallbackRef.current = null; + }, []); + + // Cleanup on unmount + useEffect(() => { + return () => { + if (queueSocket) { + queueSocket.disconnect(); + } + }; + }, [queueSocket]); + return ( {children} diff --git a/src/pages/MainPage.tsx b/src/pages/MainPage.tsx index ed223ba1..c71478b3 100644 --- a/src/pages/MainPage.tsx +++ b/src/pages/MainPage.tsx @@ -15,6 +15,7 @@ import { ScheduleSettings } from "../components/robot/ScheduleSettings"; import { apiUrl } from "../apiConfig"; import { useNavigate } from 'react-router-dom'; import { AuthContext } from '../context/auth'; +import { useSocketStore } from '../context/socket'; interface MainPageProps { handleEditRecording: (id: string, fileName: string) => void; @@ -54,6 +55,8 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps) const { state } = useContext(AuthContext); const { user } = state; + const { connectToQueueSocket, disconnectQueueSocket } = useSocketStore(); + const abortRunHandler = (runId: string, robotName: string, browserId: string) => { notify('info', t('main_page.notifications.abort_initiated', { name: robotName })); @@ -138,50 +141,7 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps) navigate(`/runs/${robotMetaId}/run/${runId}`); if (queued) { - console.log('Creating queue socket for queued run:', runId); - setQueuedRuns(prev => new Set([...prev, runId])); - - const queueSocket = io(`${apiUrl}/queued-run`, { - transports: ["websocket"], - rejectUnauthorized: false, - query: { userId: user?.id } - }); - - queueSocket.on('connect', () => { - console.log('Queue socket connected for user:', user?.id); - }); - - queueSocket.on('connect_error', (error) => { - console.log('Queue socket connection error:', error); - }); - - queueSocket.on('run-completed', (completionData) => { - if (completionData.runId === runId) { - setRunningRecordingName(''); - setCurrentInterpretationLog(''); - setRerenderRuns(true); - - setQueuedRuns(prev => { - const newSet = new Set(prev); - newSet.delete(runId); - return newSet; - }); - - const robotName = completionData.robotName || runningRecordingName; - - if (completionData.status === 'success') { - notify('success', t('main_page.notifications.interpretation_success', { name: robotName })); - } else { - notify('error', t('main_page.notifications.interpretation_failed', { name: robotName })); - } - - queueSocket.disconnect(); - } - }); - - setSockets(sockets => [...sockets, queueSocket]); - notify('info', `Run queued: ${runningRecordingName}`); } else { const socket = io(`${apiUrl}/${browserId}`, { @@ -245,6 +205,36 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps) return message === 'success'; } + useEffect(() => { + if (user?.id) { + const handleRunCompleted = (completionData: any) => { + setRerenderRuns(true); + + if (queuedRuns.has(completionData.runId)) { + setQueuedRuns(prev => { + const newSet = new Set(prev); + newSet.delete(completionData.runId); + return newSet; + }); + } + + const robotName = completionData.robotName || 'Unknown Robot'; + + if (completionData.status === 'success') { + notify('success', t('main_page.notifications.interpretation_success', { name: robotName })); + } else { + notify('error', t('main_page.notifications.interpretation_failed', { name: robotName })); + } + }; + + connectToQueueSocket(user.id, handleRunCompleted); + + return () => { + disconnectQueueSocket(); + }; + } + }, [user?.id, connectToQueueSocket, disconnectQueueSocket, t, setRerenderRuns, queuedRuns, setQueuedRuns]); + const DisplayContent = () => { switch (content) { case 'robots':