Merge pull request #937 from getmaxun/run-progress
feat(core): real time progress update for runs
This commit is contained in:
@@ -48,6 +48,7 @@ interface InterpreterOptions {
|
|||||||
debugMessage: (msg: string) => void,
|
debugMessage: (msg: string) => void,
|
||||||
setActionType: (type: string) => void,
|
setActionType: (type: string) => void,
|
||||||
incrementScrapeListIndex: () => void,
|
incrementScrapeListIndex: () => void,
|
||||||
|
progressUpdate: (current: number, total: number, percentage: number) => void,
|
||||||
}>
|
}>
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -84,6 +85,10 @@ export default class Interpreter extends EventEmitter {
|
|||||||
|
|
||||||
private scrapeListCounter: number = 0;
|
private scrapeListCounter: number = 0;
|
||||||
|
|
||||||
|
private totalActions: number = 0;
|
||||||
|
|
||||||
|
private executedActions: number = 0;
|
||||||
|
|
||||||
constructor(workflow: WorkflowFile, options?: Partial<InterpreterOptions>) {
|
constructor(workflow: WorkflowFile, options?: Partial<InterpreterOptions>) {
|
||||||
super();
|
super();
|
||||||
this.workflow = workflow.workflow;
|
this.workflow = workflow.workflow;
|
||||||
@@ -1596,6 +1601,17 @@ export default class Interpreter extends EventEmitter {
|
|||||||
|
|
||||||
workflowCopy.splice(actionId, 1);
|
workflowCopy.splice(actionId, 1);
|
||||||
console.log(`Action with ID ${action.id} removed from the workflow copy.`);
|
console.log(`Action with ID ${action.id} removed from the workflow copy.`);
|
||||||
|
|
||||||
|
this.executedActions++;
|
||||||
|
const percentage = Math.round((this.executedActions / this.totalActions) * 100);
|
||||||
|
|
||||||
|
if (this.options.debugChannel?.progressUpdate) {
|
||||||
|
this.options.debugChannel.progressUpdate(
|
||||||
|
this.executedActions,
|
||||||
|
this.totalActions,
|
||||||
|
percentage
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
// const newSelectors = this.getPreviousSelectors(workflow, actionId);
|
// const newSelectors = this.getPreviousSelectors(workflow, actionId);
|
||||||
// const newSelectors = this.getSelectors(workflowCopy);
|
// const newSelectors = this.getSelectors(workflowCopy);
|
||||||
@@ -1686,6 +1702,13 @@ export default class Interpreter extends EventEmitter {
|
|||||||
*/
|
*/
|
||||||
this.initializedWorkflow = Preprocessor.initWorkflow(this.workflow, params);
|
this.initializedWorkflow = Preprocessor.initWorkflow(this.workflow, params);
|
||||||
|
|
||||||
|
this.totalActions = this.initializedWorkflow.length;
|
||||||
|
this.executedActions = 0;
|
||||||
|
|
||||||
|
if (this.options.debugChannel?.progressUpdate) {
|
||||||
|
this.options.debugChannel.progressUpdate(0, this.totalActions, 0);
|
||||||
|
}
|
||||||
|
|
||||||
await this.ensureScriptsLoaded(page);
|
await this.ensureScriptsLoaded(page);
|
||||||
|
|
||||||
this.stopper = () => {
|
this.stopper = () => {
|
||||||
|
|||||||
@@ -580,6 +580,13 @@ export class WorkflowInterpreter {
|
|||||||
setActionName: (name: string) => {
|
setActionName: (name: string) => {
|
||||||
this.currentActionName = name;
|
this.currentActionName = name;
|
||||||
},
|
},
|
||||||
|
progressUpdate: (current: number, total: number, percentage: number) => {
|
||||||
|
this.socket.nsp.emit('workflowProgress', {
|
||||||
|
current,
|
||||||
|
total,
|
||||||
|
percentage
|
||||||
|
});
|
||||||
|
},
|
||||||
},
|
},
|
||||||
serializableCallback: async (data: any) => {
|
serializableCallback: async (data: any) => {
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -12,6 +12,45 @@ import { GenericModal } from "../ui/GenericModal";
|
|||||||
import { getUserById } from "../../api/auth";
|
import { getUserById } from "../../api/auth";
|
||||||
import { useTranslation } from "react-i18next";
|
import { useTranslation } from "react-i18next";
|
||||||
import { useTheme } from "@mui/material/styles";
|
import { useTheme } from "@mui/material/styles";
|
||||||
|
import { io, Socket } from "socket.io-client";
|
||||||
|
import { apiUrl } from "../../apiConfig";
|
||||||
|
|
||||||
|
const socketCache = new Map<string, Socket>();
|
||||||
|
const progressCallbacks = new Map<string, Set<(data: any) => void>>();
|
||||||
|
|
||||||
|
function getOrCreateSocket(browserId: string): Socket {
|
||||||
|
if (socketCache.has(browserId)) {
|
||||||
|
return socketCache.get(browserId)!;
|
||||||
|
}
|
||||||
|
|
||||||
|
const socket = io(`${apiUrl}/${browserId}`, {
|
||||||
|
transports: ["websocket"],
|
||||||
|
rejectUnauthorized: false
|
||||||
|
});
|
||||||
|
|
||||||
|
socket.on('workflowProgress', (data: any) => {
|
||||||
|
const callbacks = progressCallbacks.get(browserId);
|
||||||
|
if (callbacks) {
|
||||||
|
callbacks.forEach(cb => cb(data));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
socketCache.set(browserId, socket);
|
||||||
|
return socket;
|
||||||
|
}
|
||||||
|
|
||||||
|
function cleanupSocketIfUnused(browserId: string) {
|
||||||
|
const callbacks = progressCallbacks.get(browserId);
|
||||||
|
|
||||||
|
if (!callbacks || callbacks.size === 0) {
|
||||||
|
const socket = socketCache.get(browserId);
|
||||||
|
if (socket) {
|
||||||
|
socket.disconnect();
|
||||||
|
socketCache.delete(browserId);
|
||||||
|
progressCallbacks.delete(browserId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
interface RunTypeChipProps {
|
interface RunTypeChipProps {
|
||||||
runByUserId?: string;
|
runByUserId?: string;
|
||||||
@@ -54,11 +93,52 @@ export const CollapsibleRow = ({ row, handleDelete, isOpen, onToggleExpanded, cu
|
|||||||
|
|
||||||
const logEndRef = useRef<HTMLDivElement | null>(null);
|
const logEndRef = useRef<HTMLDivElement | null>(null);
|
||||||
|
|
||||||
const scrollToLogBottom = () => {
|
const [workflowProgress, setWorkflowProgress] = useState<{
|
||||||
if (logEndRef.current) {
|
current: number;
|
||||||
logEndRef.current.scrollIntoView({ behavior: "smooth" });
|
total: number;
|
||||||
|
percentage: number;
|
||||||
|
} | null>(null);
|
||||||
|
|
||||||
|
// Subscribe to progress updates using module-level socket cache
|
||||||
|
useEffect(() => {
|
||||||
|
if (!row.browserId) return;
|
||||||
|
|
||||||
|
// Get or create socket (from module cache)
|
||||||
|
getOrCreateSocket(row.browserId);
|
||||||
|
|
||||||
|
// Register callback
|
||||||
|
if (!progressCallbacks.has(row.browserId)) {
|
||||||
|
progressCallbacks.set(row.browserId, new Set());
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
const callback = (data: any) => {
|
||||||
|
setWorkflowProgress(data);
|
||||||
|
};
|
||||||
|
|
||||||
|
progressCallbacks.get(row.browserId)!.add(callback);
|
||||||
|
|
||||||
|
// Cleanup: remove callback and cleanup socket if no callbacks remain
|
||||||
|
return () => {
|
||||||
|
const callbacks = progressCallbacks.get(row.browserId);
|
||||||
|
if (callbacks) {
|
||||||
|
callbacks.delete(callback);
|
||||||
|
// Cleanup socket if this was the last callback
|
||||||
|
cleanupSocketIfUnused(row.browserId);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}, [row.browserId]);
|
||||||
|
|
||||||
|
// Clear progress UI when run completes and trigger socket cleanup
|
||||||
|
useEffect(() => {
|
||||||
|
if (row.status !== 'running' && row.status !== 'queued') {
|
||||||
|
setWorkflowProgress(null);
|
||||||
|
// Attempt to cleanup socket when run completes
|
||||||
|
// (will only cleanup if no other callbacks exist)
|
||||||
|
if (row.browserId) {
|
||||||
|
cleanupSocketIfUnused(row.browserId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, [row.status, row.browserId]);
|
||||||
|
|
||||||
const handleAbort = () => {
|
const handleAbort = () => {
|
||||||
abortRunHandler(row.runId, row.name, row.browserId);
|
abortRunHandler(row.runId, row.name, row.browserId);
|
||||||
@@ -67,12 +147,7 @@ export const CollapsibleRow = ({ row, handleDelete, isOpen, onToggleExpanded, cu
|
|||||||
const handleRowExpand = () => {
|
const handleRowExpand = () => {
|
||||||
const newOpen = !isOpen;
|
const newOpen = !isOpen;
|
||||||
onToggleExpanded(newOpen);
|
onToggleExpanded(newOpen);
|
||||||
//scrollToLogBottom();
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// useEffect(() => {
|
|
||||||
// scrollToLogBottom();
|
|
||||||
// }, [currentLog])
|
|
||||||
|
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
const fetchUserEmail = async () => {
|
const fetchUserEmail = async () => {
|
||||||
@@ -196,7 +271,8 @@ export const CollapsibleRow = ({ row, handleDelete, isOpen, onToggleExpanded, cu
|
|||||||
<TableCell style={{ paddingBottom: 0, paddingTop: 0 }} colSpan={6}>
|
<TableCell style={{ paddingBottom: 0, paddingTop: 0 }} colSpan={6}>
|
||||||
<Collapse in={isOpen} timeout="auto" unmountOnExit>
|
<Collapse in={isOpen} timeout="auto" unmountOnExit>
|
||||||
<RunContent row={row} abortRunHandler={handleAbort} currentLog={currentLog}
|
<RunContent row={row} abortRunHandler={handleAbort} currentLog={currentLog}
|
||||||
logEndRef={logEndRef} interpretationInProgress={runningRecordingName === row.name} />
|
logEndRef={logEndRef} interpretationInProgress={runningRecordingName === row.name}
|
||||||
|
workflowProgress={workflowProgress} />
|
||||||
</Collapse>
|
</Collapse>
|
||||||
</TableCell>
|
</TableCell>
|
||||||
</TableRow>
|
</TableRow>
|
||||||
|
|||||||
@@ -30,10 +30,14 @@ interface RunContentProps {
|
|||||||
interpretationInProgress: boolean,
|
interpretationInProgress: boolean,
|
||||||
logEndRef: React.RefObject<HTMLDivElement>,
|
logEndRef: React.RefObject<HTMLDivElement>,
|
||||||
abortRunHandler: () => void,
|
abortRunHandler: () => void,
|
||||||
|
workflowProgress: {
|
||||||
|
current: number;
|
||||||
|
total: number;
|
||||||
|
percentage: number;
|
||||||
|
} | null,
|
||||||
}
|
}
|
||||||
|
|
||||||
export const RunContent = ({ row, currentLog, interpretationInProgress, logEndRef, abortRunHandler }: RunContentProps) => {
|
export const RunContent = ({ row, currentLog, interpretationInProgress, logEndRef, abortRunHandler, workflowProgress }: RunContentProps) => { const { t } = useTranslation();
|
||||||
const { t } = useTranslation();
|
|
||||||
const [tab, setTab] = React.useState<string>('output');
|
const [tab, setTab] = React.useState<string>('output');
|
||||||
const [markdownContent, setMarkdownContent] = useState<string>('');
|
const [markdownContent, setMarkdownContent] = useState<string>('');
|
||||||
const [htmlContent, setHtmlContent] = useState<string>('');
|
const [htmlContent, setHtmlContent] = useState<string>('');
|
||||||
@@ -63,6 +67,15 @@ export const RunContent = ({ row, currentLog, interpretationInProgress, logEndRe
|
|||||||
setTab(tab);
|
setTab(tab);
|
||||||
}, [interpretationInProgress]);
|
}, [interpretationInProgress]);
|
||||||
|
|
||||||
|
const getProgressMessage = (percentage: number): string => {
|
||||||
|
if (percentage === 0) return 'Initializing workflow...';
|
||||||
|
if (percentage < 25) return 'Starting execution...';
|
||||||
|
if (percentage < 50) return 'Processing actions...';
|
||||||
|
if (percentage < 75) return 'Extracting data...';
|
||||||
|
if (percentage < 100) return 'Finalizing results...';
|
||||||
|
return 'Completing...';
|
||||||
|
};
|
||||||
|
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
setMarkdownContent('');
|
setMarkdownContent('');
|
||||||
setHtmlContent('');
|
setHtmlContent('');
|
||||||
@@ -810,7 +823,20 @@ export const RunContent = ({ row, currentLog, interpretationInProgress, logEndRe
|
|||||||
{row.status === 'running' || row.status === 'queued' ? (
|
{row.status === 'running' || row.status === 'queued' ? (
|
||||||
<>
|
<>
|
||||||
<Box sx={{ display: 'flex', alignItems: 'center', mb: 2 }}>
|
<Box sx={{ display: 'flex', alignItems: 'center', mb: 2 }}>
|
||||||
<CircularProgress size={22} sx={{ marginRight: '10px' }} />
|
{workflowProgress ? (
|
||||||
|
<>
|
||||||
|
<CircularProgress
|
||||||
|
size={22}
|
||||||
|
sx={{ marginRight: '10px' }}
|
||||||
|
/>
|
||||||
|
{getProgressMessage(workflowProgress.percentage)}
|
||||||
|
</>
|
||||||
|
) : (
|
||||||
|
<>
|
||||||
|
<CircularProgress size={22} sx={{ marginRight: '10px' }} />
|
||||||
|
{t('run_content.loading')}
|
||||||
|
</>
|
||||||
|
)}
|
||||||
{t('run_content.loading')}
|
{t('run_content.loading')}
|
||||||
</Box>
|
</Box>
|
||||||
<Button color="error" onClick={abortRunHandler} sx={{ mt: 1 }}>
|
<Button color="error" onClick={abortRunHandler} sx={{ mt: 1 }}>
|
||||||
|
|||||||
Reference in New Issue
Block a user