From 7a283ddf2c1f860308fc858ee0469b1ccbd770cc Mon Sep 17 00:00:00 2001 From: Kerem Yilmaz Date: Mon, 30 Sep 2024 11:36:24 -0700 Subject: [PATCH] add workflow streaming (#892) --- .../src/routes/tasks/detail/TaskActions.tsx | 35 +++-- skyvern-frontend/src/routes/tasks/types.ts | 18 +++ .../src/routes/workflows/WorkflowRun.tsx | 133 +++++++++++++++++- 3 files changed, 168 insertions(+), 18 deletions(-) diff --git a/skyvern-frontend/src/routes/tasks/detail/TaskActions.tsx b/skyvern-frontend/src/routes/tasks/detail/TaskActions.tsx index d835b5c7..3be2c729 100644 --- a/skyvern-frontend/src/routes/tasks/detail/TaskActions.tsx +++ b/skyvern-frontend/src/routes/tasks/detail/TaskActions.tsx @@ -15,6 +15,7 @@ import { useCredentialGetter } from "@/hooks/useCredentialGetter"; import { Skeleton } from "@/components/ui/skeleton"; import { toast } from "@/components/ui/use-toast"; import { envCredential } from "@/util/env"; +import { statusIsNotFinalized, statusIsRunningOrQueued } from "../types"; type StreamMessage = { task_id: string; @@ -51,18 +52,18 @@ function TaskActions() { return client.get(`/tasks/${taskId}`).then((response) => response.data); }, refetchInterval: (query) => { - if ( - query.state.data?.status === Status.Running || - query.state.data?.status === Status.Queued - ) { + if (!query.state.data) { + return false; + } + if (statusIsNotFinalized(query.state.data)) { return 5000; } return false; }, placeholderData: keepPreviousData, }); - const taskIsRunningOrQueued = - task?.status === Status.Running || task?.status === Status.Queued; + const taskIsNotFinalized = task && statusIsNotFinalized(task); + const taskIsRunningOrQueued = task && statusIsRunningOrQueued(task); useEffect(() => { if (!taskIsRunningOrQueued) { @@ -135,10 +136,10 @@ function TaskActions() { }, [credentialGetter, taskId, taskIsRunningOrQueued]); useEffect(() => { - if (!taskIsLoading && taskIsRunningOrQueued) { + if (!taskIsLoading && taskIsNotFinalized) { setSelectedAction("stream"); } - }, [taskIsLoading, taskIsRunningOrQueued]); + }, [taskIsLoading, taskIsNotFinalized]); const { data: steps, isLoading: stepsIsLoading } = useQuery< Array @@ -151,8 +152,8 @@ function TaskActions() { .then((response) => response.data); }, enabled: !!task, - refetchOnWindowFocus: taskIsRunningOrQueued, - refetchInterval: taskIsRunningOrQueued ? 5000 : false, + refetchOnWindowFocus: taskIsNotFinalized, + refetchInterval: taskIsNotFinalized ? 5000 : false, placeholderData: keepPreviousData, }); @@ -198,6 +199,14 @@ function TaskActions() { actions?.[actions.length - selectedAction - 1]; function getStream() { + if (task?.status === Status.Created) { + return ( +
+ Task has been created. + Stream will start when the task is running. +
+ ); + } if (task?.status === Status.Queued) { return (
@@ -242,13 +251,13 @@ function TaskActions() { activeIndex={selectedAction} data={actions ?? []} onActiveIndexChange={setSelectedAction} - showStreamOption={taskIsRunningOrQueued} + showStreamOption={Boolean(taskIsNotFinalized)} onNext={() => { if (!actions) { return; } setSelectedAction((prev) => { - if (taskIsRunningOrQueued) { + if (taskIsNotFinalized) { if (actions.length === 0) { return "stream"; } @@ -271,7 +280,7 @@ function TaskActions() { return; } setSelectedAction((prev) => { - if (taskIsRunningOrQueued) { + if (taskIsNotFinalized) { if (actions.length === 0) { return "stream"; } diff --git a/skyvern-frontend/src/routes/tasks/types.ts b/skyvern-frontend/src/routes/tasks/types.ts index 21748027..005c1528 100644 --- a/skyvern-frontend/src/routes/tasks/types.ts +++ b/skyvern-frontend/src/routes/tasks/types.ts @@ -1,3 +1,5 @@ +import { Status } from "@/api/types"; + export const sampleCases = [ "blank", "geico", @@ -8,3 +10,19 @@ export const sampleCases = [ ] as const; export type SampleCase = (typeof sampleCases)[number]; + +export function statusIsNotFinalized({ status }: { status: Status }): boolean { + return ( + status === Status.Created || + status === Status.Queued || + status === Status.Running + ); +} + +export function statusIsRunningOrQueued({ + status, +}: { + status: Status; +}): boolean { + return status === Status.Queued || status === Status.Running; +} diff --git a/skyvern-frontend/src/routes/workflows/WorkflowRun.tsx b/skyvern-frontend/src/routes/workflows/WorkflowRun.tsx index 74f32746..e9a854fd 100644 --- a/skyvern-frontend/src/routes/workflows/WorkflowRun.tsx +++ b/skyvern-frontend/src/routes/workflows/WorkflowRun.tsx @@ -32,12 +32,27 @@ import { keepPreviousData, useQuery } from "@tanstack/react-query"; import { useNavigate, useParams, useSearchParams } from "react-router-dom"; import { TaskActions } from "../tasks/list/TaskActions"; import { TaskListSkeletonRows } from "../tasks/list/TaskListSkeletonRows"; +import { useEffect, useState } from "react"; +import { statusIsNotFinalized, statusIsRunningOrQueued } from "../tasks/types"; +import { envCredential } from "@/util/env"; +import { toast } from "@/components/ui/use-toast"; + +type StreamMessage = { + task_id: string; + status: string; + screenshot?: string; +}; + +let socket: WebSocket | null = null; + +const wssBaseUrl = import.meta.env.VITE_WSS_BASE_URL; function WorkflowRun() { const [searchParams, setSearchParams] = useSearchParams(); const page = searchParams.get("page") ? Number(searchParams.get("page")) : 1; const { workflowRunId, workflowPermanentId } = useParams(); const credentialGetter = useCredentialGetter(); + const [streamImgSrc, setStreamImgSrc] = useState(""); const navigate = useNavigate(); const { data: workflowRun, isLoading: workflowRunIsLoading } = useQuery({ @@ -49,11 +64,10 @@ function WorkflowRun() { .then((response) => response.data); }, refetchInterval: (query) => { - if ( - query.state.data?.status === Status.Running || - query.state.data?.status === Status.Queued || - query.state.data?.status === Status.Created - ) { + if (!query.state.data) { + return false; + } + if (statusIsNotFinalized(query.state.data)) { return 5000; } return false; @@ -83,6 +97,114 @@ function WorkflowRun() { refetchOnMount: workflowRun?.status === Status.Running, }); + const workflowRunIsRunningOrQueued = + workflowRun && statusIsRunningOrQueued(workflowRun); + + useEffect(() => { + if (!workflowRunIsRunningOrQueued) { + return; + } + + async function run() { + // Create WebSocket connection. + let credential = null; + if (credentialGetter) { + const token = await credentialGetter(); + credential = `?token=Bearer ${token}`; + } else { + credential = `?apikey=${envCredential}`; + } + if (socket) { + socket.close(); + } + socket = new WebSocket( + `${wssBaseUrl}/stream/workflow_runs/${workflowRunId}${credential}`, + ); + // Listen for messages + socket.addEventListener("message", (event) => { + try { + const message: StreamMessage = JSON.parse(event.data); + if (message.screenshot) { + setStreamImgSrc(message.screenshot); + } + if ( + message.status === "completed" || + message.status === "failed" || + message.status === "terminated" + ) { + socket?.close(); + if ( + message.status === "failed" || + message.status === "terminated" + ) { + toast({ + title: "Run Failed", + description: "The workflow run has failed.", + variant: "destructive", + }); + } else if (message.status === "completed") { + toast({ + title: "Run Completed", + description: "The workflow run has been completed.", + variant: "success", + }); + } + } + } catch (e) { + console.error("Failed to parse message", e); + } + }); + + socket.addEventListener("close", () => { + socket = null; + }); + } + run(); + + return () => { + if (socket) { + socket.close(); + socket = null; + } + }; + }, [credentialGetter, workflowRunId, workflowRunIsRunningOrQueued]); + + function getStream() { + if (workflowRun?.status === Status.Created) { + return ( +
+ Workflow has been created. + Stream will start when the workflow is running. +
+ ); + } + if (workflowRun?.status === Status.Queued) { + return ( +
+ Your workflow run is queued. + Stream will start when the workflow is running. +
+ ); + } + + if (workflowRun?.status === Status.Running && streamImgSrc.length === 0) { + return ( +
+ Starting the stream... +
+ ); + } + + if (workflowRun?.status === Status.Running && streamImgSrc.length > 0) { + return ( +
+ +
+ ); + } + return null; + } + function handleNavigate(event: React.MouseEvent, id: string) { if (event.ctrlKey || event.metaKey) { window.open( @@ -119,6 +241,7 @@ function WorkflowRun() { Rerun Workflow + {getStream()}

Tasks