Merge pull request #736 from RohitR311/client-notifs
feat: enable client-side notifications for all runs
This commit is contained in:
@@ -160,10 +160,27 @@ app.use((req, res, next) => {
|
|||||||
next();
|
next();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
io.of('/queued-run').on('connection', (socket) => {
|
||||||
|
const userId = socket.handshake.query.userId as string;
|
||||||
|
|
||||||
|
if (userId) {
|
||||||
|
socket.join(`user-${userId}`);
|
||||||
|
logger.log('info', `Client joined queued-run namespace for user: ${userId}, socket: ${socket.id}`);
|
||||||
|
|
||||||
|
socket.on('disconnect', () => {
|
||||||
|
logger.log('info', `Client disconnected from queued-run namespace: ${socket.id}`);
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
logger.log('warn', `Client connected to queued-run namespace without userId: ${socket.id}`);
|
||||||
|
socket.disconnect();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
setInterval(() => {
|
setInterval(() => {
|
||||||
processQueuedRuns();
|
processQueuedRuns();
|
||||||
}, 5000);
|
}, 5000);
|
||||||
|
|
||||||
|
|
||||||
server.listen(SERVER_PORT, '0.0.0.0', async () => {
|
server.listen(SERVER_PORT, '0.0.0.0', async () => {
|
||||||
try {
|
try {
|
||||||
await connectDB();
|
await connectDB();
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import React, { createContext, useCallback, useContext, useMemo, useState } from 'react';
|
import React, { createContext, useCallback, useContext, useState, useRef, useEffect } from 'react';
|
||||||
import { io, Socket } from 'socket.io-client';
|
import { io, Socket } from 'socket.io-client';
|
||||||
import { apiUrl } from "../apiConfig";
|
import { apiUrl } from "../apiConfig";
|
||||||
|
|
||||||
@@ -6,12 +6,16 @@ const SERVER_ENDPOINT = apiUrl;
|
|||||||
|
|
||||||
interface SocketState {
|
interface SocketState {
|
||||||
socket: Socket | null;
|
socket: Socket | null;
|
||||||
|
queueSocket: Socket | null;
|
||||||
id: string;
|
id: string;
|
||||||
setId: (id: string) => void;
|
setId: (id: string) => void;
|
||||||
|
connectToQueueSocket: (userId: string, onRunCompleted?: (data: any) => void) => void;
|
||||||
|
disconnectQueueSocket: () => void;
|
||||||
};
|
};
|
||||||
|
|
||||||
class SocketStore implements Partial<SocketState> {
|
class SocketStore implements Partial<SocketState> {
|
||||||
socket = null;
|
socket: Socket | null = null;
|
||||||
|
queueSocket: Socket | null = null;
|
||||||
id = '';
|
id = '';
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -22,7 +26,9 @@ export const useSocketStore = () => useContext(socketStoreContext);
|
|||||||
|
|
||||||
export const SocketProvider = ({ children }: { children: JSX.Element }) => {
|
export const SocketProvider = ({ children }: { children: JSX.Element }) => {
|
||||||
const [socket, setSocket] = useState<Socket | null>(socketStore.socket);
|
const [socket, setSocket] = useState<Socket | null>(socketStore.socket);
|
||||||
|
const [queueSocket, setQueueSocket] = useState<Socket | null>(socketStore.queueSocket);
|
||||||
const [id, setActiveId] = useState<string>(socketStore.id);
|
const [id, setActiveId] = useState<string>(socketStore.id);
|
||||||
|
const runCompletedCallbackRef = useRef<((data: any) => void) | null>(null);
|
||||||
|
|
||||||
const setId = useCallback((id: string) => {
|
const setId = useCallback((id: string) => {
|
||||||
// the socket client connection is recomputed whenever id changes -> the new browser has been initialized
|
// the socket client connection is recomputed whenever id changes -> the new browser has been initialized
|
||||||
@@ -39,12 +45,70 @@ export const SocketProvider = ({ children }: { children: JSX.Element }) => {
|
|||||||
setActiveId(id);
|
setActiveId(id);
|
||||||
}, [setSocket]);
|
}, [setSocket]);
|
||||||
|
|
||||||
|
const connectToQueueSocket = useCallback((userId: string, onRunCompleted?: (data: any) => void) => {
|
||||||
|
runCompletedCallbackRef.current = onRunCompleted || null;
|
||||||
|
|
||||||
|
const newQueueSocket = io(`${SERVER_ENDPOINT}/queued-run`, {
|
||||||
|
transports: ["websocket"],
|
||||||
|
rejectUnauthorized: false,
|
||||||
|
query: { userId }
|
||||||
|
});
|
||||||
|
|
||||||
|
newQueueSocket.on('connect', () => {
|
||||||
|
console.log('Queue socket connected for user:', userId);
|
||||||
|
});
|
||||||
|
|
||||||
|
newQueueSocket.on('connect_error', (error) => {
|
||||||
|
console.log('Queue socket connection error:', error);
|
||||||
|
});
|
||||||
|
|
||||||
|
newQueueSocket.on('run-completed', (completionData) => {
|
||||||
|
console.log('Run completed event received:', completionData);
|
||||||
|
if (runCompletedCallbackRef.current) {
|
||||||
|
runCompletedCallbackRef.current(completionData);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
setQueueSocket(currentSocket => {
|
||||||
|
if (currentSocket) {
|
||||||
|
currentSocket.disconnect();
|
||||||
|
}
|
||||||
|
return newQueueSocket;
|
||||||
|
});
|
||||||
|
|
||||||
|
socketStore.queueSocket = newQueueSocket;
|
||||||
|
}, []);
|
||||||
|
|
||||||
|
const disconnectQueueSocket = useCallback(() => {
|
||||||
|
setQueueSocket(currentSocket => {
|
||||||
|
if (currentSocket) {
|
||||||
|
currentSocket.disconnect();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
|
||||||
|
socketStore.queueSocket = null;
|
||||||
|
runCompletedCallbackRef.current = null;
|
||||||
|
}, []);
|
||||||
|
|
||||||
|
// Cleanup on unmount
|
||||||
|
useEffect(() => {
|
||||||
|
return () => {
|
||||||
|
if (queueSocket) {
|
||||||
|
queueSocket.disconnect();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}, [queueSocket]);
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<socketStoreContext.Provider
|
<socketStoreContext.Provider
|
||||||
value={{
|
value={{
|
||||||
socket,
|
socket,
|
||||||
|
queueSocket,
|
||||||
id,
|
id,
|
||||||
setId,
|
setId,
|
||||||
|
connectToQueueSocket,
|
||||||
|
disconnectQueueSocket,
|
||||||
}}
|
}}
|
||||||
>
|
>
|
||||||
{children}
|
{children}
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ import { ScheduleSettings } from "../components/robot/ScheduleSettings";
|
|||||||
import { apiUrl } from "../apiConfig";
|
import { apiUrl } from "../apiConfig";
|
||||||
import { useNavigate } from 'react-router-dom';
|
import { useNavigate } from 'react-router-dom';
|
||||||
import { AuthContext } from '../context/auth';
|
import { AuthContext } from '../context/auth';
|
||||||
|
import { useSocketStore } from '../context/socket';
|
||||||
|
|
||||||
interface MainPageProps {
|
interface MainPageProps {
|
||||||
handleEditRecording: (id: string, fileName: string) => void;
|
handleEditRecording: (id: string, fileName: string) => void;
|
||||||
@@ -54,6 +55,8 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps)
|
|||||||
const { state } = useContext(AuthContext);
|
const { state } = useContext(AuthContext);
|
||||||
const { user } = state;
|
const { user } = state;
|
||||||
|
|
||||||
|
const { connectToQueueSocket, disconnectQueueSocket } = useSocketStore();
|
||||||
|
|
||||||
const abortRunHandler = (runId: string, robotName: string, browserId: string) => {
|
const abortRunHandler = (runId: string, robotName: string, browserId: string) => {
|
||||||
notify('info', t('main_page.notifications.abort_initiated', { name: robotName }));
|
notify('info', t('main_page.notifications.abort_initiated', { name: robotName }));
|
||||||
|
|
||||||
@@ -138,50 +141,7 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps)
|
|||||||
navigate(`/runs/${robotMetaId}/run/${runId}`);
|
navigate(`/runs/${robotMetaId}/run/${runId}`);
|
||||||
|
|
||||||
if (queued) {
|
if (queued) {
|
||||||
console.log('Creating queue socket for queued run:', runId);
|
|
||||||
|
|
||||||
setQueuedRuns(prev => new Set([...prev, runId]));
|
setQueuedRuns(prev => new Set([...prev, runId]));
|
||||||
|
|
||||||
const queueSocket = io(`${apiUrl}/queued-run`, {
|
|
||||||
transports: ["websocket"],
|
|
||||||
rejectUnauthorized: false,
|
|
||||||
query: { userId: user?.id }
|
|
||||||
});
|
|
||||||
|
|
||||||
queueSocket.on('connect', () => {
|
|
||||||
console.log('Queue socket connected for user:', user?.id);
|
|
||||||
});
|
|
||||||
|
|
||||||
queueSocket.on('connect_error', (error) => {
|
|
||||||
console.log('Queue socket connection error:', error);
|
|
||||||
});
|
|
||||||
|
|
||||||
queueSocket.on('run-completed', (completionData) => {
|
|
||||||
if (completionData.runId === runId) {
|
|
||||||
setRunningRecordingName('');
|
|
||||||
setCurrentInterpretationLog('');
|
|
||||||
setRerenderRuns(true);
|
|
||||||
|
|
||||||
setQueuedRuns(prev => {
|
|
||||||
const newSet = new Set(prev);
|
|
||||||
newSet.delete(runId);
|
|
||||||
return newSet;
|
|
||||||
});
|
|
||||||
|
|
||||||
const robotName = completionData.robotName || runningRecordingName;
|
|
||||||
|
|
||||||
if (completionData.status === 'success') {
|
|
||||||
notify('success', t('main_page.notifications.interpretation_success', { name: robotName }));
|
|
||||||
} else {
|
|
||||||
notify('error', t('main_page.notifications.interpretation_failed', { name: robotName }));
|
|
||||||
}
|
|
||||||
|
|
||||||
queueSocket.disconnect();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
setSockets(sockets => [...sockets, queueSocket]);
|
|
||||||
|
|
||||||
notify('info', `Run queued: ${runningRecordingName}`);
|
notify('info', `Run queued: ${runningRecordingName}`);
|
||||||
} else {
|
} else {
|
||||||
const socket = io(`${apiUrl}/${browserId}`, {
|
const socket = io(`${apiUrl}/${browserId}`, {
|
||||||
@@ -245,6 +205,36 @@ export const MainPage = ({ handleEditRecording, initialContent }: MainPageProps)
|
|||||||
return message === 'success';
|
return message === 'success';
|
||||||
}
|
}
|
||||||
|
|
||||||
|
useEffect(() => {
|
||||||
|
if (user?.id) {
|
||||||
|
const handleRunCompleted = (completionData: any) => {
|
||||||
|
setRerenderRuns(true);
|
||||||
|
|
||||||
|
if (queuedRuns.has(completionData.runId)) {
|
||||||
|
setQueuedRuns(prev => {
|
||||||
|
const newSet = new Set(prev);
|
||||||
|
newSet.delete(completionData.runId);
|
||||||
|
return newSet;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const robotName = completionData.robotName || 'Unknown Robot';
|
||||||
|
|
||||||
|
if (completionData.status === 'success') {
|
||||||
|
notify('success', t('main_page.notifications.interpretation_success', { name: robotName }));
|
||||||
|
} else {
|
||||||
|
notify('error', t('main_page.notifications.interpretation_failed', { name: robotName }));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
connectToQueueSocket(user.id, handleRunCompleted);
|
||||||
|
|
||||||
|
return () => {
|
||||||
|
disconnectQueueSocket();
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}, [user?.id, connectToQueueSocket, disconnectQueueSocket, t, setRerenderRuns, queuedRuns, setQueuedRuns]);
|
||||||
|
|
||||||
const DisplayContent = () => {
|
const DisplayContent = () => {
|
||||||
switch (content) {
|
switch (content) {
|
||||||
case 'robots':
|
case 'robots':
|
||||||
|
|||||||
Reference in New Issue
Block a user