diff --git a/pyproject.toml b/pyproject.toml index 8f864b5b..32ea2d49 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -69,6 +69,7 @@ dependencies = [ "python-calamine>=0.6.1", "urllib3>=2.6.0", "zstandard>=0.25.0", + "sse-starlette>=3.0.3,<4", ] [dependency-groups] diff --git a/skyvern-frontend/package-lock.json b/skyvern-frontend/package-lock.json index d480866a..2b323420 100644 --- a/skyvern-frontend/package-lock.json +++ b/skyvern-frontend/package-lock.json @@ -13,6 +13,7 @@ "@codemirror/lang-python": "^6.1.6", "@dagrejs/dagre": "^1.1.4", "@hookform/resolvers": "^3.3.4", + "@microsoft/fetch-event-source": "^2.0.1", "@novnc/novnc": "1.5.x", "@radix-ui/react-accordion": "^1.1.2", "@radix-ui/react-aspect-ratio": "^1.0.3", @@ -261,7 +262,6 @@ "version": "6.33.0", "resolved": "https://registry.npmjs.org/@codemirror/view/-/view-6.33.0.tgz", "integrity": "sha512-AroaR3BvnjRW8fiZBalAaK+ZzB5usGgI014YKElYZvQdNH5ZIidHlO+cyf/2rWzyBFRkvG6VhiXeAEbC53P2YQ==", - "peer": true, "dependencies": { "@codemirror/state": "^6.4.0", "style-mod": "^4.1.0", @@ -1033,6 +1033,12 @@ "@lezer/lr": "^1.0.0" } }, + "node_modules/@microsoft/fetch-event-source": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/@microsoft/fetch-event-source/-/fetch-event-source-2.0.1.tgz", + "integrity": "sha512-W6CLUJ2eBMw3Rec70qrsEW0jOm/3twwJv21mrmj2yORiaVmVYGS4sSS5yUwvQc1ZlDLYGPnClVWmUUMagKNsfA==", + "license": "MIT" + }, "node_modules/@nodelib/fs.scandir": { "version": "2.1.5", "resolved": "https://registry.npmjs.org/@nodelib/fs.scandir/-/fs.scandir-2.1.5.tgz", @@ -3884,7 +3890,6 @@ "resolved": "https://registry.npmjs.org/@types/node/-/node-20.11.30.tgz", "integrity": "sha512-dHM6ZxwlmuZaRmUPfv1p+KrdD1Dci04FbdEm/9wEMouFqxYoFl5aMkt0VMAUtYRQDyYvD41WJLukhq/ha3YuTw==", "dev": true, - "peer": true, "dependencies": { "undici-types": "~5.26.4" } @@ -3900,7 +3905,6 @@ "resolved": "https://registry.npmjs.org/@types/react/-/react-18.2.67.tgz", "integrity": "sha512-vkIE2vTIMHQ/xL0rgmuoECBCkZFZeHr49HeWSc24AptMbNRo7pwSBvj73rlJJs9fGKj0koS+V7kQB1jHS0uCgw==", "devOptional": true, - "peer": true, "dependencies": { "@types/prop-types": "*", "@types/scheduler": "*", @@ -3912,7 +3916,6 @@ "resolved": "https://registry.npmjs.org/@types/react-dom/-/react-dom-18.2.22.tgz", "integrity": "sha512-fHkBXPeNtfvri6gdsMYyW+dW7RXFo6Ad09nLFK0VQWR7yGLai/Cyvyj696gbwYvBnhGtevUG9cET0pmUbMtoPQ==", "devOptional": true, - "peer": true, "dependencies": { "@types/react": "*" } @@ -3961,7 +3964,6 @@ "resolved": "https://registry.npmjs.org/@typescript-eslint/parser/-/parser-7.16.0.tgz", "integrity": "sha512-ar9E+k7CU8rWi2e5ErzQiC93KKEFAXA2Kky0scAlPcxYblLt8+XZuHUZwlyfXILyQa95P6lQg+eZgh/dDs3+Vw==", "dev": true, - "peer": true, "dependencies": { "@typescript-eslint/scope-manager": "7.16.0", "@typescript-eslint/types": "7.16.0", @@ -4340,7 +4342,6 @@ "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.11.3.tgz", "integrity": "sha512-Y9rRfJG5jcKOE0CLisYbojUjIrIEE7AGMzA/Sm4BslANhbS+cDMpgBdcPT91oJ7OuJ9hYJBx59RjbhxVnrF8Xg==", "dev": true, - "peer": true, "bin": { "acorn": "bin/acorn" }, @@ -4617,7 +4618,6 @@ "url": "https://github.com/sponsors/ai" } ], - "peer": true, "dependencies": { "caniuse-lite": "^1.0.30001587", "electron-to-chromium": "^1.4.668", @@ -5114,7 +5114,6 @@ "version": "3.0.0", "resolved": "https://registry.npmjs.org/d3-selection/-/d3-selection-3.0.0.tgz", "integrity": "sha512-fmTRWbNMmsmWq6xJV8D19U/gw/bwrHfNXxrIN+HfZgnzqTHp9jOmKMhsTUjXOJnZOdZY9Q28y4yebKzqDKlxlQ==", - "peer": true, "engines": { "node": ">=12" } @@ -5327,8 +5326,7 @@ "node_modules/embla-carousel": { "version": "8.0.0", "resolved": "https://registry.npmjs.org/embla-carousel/-/embla-carousel-8.0.0.tgz", - "integrity": "sha512-ecixcyqS6oKD2nh5Nj5MObcgoSILWNI/GtBxkidn5ytFaCCmwVHo2SecksaQZHcARMMpIR2dWOlSIdA1LkZFUA==", - "peer": true + "integrity": "sha512-ecixcyqS6oKD2nh5Nj5MObcgoSILWNI/GtBxkidn5ytFaCCmwVHo2SecksaQZHcARMMpIR2dWOlSIdA1LkZFUA==" }, "node_modules/embla-carousel-react": { "version": "8.0.0", @@ -5495,7 +5493,6 @@ "integrity": "sha512-ypowyDxpVSYpkXr9WPv2PAZCtNip1Mv5KTW0SCurXv/9iOpcrH9PaqUElksqEB6pChqHGDRCFTyrZlGhnLNGiA==", "deprecated": "This version is no longer supported. Please see https://eslint.org/version-support for other options.", "dev": true, - "peer": true, "dependencies": { "@eslint-community/eslint-utils": "^4.2.0", "@eslint-community/regexpp": "^4.6.1", @@ -5551,7 +5548,6 @@ "resolved": "https://registry.npmjs.org/eslint-config-prettier/-/eslint-config-prettier-9.1.0.tgz", "integrity": "sha512-NSWl5BFQWEPi1j4TjVNItzYV7dZXZ+wP6I6ZhrBGpChQhZRUaElihE9uRRkcbRnNb76UMKDF3r+WTmNcGPKsqw==", "dev": true, - "peer": true, "bin": { "eslint-config-prettier": "bin/cli.js" }, @@ -7374,7 +7370,6 @@ "url": "https://github.com/sponsors/ai" } ], - "peer": true, "dependencies": { "nanoid": "^3.3.11", "picocolors": "^1.1.1", @@ -7542,7 +7537,6 @@ "resolved": "https://registry.npmjs.org/prettier/-/prettier-3.2.5.tgz", "integrity": "sha512-3/GWa9aOC0YeD7LUfvOG2NiDyhOWRvt1k+rcKhOuYnMY24iiCphgneUfJDyFXd6rZCAnuLBv6UeAULtrhT/F4A==", "dev": true, - "peer": true, "bin": { "prettier": "bin/prettier.cjs" }, @@ -7745,7 +7739,6 @@ "version": "18.2.0", "resolved": "https://registry.npmjs.org/react/-/react-18.2.0.tgz", "integrity": "sha512-/3IjMdb2L9QbBdWiW5e3P2/npwMBaU9mHCSCUzNln0ZCYbcfTsGbTJrU/kGemdH2IWmB2ioZ+zkxtmq6g09fGQ==", - "peer": true, "dependencies": { "loose-envify": "^1.1.0" }, @@ -7757,7 +7750,6 @@ "version": "18.2.0", "resolved": "https://registry.npmjs.org/react-dom/-/react-dom-18.2.0.tgz", "integrity": "sha512-6IMTriUmvsjHUjNtEDudZfuDQUoWXVxKHhlEGSk81n4YFS+r/Kl99wXiwlVXtPBtJenozv2P+hxDsw9eA7Xo6g==", - "peer": true, "dependencies": { "loose-envify": "^1.1.0", "scheduler": "^0.23.0" @@ -7795,7 +7787,6 @@ "version": "7.51.1", "resolved": "https://registry.npmjs.org/react-hook-form/-/react-hook-form-7.51.1.tgz", "integrity": "sha512-ifnBjl+kW0ksINHd+8C/Gp6a4eZOdWyvRv0UBaByShwU8JbVx5hTcTWEcd5VdybvmPTATkVVXk9npXArHmo56w==", - "peer": true, "engines": { "node": ">=12.22.0" }, @@ -8686,7 +8677,6 @@ "version": "3.4.17", "resolved": "https://registry.npmjs.org/tailwindcss/-/tailwindcss-3.4.17.tgz", "integrity": "sha512-w33E2aCvSDP0tW9RZuNXadXlkHXqFzSkQew/aIa2i/Sj8fThxwovwlXHSPXTbAHwEIhBFXAedUhP2tueAKP8Og==", - "peer": true, "dependencies": { "@alloc/quick-lru": "^5.2.0", "arg": "^5.0.2", @@ -8807,7 +8797,6 @@ "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "dev": true, "license": "MIT", - "peer": true, "engines": { "node": ">=12" }, @@ -8927,7 +8916,6 @@ "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.5.3.tgz", "integrity": "sha512-/hreyEujaB0w76zKo6717l3L0o/qEUtRgdvUBvlkhoWeOVMjMuHNHk0BRBzikzuGDqNmPQbg5ifMEqsHLiIUcQ==", "dev": true, - "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -9088,7 +9076,6 @@ "integrity": "sha512-o5a9xKjbtuhY6Bi5S3+HvbRERmouabWbyUcpXXUA1u+GNUKoROi9byOJ8M0nHbHYHkYICiMlqxkg1KkYmm25Sw==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "esbuild": "^0.21.3", "postcss": "^8.4.43", diff --git a/skyvern-frontend/package.json b/skyvern-frontend/package.json index 19a83634..d29c57ee 100644 --- a/skyvern-frontend/package.json +++ b/skyvern-frontend/package.json @@ -54,6 +54,7 @@ "country-state-city": "^3.2.1", "cross-spawn": "^7.0.6", "embla-carousel-react": "^8.0.0", + "@microsoft/fetch-event-source": "^2.0.1", "express": "^4.21.2", "fetch-to-curl": "^0.6.0", "nanoid": "^5.0.7", diff --git a/skyvern-frontend/src/api/sse.ts b/skyvern-frontend/src/api/sse.ts new file mode 100644 index 00000000..636df9b6 --- /dev/null +++ b/skyvern-frontend/src/api/sse.ts @@ -0,0 +1,87 @@ +import { fetchEventSource } from "@microsoft/fetch-event-source"; +import type { CredentialGetter } from "@/api/AxiosClient"; +import { getRuntimeApiKey, runsApiBaseUrl } from "@/util/env"; + +export type SseJsonPayload = Record; + +type SseClient = { + post: (path: string, body: unknown) => Promise; +}; + +export async function fetchJsonSse( + input: RequestInfo | URL, + init: RequestInit, +): Promise { + const controller = new AbortController(); + try { + const parsedPayload = await new Promise((resolve, reject) => { + fetchEventSource(input instanceof URL ? input.toString() : input, { + method: init.method, + headers: init.headers as Record, + body: init.body, + signal: controller.signal, + onmessage: (event) => { + if (!event.data || !event.data.trim()) { + return; + } + try { + const payload = JSON.parse(event.data) as T; + resolve(payload); + } catch (error) { + reject(error); + } + }, + onerror: (error) => { + reject(error); + }, + onopen: async (response) => { + if (!response.ok) { + const errorText = await response.text(); + reject(new Error(errorText || "Failed to send request.")); + } + }, + }).catch(reject); + }); + + return parsedPayload; + } finally { + controller.abort(); + } +} + +export async function getSseClient( + credentialGetter: CredentialGetter | null, +): Promise { + const requestHeaders: Record = { + Accept: "text/event-stream", + "Content-Type": "application/json", + "x-user-agent": "skyvern-ui", + }; + + let authToken: string | null = null; + if (credentialGetter) { + authToken = await credentialGetter(); + } + + if (authToken) { + requestHeaders.Authorization = `Bearer ${authToken}`; + } else { + const apiKey = getRuntimeApiKey(); + if (apiKey) { + requestHeaders["X-API-Key"] = apiKey; + } + } + + return { + post: (path: string, body: unknown) => { + return fetchJsonSse( + `${runsApiBaseUrl.replace(/\/$/, "")}/${path.replace(/^\//, "")}`, + { + method: "POST", + headers: requestHeaders, + body: JSON.stringify(body), + }, + ); + }, + }; +} diff --git a/skyvern-frontend/src/routes/workflows/editor/WorkflowCopilotChat.tsx b/skyvern-frontend/src/routes/workflows/editor/WorkflowCopilotChat.tsx index ecf22134..47fa6306 100644 --- a/skyvern-frontend/src/routes/workflows/editor/WorkflowCopilotChat.tsx +++ b/skyvern-frontend/src/routes/workflows/editor/WorkflowCopilotChat.tsx @@ -8,6 +8,7 @@ import { stringify as convertToYAML } from "yaml"; import { useWorkflowHasChangesStore } from "@/store/WorkflowHasChangesStore"; import { WorkflowCreateYAMLRequest } from "@/routes/workflows/types/workflowYamlTypes"; import { toast } from "@/components/ui/use-toast"; +import { getSseClient } from "@/api/sse"; interface ChatMessage { id: string; @@ -309,35 +310,42 @@ export function WorkflowCopilotChat({ workflowYaml = convertToYAML(requestBody); } - const client = await getClient(credentialGetter, "sans-api-v1"); - + const client = await getSseClient(credentialGetter); const response = await client.post<{ - workflow_copilot_chat_id: string; - message: string; - updated_workflow_yaml: string | null; - request_time: string; - response_time: string; - }>( - "/workflow/copilot/chat-post", - { - workflow_permanent_id: workflowPermanentId, - workflow_copilot_chat_id: workflowCopilotChatId, - workflow_run_id: workflowRunId, - message: messageContent, - workflow_yaml: workflowYaml, - }, - { - timeout: 300000, - }, - ); + workflow_copilot_chat_id?: string; + message?: string; + updated_workflow_yaml?: string | null; + request_time?: string; + response_time?: string; + error?: string; + }>("/workflow/copilot/chat-post", { + workflow_permanent_id: workflowPermanentId, + workflow_copilot_chat_id: workflowCopilotChatId, + workflow_run_id: workflowRunId, + message: messageContent, + workflow_yaml: workflowYaml, + }); - setWorkflowCopilotChatId(response.data.workflow_copilot_chat_id); + if (response.error) { + throw new Error(response.error); + } + + if ( + !response.workflow_copilot_chat_id || + !response.message || + !response.request_time || + !response.response_time + ) { + throw new Error("No response received."); + } + + setWorkflowCopilotChatId(response.workflow_copilot_chat_id); const aiMessage: ChatMessage = { id: Date.now().toString(), sender: "ai", - content: response.data.message || "I received your message.", - timestamp: response.data.response_time, + content: response.message || "I received your message.", + timestamp: response.response_time, }; setMessages((prev) => [ @@ -345,16 +353,16 @@ export function WorkflowCopilotChat({ message.id === userMessageId ? { ...message, - timestamp: response.data.request_time, + timestamp: response.request_time, } : message, ), aiMessage, ]); - if (response.data.updated_workflow_yaml && onWorkflowUpdate) { + if (response.updated_workflow_yaml && onWorkflowUpdate) { try { - onWorkflowUpdate(response.data.updated_workflow_yaml); + onWorkflowUpdate(response.updated_workflow_yaml); } catch (updateError) { console.error("Failed to update workflow:", updateError); toast({ diff --git a/skyvern/forge/sdk/routes/workflow_copilot.py b/skyvern/forge/sdk/routes/workflow_copilot.py index ea6b7a26..d0b3fa25 100644 --- a/skyvern/forge/sdk/routes/workflow_copilot.py +++ b/skyvern/forge/sdk/routes/workflow_copilot.py @@ -2,11 +2,12 @@ import time from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path -from typing import Any +from typing import Any, AsyncGenerator import structlog import yaml from fastapi import Depends, HTTPException, status +from sse_starlette import EventSourceResponse, JSONServerSentEvent, ServerSentEvent from skyvern.forge import app from skyvern.forge.prompts import prompt_engine @@ -29,6 +30,7 @@ from skyvern.schemas.workflows import LoginBlockYAML, WorkflowCreateYAMLRequest WORKFLOW_KNOWLEDGE_BASE_PATH = Path("skyvern/forge/prompts/skyvern/workflow_knowledge_base.txt") CHAT_HISTORY_CONTEXT_MESSAGES = 10 +SSE_KEEPALIVE_INTERVAL_SECONDS = 10 LOG = structlog.get_logger() @@ -236,7 +238,7 @@ async def _process_workflow_yaml(action_data: dict[str, Any]) -> None | str: async def workflow_copilot_chat_post( chat_request: WorkflowCopilotChatRequest, organization: Organization = Depends(org_auth_service.get_current_org), -) -> WorkflowCopilotChatResponse: +) -> EventSourceResponse: LOG.info( "Workflow copilot chat request", workflow_copilot_chat_id=chat_request.workflow_copilot_chat_id, @@ -292,53 +294,57 @@ async def workflow_copilot_chat_post( content=chat_request.message, ) - try: - user_response, updated_workflow_yaml, updated_global_llm_context = await copilot_call_llm( - organization.organization_id, - chat_request, - convert_to_history_messages(chat_messages[-CHAT_HISTORY_CONTEXT_MESSAGES:]), - global_llm_context, - debug_run_info_text, - ) - except HTTPException: - raise - except LLMProviderError as e: - LOG.error( - "LLM provider error", - organization_id=organization.organization_id, - error=str(e), - exc_info=True, - ) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Failed to process your request. Please try again.", - ) - except Exception as e: - LOG.error( - "Unexpected error in workflow copilot", - organization_id=organization.organization_id, - error=str(e), - exc_info=True, - ) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"An error occurred: {str(e)}", - ) + async def event_stream() -> AsyncGenerator[JSONServerSentEvent, None]: + try: + user_response, updated_workflow_yaml, updated_global_llm_context = await copilot_call_llm( + organization.organization_id, + chat_request, + convert_to_history_messages(chat_messages[-CHAT_HISTORY_CONTEXT_MESSAGES:]), + global_llm_context, + debug_run_info_text, + ) + assistant_message = await app.DATABASE.create_workflow_copilot_chat_message( + organization_id=chat.organization_id, + workflow_copilot_chat_id=chat.workflow_copilot_chat_id, + sender=WorkflowCopilotChatSender.AI, + content=user_response, + global_llm_context=updated_global_llm_context, + ) - assistant_message = await app.DATABASE.create_workflow_copilot_chat_message( - organization_id=chat.organization_id, - workflow_copilot_chat_id=chat.workflow_copilot_chat_id, - sender=WorkflowCopilotChatSender.AI, - content=user_response, - global_llm_context=updated_global_llm_context, - ) + response_payload = WorkflowCopilotChatResponse( + workflow_copilot_chat_id=chat.workflow_copilot_chat_id, + message=user_response, + updated_workflow_yaml=updated_workflow_yaml, + request_time=request_started_at, + response_time=assistant_message.created_at, + ).model_dump(mode="json") + yield JSONServerSentEvent(response_payload) + except HTTPException as exc: + yield JSONServerSentEvent({"error": exc.detail}) + except LLMProviderError as exc: + LOG.error( + "LLM provider error", + organization_id=organization.organization_id, + error=str(exc), + exc_info=True, + ) + yield JSONServerSentEvent({"error": "Failed to process your request. Please try again."}) + except Exception as exc: + LOG.error( + "Unexpected error in workflow copilot", + organization_id=organization.organization_id, + error=str(exc), + exc_info=True, + ) + yield JSONServerSentEvent({"error": "An error occurred. Please try again."}) - return WorkflowCopilotChatResponse( - workflow_copilot_chat_id=chat.workflow_copilot_chat_id, - message=user_response, - updated_workflow_yaml=updated_workflow_yaml, - request_time=request_started_at, - response_time=assistant_message.created_at, + def ping_message_factory() -> ServerSentEvent: + return ServerSentEvent(comment="keep-alive") + + return EventSourceResponse( + event_stream(), + ping=SSE_KEEPALIVE_INTERVAL_SECONDS, + ping_message_factory=ping_message_factory, ) diff --git a/uv.lock b/uv.lock index f6999e3a..2e33226d 100644 --- a/uv.lock +++ b/uv.lock @@ -5135,6 +5135,7 @@ dependencies = [ { name = "requests-toolbelt" }, { name = "rich", extra = ["jupyter"] }, { name = "sqlalchemy", extra = ["mypy"] }, + { name = "sse-starlette" }, { name = "starlette-context" }, { name = "structlog" }, { name = "tiktoken" }, @@ -5234,6 +5235,7 @@ requires-dist = [ { name = "requests-toolbelt", specifier = ">=1.0.0,<2" }, { name = "rich", extras = ["jupyter"], specifier = ">=13.7.0,<14" }, { name = "sqlalchemy", extras = ["mypy"], specifier = ">=2.0.29,<3" }, + { name = "sse-starlette", specifier = ">=3.0.3,<4" }, { name = "starlette-context", specifier = ">=0.3.6,<0.4" }, { name = "structlog", specifier = ">=23.2.0,<24" }, { name = "tiktoken", specifier = ">=0.9.0" },