add workflow streaming (#892)

This commit is contained in:
Kerem Yilmaz
2024-09-30 11:36:24 -07:00
committed by GitHub
parent 5f4089edf5
commit 7a283ddf2c
3 changed files with 168 additions and 18 deletions

View File

@@ -15,6 +15,7 @@ import { useCredentialGetter } from "@/hooks/useCredentialGetter";
import { Skeleton } from "@/components/ui/skeleton"; import { Skeleton } from "@/components/ui/skeleton";
import { toast } from "@/components/ui/use-toast"; import { toast } from "@/components/ui/use-toast";
import { envCredential } from "@/util/env"; import { envCredential } from "@/util/env";
import { statusIsNotFinalized, statusIsRunningOrQueued } from "../types";
type StreamMessage = { type StreamMessage = {
task_id: string; task_id: string;
@@ -51,18 +52,18 @@ function TaskActions() {
return client.get(`/tasks/${taskId}`).then((response) => response.data); return client.get(`/tasks/${taskId}`).then((response) => response.data);
}, },
refetchInterval: (query) => { refetchInterval: (query) => {
if ( if (!query.state.data) {
query.state.data?.status === Status.Running || return false;
query.state.data?.status === Status.Queued }
) { if (statusIsNotFinalized(query.state.data)) {
return 5000; return 5000;
} }
return false; return false;
}, },
placeholderData: keepPreviousData, placeholderData: keepPreviousData,
}); });
const taskIsRunningOrQueued = const taskIsNotFinalized = task && statusIsNotFinalized(task);
task?.status === Status.Running || task?.status === Status.Queued; const taskIsRunningOrQueued = task && statusIsRunningOrQueued(task);
useEffect(() => { useEffect(() => {
if (!taskIsRunningOrQueued) { if (!taskIsRunningOrQueued) {
@@ -135,10 +136,10 @@ function TaskActions() {
}, [credentialGetter, taskId, taskIsRunningOrQueued]); }, [credentialGetter, taskId, taskIsRunningOrQueued]);
useEffect(() => { useEffect(() => {
if (!taskIsLoading && taskIsRunningOrQueued) { if (!taskIsLoading && taskIsNotFinalized) {
setSelectedAction("stream"); setSelectedAction("stream");
} }
}, [taskIsLoading, taskIsRunningOrQueued]); }, [taskIsLoading, taskIsNotFinalized]);
const { data: steps, isLoading: stepsIsLoading } = useQuery< const { data: steps, isLoading: stepsIsLoading } = useQuery<
Array<StepApiResponse> Array<StepApiResponse>
@@ -151,8 +152,8 @@ function TaskActions() {
.then((response) => response.data); .then((response) => response.data);
}, },
enabled: !!task, enabled: !!task,
refetchOnWindowFocus: taskIsRunningOrQueued, refetchOnWindowFocus: taskIsNotFinalized,
refetchInterval: taskIsRunningOrQueued ? 5000 : false, refetchInterval: taskIsNotFinalized ? 5000 : false,
placeholderData: keepPreviousData, placeholderData: keepPreviousData,
}); });
@@ -198,6 +199,14 @@ function TaskActions() {
actions?.[actions.length - selectedAction - 1]; actions?.[actions.length - selectedAction - 1];
function getStream() { function getStream() {
if (task?.status === Status.Created) {
return (
<div className="flex h-full w-full flex-col items-center justify-center gap-4 bg-slate-900 text-lg">
<span>Task has been created.</span>
<span>Stream will start when the task is running.</span>
</div>
);
}
if (task?.status === Status.Queued) { if (task?.status === Status.Queued) {
return ( return (
<div className="flex h-full w-full flex-col items-center justify-center gap-4 bg-slate-900 text-lg"> <div className="flex h-full w-full flex-col items-center justify-center gap-4 bg-slate-900 text-lg">
@@ -242,13 +251,13 @@ function TaskActions() {
activeIndex={selectedAction} activeIndex={selectedAction}
data={actions ?? []} data={actions ?? []}
onActiveIndexChange={setSelectedAction} onActiveIndexChange={setSelectedAction}
showStreamOption={taskIsRunningOrQueued} showStreamOption={Boolean(taskIsNotFinalized)}
onNext={() => { onNext={() => {
if (!actions) { if (!actions) {
return; return;
} }
setSelectedAction((prev) => { setSelectedAction((prev) => {
if (taskIsRunningOrQueued) { if (taskIsNotFinalized) {
if (actions.length === 0) { if (actions.length === 0) {
return "stream"; return "stream";
} }
@@ -271,7 +280,7 @@ function TaskActions() {
return; return;
} }
setSelectedAction((prev) => { setSelectedAction((prev) => {
if (taskIsRunningOrQueued) { if (taskIsNotFinalized) {
if (actions.length === 0) { if (actions.length === 0) {
return "stream"; return "stream";
} }

View File

@@ -1,3 +1,5 @@
import { Status } from "@/api/types";
export const sampleCases = [ export const sampleCases = [
"blank", "blank",
"geico", "geico",
@@ -8,3 +10,19 @@ export const sampleCases = [
] as const; ] as const;
export type SampleCase = (typeof sampleCases)[number]; export type SampleCase = (typeof sampleCases)[number];
export function statusIsNotFinalized({ status }: { status: Status }): boolean {
return (
status === Status.Created ||
status === Status.Queued ||
status === Status.Running
);
}
export function statusIsRunningOrQueued({
status,
}: {
status: Status;
}): boolean {
return status === Status.Queued || status === Status.Running;
}

View File

@@ -32,12 +32,27 @@ import { keepPreviousData, useQuery } from "@tanstack/react-query";
import { useNavigate, useParams, useSearchParams } from "react-router-dom"; import { useNavigate, useParams, useSearchParams } from "react-router-dom";
import { TaskActions } from "../tasks/list/TaskActions"; import { TaskActions } from "../tasks/list/TaskActions";
import { TaskListSkeletonRows } from "../tasks/list/TaskListSkeletonRows"; import { TaskListSkeletonRows } from "../tasks/list/TaskListSkeletonRows";
import { useEffect, useState } from "react";
import { statusIsNotFinalized, statusIsRunningOrQueued } from "../tasks/types";
import { envCredential } from "@/util/env";
import { toast } from "@/components/ui/use-toast";
type StreamMessage = {
task_id: string;
status: string;
screenshot?: string;
};
let socket: WebSocket | null = null;
const wssBaseUrl = import.meta.env.VITE_WSS_BASE_URL;
function WorkflowRun() { function WorkflowRun() {
const [searchParams, setSearchParams] = useSearchParams(); const [searchParams, setSearchParams] = useSearchParams();
const page = searchParams.get("page") ? Number(searchParams.get("page")) : 1; const page = searchParams.get("page") ? Number(searchParams.get("page")) : 1;
const { workflowRunId, workflowPermanentId } = useParams(); const { workflowRunId, workflowPermanentId } = useParams();
const credentialGetter = useCredentialGetter(); const credentialGetter = useCredentialGetter();
const [streamImgSrc, setStreamImgSrc] = useState<string>("");
const navigate = useNavigate(); const navigate = useNavigate();
const { data: workflowRun, isLoading: workflowRunIsLoading } = const { data: workflowRun, isLoading: workflowRunIsLoading } =
useQuery<WorkflowRunStatusApiResponse>({ useQuery<WorkflowRunStatusApiResponse>({
@@ -49,11 +64,10 @@ function WorkflowRun() {
.then((response) => response.data); .then((response) => response.data);
}, },
refetchInterval: (query) => { refetchInterval: (query) => {
if ( if (!query.state.data) {
query.state.data?.status === Status.Running || return false;
query.state.data?.status === Status.Queued || }
query.state.data?.status === Status.Created if (statusIsNotFinalized(query.state.data)) {
) {
return 5000; return 5000;
} }
return false; return false;
@@ -83,6 +97,114 @@ function WorkflowRun() {
refetchOnMount: workflowRun?.status === Status.Running, refetchOnMount: workflowRun?.status === Status.Running,
}); });
const workflowRunIsRunningOrQueued =
workflowRun && statusIsRunningOrQueued(workflowRun);
useEffect(() => {
if (!workflowRunIsRunningOrQueued) {
return;
}
async function run() {
// Create WebSocket connection.
let credential = null;
if (credentialGetter) {
const token = await credentialGetter();
credential = `?token=Bearer ${token}`;
} else {
credential = `?apikey=${envCredential}`;
}
if (socket) {
socket.close();
}
socket = new WebSocket(
`${wssBaseUrl}/stream/workflow_runs/${workflowRunId}${credential}`,
);
// Listen for messages
socket.addEventListener("message", (event) => {
try {
const message: StreamMessage = JSON.parse(event.data);
if (message.screenshot) {
setStreamImgSrc(message.screenshot);
}
if (
message.status === "completed" ||
message.status === "failed" ||
message.status === "terminated"
) {
socket?.close();
if (
message.status === "failed" ||
message.status === "terminated"
) {
toast({
title: "Run Failed",
description: "The workflow run has failed.",
variant: "destructive",
});
} else if (message.status === "completed") {
toast({
title: "Run Completed",
description: "The workflow run has been completed.",
variant: "success",
});
}
}
} catch (e) {
console.error("Failed to parse message", e);
}
});
socket.addEventListener("close", () => {
socket = null;
});
}
run();
return () => {
if (socket) {
socket.close();
socket = null;
}
};
}, [credentialGetter, workflowRunId, workflowRunIsRunningOrQueued]);
function getStream() {
if (workflowRun?.status === Status.Created) {
return (
<div className="flex h-full w-full flex-col items-center justify-center gap-8 bg-slate-900 py-8 text-lg">
<span>Workflow has been created.</span>
<span>Stream will start when the workflow is running.</span>
</div>
);
}
if (workflowRun?.status === Status.Queued) {
return (
<div className="flex h-full w-full flex-col items-center justify-center gap-8 bg-slate-900 py-8 text-lg">
<span>Your workflow run is queued.</span>
<span>Stream will start when the workflow is running.</span>
</div>
);
}
if (workflowRun?.status === Status.Running && streamImgSrc.length === 0) {
return (
<div className="flex h-full w-full items-center justify-center bg-slate-900 py-8 text-lg">
Starting the stream...
</div>
);
}
if (workflowRun?.status === Status.Running && streamImgSrc.length > 0) {
return (
<div className="h-full w-full">
<img src={`data:image/png;base64,${streamImgSrc}`} />
</div>
);
}
return null;
}
function handleNavigate(event: React.MouseEvent, id: string) { function handleNavigate(event: React.MouseEvent, id: string) {
if (event.ctrlKey || event.metaKey) { if (event.ctrlKey || event.metaKey) {
window.open( window.open(
@@ -119,6 +241,7 @@ function WorkflowRun() {
Rerun Workflow Rerun Workflow
</Button> </Button>
</header> </header>
{getStream()}
<div className="space-y-4"> <div className="space-y-4">
<header> <header>
<h2 className="text-lg font-semibold">Tasks</h2> <h2 className="text-lg font-semibold">Tasks</h2>