diff --git a/skyvern-frontend/src/api/sse.ts b/skyvern-frontend/src/api/sse.ts index 636df9b6..531c4156 100644 --- a/skyvern-frontend/src/api/sse.ts +++ b/skyvern-frontend/src/api/sse.ts @@ -2,19 +2,65 @@ import { fetchEventSource } from "@microsoft/fetch-event-source"; import type { CredentialGetter } from "@/api/AxiosClient"; import { getRuntimeApiKey, runsApiBaseUrl } from "@/util/env"; -export type SseJsonPayload = Record; +export type SseMessageHandler = (payload: T, eventName: string) => boolean; -type SseClient = { - post: (path: string, body: unknown) => Promise; +type SseStreamingOptions = { + signal?: AbortSignal; }; -export async function fetchJsonSse( +type SseClient = { + postStreaming: ( + path: string, + body: unknown, + onMessage: SseMessageHandler, + options?: SseStreamingOptions, + ) => Promise; +}; + +export async function fetchStreamingSse( input: RequestInfo | URL, init: RequestInit, -): Promise { + onMessage: SseMessageHandler, + options?: SseStreamingOptions, +): Promise { const controller = new AbortController(); + const externalSignal = options?.signal; + let settled = false; + const resolveOnce = () => { + if (!settled) { + settled = true; + return true; + } + return false; + }; + const onExternalAbort = () => { + controller.abort(); + }; + if (externalSignal) { + if (externalSignal.aborted) { + controller.abort(); + return; + } + externalSignal.addEventListener("abort", onExternalAbort, { once: true }); + } try { - const parsedPayload = await new Promise((resolve, reject) => { + await new Promise((resolve, reject) => { + const safeResolve = () => { + if (resolveOnce()) { + resolve(); + } + }; + const safeReject = (error: unknown) => { + if (controller.signal.aborted) { + safeResolve(); + return; + } + if (!settled) { + settled = true; + reject(error); + } + }; + fetchEventSource(input instanceof URL ? input.toString() : input, { method: init.method, headers: init.headers as Record, @@ -26,25 +72,28 @@ export async function fetchJsonSse( } try { const payload = JSON.parse(event.data) as T; - resolve(payload); + if (onMessage(payload, event.event)) { + safeResolve(); + } } catch (error) { - reject(error); + safeReject(error); } }, onerror: (error) => { - reject(error); + safeReject(error); }, onopen: async (response) => { if (!response.ok) { const errorText = await response.text(); - reject(new Error(errorText || "Failed to send request.")); + safeReject(new Error(errorText || "Failed to send request.")); } }, - }).catch(reject); + }).catch(safeReject); }); - - return parsedPayload; } finally { + if (externalSignal) { + externalSignal.removeEventListener("abort", onExternalAbort); + } controller.abort(); } } @@ -73,14 +122,21 @@ export async function getSseClient( } return { - post: (path: string, body: unknown) => { - return fetchJsonSse( + postStreaming: ( + path: string, + body: unknown, + onMessage: SseMessageHandler, + options?: SseStreamingOptions, + ) => { + return fetchStreamingSse( `${runsApiBaseUrl.replace(/\/$/, "")}/${path.replace(/^\//, "")}`, { method: "POST", headers: requestHeaders, body: JSON.stringify(body), }, + onMessage, + options, ); }, }; diff --git a/skyvern-frontend/src/routes/workflows/editor/WorkflowCopilotButton.tsx b/skyvern-frontend/src/routes/workflows/copilot/WorkflowCopilotButton.tsx similarity index 100% rename from skyvern-frontend/src/routes/workflows/editor/WorkflowCopilotButton.tsx rename to skyvern-frontend/src/routes/workflows/copilot/WorkflowCopilotButton.tsx diff --git a/skyvern-frontend/src/routes/workflows/editor/WorkflowCopilotChat.tsx b/skyvern-frontend/src/routes/workflows/copilot/WorkflowCopilotChat.tsx similarity index 77% rename from skyvern-frontend/src/routes/workflows/editor/WorkflowCopilotChat.tsx rename to skyvern-frontend/src/routes/workflows/copilot/WorkflowCopilotChat.tsx index f0d9fd24..06eacf46 100644 --- a/skyvern-frontend/src/routes/workflows/editor/WorkflowCopilotChat.tsx +++ b/skyvern-frontend/src/routes/workflows/copilot/WorkflowCopilotChat.tsx @@ -9,14 +9,26 @@ import { useWorkflowHasChangesStore } from "@/store/WorkflowHasChangesStore"; import { WorkflowCreateYAMLRequest } from "@/routes/workflows/types/workflowYamlTypes"; import { toast } from "@/components/ui/use-toast"; import { getSseClient } from "@/api/sse"; +import { + WorkflowCopilotChatHistoryResponse, + WorkflowCopilotProcessingUpdate, + WorkflowCopilotStreamError, + WorkflowCopilotStreamResponse, + WorkflowCopilotChatSender, +} from "./workflowCopilotTypes"; interface ChatMessage { id: string; - sender: "ai" | "user"; + sender: WorkflowCopilotChatSender; content: string; timestamp?: string; } +type WorkflowCopilotSsePayload = + | WorkflowCopilotProcessingUpdate + | WorkflowCopilotStreamResponse + | WorkflowCopilotStreamError; + const formatChatTimestamp = (value: string) => { let normalizedValue = value.replace(/\.(\d{3})\d*/, ".$1"); if (!normalizedValue.endsWith("Z")) { @@ -39,7 +51,9 @@ const MessageItem = memo(({ message }: { message: ChatMessage }) => { {message.sender === "ai" ? "AI" : "U"}
-

{message.content}

+

+ {message.content} +

{message.timestamp ? ( {formatChatTimestamp(message.timestamp)} @@ -111,7 +125,10 @@ export function WorkflowCopilotChat({ const [messages, setMessages] = useState([]); const [inputValue, setInputValue] = useState(""); const [isLoading, setIsLoading] = useState(false); + const [processingStatus, setProcessingStatus] = useState(""); const [isLoadingHistory, setIsLoadingHistory] = useState(false); + const streamingAbortController = useRef(null); + const pendingMessageId = useRef(null); const [workflowCopilotChatId, setWorkflowCopilotChatId] = useState< string | null >(null); @@ -143,6 +160,7 @@ export function WorkflowCopilotChat({ const credentialGetter = useCredentialGetter(); const { workflowRunId, workflowPermanentId } = useParams(); const messagesEndRef = useRef(null); + const textareaRef = useRef(null); const { getSaveData } = useWorkflowHasChangesStore(); const hasInitializedPosition = useRef(false); const hasScrolledOnLoad = useRef(false); @@ -151,6 +169,18 @@ export function WorkflowCopilotChat({ messagesEndRef.current?.scrollIntoView({ behavior }); }; + const adjustTextareaHeight = () => { + const textarea = textareaRef.current; + if (!textarea) return; + + textarea.style.height = "auto"; + textarea.style.height = `${Math.min(textarea.scrollHeight, 150)}px`; + }; + + useEffect(() => { + adjustTextareaHeight(); + }, [inputValue]); + const handleNewChat = () => { setMessages([]); setWorkflowCopilotChatId(null); @@ -194,16 +224,12 @@ export function WorkflowCopilotChat({ hasScrolledOnLoad.current = false; try { const client = await getClient(credentialGetter, "sans-api-v1"); - const response = await client.get<{ - workflow_copilot_chat_id: string | null; - chat_history: Array<{ - sender: "ai" | "user"; - content: string; - created_at: string; - }>; - }>("/workflow/copilot/chat-history", { - params: { workflow_permanent_id: workflowPermanentId }, - }); + const response = await client.get( + "/workflow/copilot/chat-history", + { + params: { workflow_permanent_id: workflowPermanentId }, + }, + ); if (!isMounted) return; @@ -233,6 +259,33 @@ export function WorkflowCopilotChat({ }; }, [credentialGetter, workflowPermanentId]); + useEffect(() => { + const handleKeyDown = (event: KeyboardEvent) => { + if (event.key !== "Escape" || !isOpen || !isLoading) { + return; + } + cancelSend(); + }; + + window.addEventListener("keydown", handleKeyDown); + return () => { + window.removeEventListener("keydown", handleKeyDown); + }; + }, [isLoading, isOpen]); + + const cancelSend = async () => { + if (!streamingAbortController.current) return; + + if (pendingMessageId.current) { + const messageId = pendingMessageId.current; + pendingMessageId.current = null; + setMessages((prev) => prev.filter((message) => message.id !== messageId)); + } + setIsLoading(false); + setProcessingStatus(""); + streamingAbortController.current?.abort(); + }; + const handleSend = async () => { if (!inputValue.trim() || isLoading) return; if (!workflowPermanentId) { @@ -251,10 +304,16 @@ export function WorkflowCopilotChat({ content: inputValue, }; + pendingMessageId.current = userMessageId; setMessages((prev) => [...prev, userMessage]); const messageContent = inputValue; setInputValue(""); setIsLoading(true); + setProcessingStatus("Starting..."); + + const abortController = new AbortController(); + streamingAbortController.current?.abort(); + streamingAbortController.current = abortController; try { const saveData = getSaveData(); @@ -314,69 +373,97 @@ export function WorkflowCopilotChat({ workflowYaml = convertToYAML(requestBody); } - const client = await getSseClient(credentialGetter); - const response = await client.post<{ - workflow_copilot_chat_id?: string; - message?: string; - updated_workflow_yaml?: string | null; - request_time?: string; - response_time?: string; - error?: string; - }>("/workflow/copilot/chat-post", { - workflow_permanent_id: workflowPermanentId, - workflow_copilot_chat_id: workflowCopilotChatId, - workflow_run_id: workflowRunId, - message: messageContent, - workflow_yaml: workflowYaml, - }); + const handleProcessingUpdate = ( + payload: WorkflowCopilotProcessingUpdate, + ) => { + if (payload.status) { + setProcessingStatus(payload.status); + } - if (response.error) { - throw new Error(response.error); - } + const pendingId = pendingMessageId.current; + if (!pendingId || !payload.timestamp) { + return; + } - if ( - !response.workflow_copilot_chat_id || - !response.message || - !response.request_time || - !response.response_time - ) { - throw new Error("No response received."); - } - - setWorkflowCopilotChatId(response.workflow_copilot_chat_id); - - const aiMessage: ChatMessage = { - id: Date.now().toString(), - sender: "ai", - content: response.message || "I received your message.", - timestamp: response.response_time, + setMessages((prev) => + prev.map((message) => + message.id === pendingId + ? { + ...message, + timestamp: payload.timestamp, + } + : message, + ), + ); }; - setMessages((prev) => [ - ...prev.map((message) => - message.id === userMessageId - ? { - ...message, - timestamp: response.request_time, - } - : message, - ), - aiMessage, - ]); + const handleResponse = (response: WorkflowCopilotStreamResponse) => { + setWorkflowCopilotChatId(response.workflow_copilot_chat_id); - if (response.updated_workflow_yaml && onWorkflowUpdate) { - try { - onWorkflowUpdate(response.updated_workflow_yaml); - } catch (updateError) { - console.error("Failed to update workflow:", updateError); - toast({ - title: "Update failed", - description: "Failed to apply workflow changes. Please try again.", - variant: "destructive", - }); + const aiMessage: ChatMessage = { + id: Date.now().toString(), + sender: "ai", + content: response.message, + timestamp: response.response_time, + }; + + setMessages((prev) => [...prev, aiMessage]); + + if (response.updated_workflow_yaml && onWorkflowUpdate) { + try { + onWorkflowUpdate(response.updated_workflow_yaml); + } catch (updateError) { + console.error("Failed to update workflow:", updateError); + toast({ + title: "Update failed", + description: + "Failed to apply workflow changes. Please try again.", + variant: "destructive", + }); + } } - } + }; + + const handleError = (payload: WorkflowCopilotStreamError) => { + const errorMessage: ChatMessage = { + id: Date.now().toString(), + sender: "ai", + content: payload.error, + }; + setMessages((prev) => [...prev, errorMessage]); + }; + + const client = await getSseClient(credentialGetter); + await client.postStreaming( + "/workflow/copilot/chat-post", + { + workflow_permanent_id: workflowPermanentId, + workflow_copilot_chat_id: workflowCopilotChatId, + workflow_run_id: workflowRunId, + message: messageContent, + workflow_yaml: workflowYaml, + }, + (payload) => { + switch (payload.type) { + case "processing_update": + handleProcessingUpdate(payload); + return false; + case "response": + handleResponse(payload); + return true; + case "error": + handleError(payload); + return true; + default: + return false; + } + }, + { signal: abortController.signal }, + ); } catch (error) { + if (abortController.signal.aborted) { + return; + } console.error("Failed to send message:", error); const errorMessage: ChatMessage = { id: Date.now().toString(), @@ -385,12 +472,18 @@ export function WorkflowCopilotChat({ }; setMessages((prev) => [...prev, errorMessage]); } finally { + if (streamingAbortController.current === abortController) { + streamingAbortController.current = null; + } + pendingMessageId.current = null; setIsLoading(false); + setProcessingStatus(""); } }; - const handleKeyPress = (e: React.KeyboardEvent) => { - if (e.key === "Enter") { + const handleKeyPress = (e: React.KeyboardEvent) => { + if (e.key === "Enter" && !e.shiftKey) { + e.preventDefault(); handleSend(); } }; @@ -614,7 +707,7 @@ export function WorkflowCopilotChat({
- Processing... + {processingStatus || "Processing..."}
@@ -625,15 +718,21 @@ export function WorkflowCopilotChat({ {/* Input */}
-
- +