Workflow Copilot: Use streaming in /chat-post (#4437)

This commit is contained in:
Stanislav Novosad
2026-01-12 16:12:29 -07:00
committed by GitHub
parent 1d38c7bfe8
commit a6f0781491
7 changed files with 186 additions and 94 deletions

View File

@@ -69,6 +69,7 @@ dependencies = [
"python-calamine>=0.6.1",
"urllib3>=2.6.0",
"zstandard>=0.25.0",
"sse-starlette>=3.0.3,<4",
]
[dependency-groups]

View File

@@ -13,6 +13,7 @@
"@codemirror/lang-python": "^6.1.6",
"@dagrejs/dagre": "^1.1.4",
"@hookform/resolvers": "^3.3.4",
"@microsoft/fetch-event-source": "^2.0.1",
"@novnc/novnc": "1.5.x",
"@radix-ui/react-accordion": "^1.1.2",
"@radix-ui/react-aspect-ratio": "^1.0.3",
@@ -261,7 +262,6 @@
"version": "6.33.0",
"resolved": "https://registry.npmjs.org/@codemirror/view/-/view-6.33.0.tgz",
"integrity": "sha512-AroaR3BvnjRW8fiZBalAaK+ZzB5usGgI014YKElYZvQdNH5ZIidHlO+cyf/2rWzyBFRkvG6VhiXeAEbC53P2YQ==",
"peer": true,
"dependencies": {
"@codemirror/state": "^6.4.0",
"style-mod": "^4.1.0",
@@ -1033,6 +1033,12 @@
"@lezer/lr": "^1.0.0"
}
},
"node_modules/@microsoft/fetch-event-source": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/@microsoft/fetch-event-source/-/fetch-event-source-2.0.1.tgz",
"integrity": "sha512-W6CLUJ2eBMw3Rec70qrsEW0jOm/3twwJv21mrmj2yORiaVmVYGS4sSS5yUwvQc1ZlDLYGPnClVWmUUMagKNsfA==",
"license": "MIT"
},
"node_modules/@nodelib/fs.scandir": {
"version": "2.1.5",
"resolved": "https://registry.npmjs.org/@nodelib/fs.scandir/-/fs.scandir-2.1.5.tgz",
@@ -3884,7 +3890,6 @@
"resolved": "https://registry.npmjs.org/@types/node/-/node-20.11.30.tgz",
"integrity": "sha512-dHM6ZxwlmuZaRmUPfv1p+KrdD1Dci04FbdEm/9wEMouFqxYoFl5aMkt0VMAUtYRQDyYvD41WJLukhq/ha3YuTw==",
"dev": true,
"peer": true,
"dependencies": {
"undici-types": "~5.26.4"
}
@@ -3900,7 +3905,6 @@
"resolved": "https://registry.npmjs.org/@types/react/-/react-18.2.67.tgz",
"integrity": "sha512-vkIE2vTIMHQ/xL0rgmuoECBCkZFZeHr49HeWSc24AptMbNRo7pwSBvj73rlJJs9fGKj0koS+V7kQB1jHS0uCgw==",
"devOptional": true,
"peer": true,
"dependencies": {
"@types/prop-types": "*",
"@types/scheduler": "*",
@@ -3912,7 +3916,6 @@
"resolved": "https://registry.npmjs.org/@types/react-dom/-/react-dom-18.2.22.tgz",
"integrity": "sha512-fHkBXPeNtfvri6gdsMYyW+dW7RXFo6Ad09nLFK0VQWR7yGLai/Cyvyj696gbwYvBnhGtevUG9cET0pmUbMtoPQ==",
"devOptional": true,
"peer": true,
"dependencies": {
"@types/react": "*"
}
@@ -3961,7 +3964,6 @@
"resolved": "https://registry.npmjs.org/@typescript-eslint/parser/-/parser-7.16.0.tgz",
"integrity": "sha512-ar9E+k7CU8rWi2e5ErzQiC93KKEFAXA2Kky0scAlPcxYblLt8+XZuHUZwlyfXILyQa95P6lQg+eZgh/dDs3+Vw==",
"dev": true,
"peer": true,
"dependencies": {
"@typescript-eslint/scope-manager": "7.16.0",
"@typescript-eslint/types": "7.16.0",
@@ -4340,7 +4342,6 @@
"resolved": "https://registry.npmjs.org/acorn/-/acorn-8.11.3.tgz",
"integrity": "sha512-Y9rRfJG5jcKOE0CLisYbojUjIrIEE7AGMzA/Sm4BslANhbS+cDMpgBdcPT91oJ7OuJ9hYJBx59RjbhxVnrF8Xg==",
"dev": true,
"peer": true,
"bin": {
"acorn": "bin/acorn"
},
@@ -4617,7 +4618,6 @@
"url": "https://github.com/sponsors/ai"
}
],
"peer": true,
"dependencies": {
"caniuse-lite": "^1.0.30001587",
"electron-to-chromium": "^1.4.668",
@@ -5114,7 +5114,6 @@
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/d3-selection/-/d3-selection-3.0.0.tgz",
"integrity": "sha512-fmTRWbNMmsmWq6xJV8D19U/gw/bwrHfNXxrIN+HfZgnzqTHp9jOmKMhsTUjXOJnZOdZY9Q28y4yebKzqDKlxlQ==",
"peer": true,
"engines": {
"node": ">=12"
}
@@ -5327,8 +5326,7 @@
"node_modules/embla-carousel": {
"version": "8.0.0",
"resolved": "https://registry.npmjs.org/embla-carousel/-/embla-carousel-8.0.0.tgz",
"integrity": "sha512-ecixcyqS6oKD2nh5Nj5MObcgoSILWNI/GtBxkidn5ytFaCCmwVHo2SecksaQZHcARMMpIR2dWOlSIdA1LkZFUA==",
"peer": true
"integrity": "sha512-ecixcyqS6oKD2nh5Nj5MObcgoSILWNI/GtBxkidn5ytFaCCmwVHo2SecksaQZHcARMMpIR2dWOlSIdA1LkZFUA=="
},
"node_modules/embla-carousel-react": {
"version": "8.0.0",
@@ -5495,7 +5493,6 @@
"integrity": "sha512-ypowyDxpVSYpkXr9WPv2PAZCtNip1Mv5KTW0SCurXv/9iOpcrH9PaqUElksqEB6pChqHGDRCFTyrZlGhnLNGiA==",
"deprecated": "This version is no longer supported. Please see https://eslint.org/version-support for other options.",
"dev": true,
"peer": true,
"dependencies": {
"@eslint-community/eslint-utils": "^4.2.0",
"@eslint-community/regexpp": "^4.6.1",
@@ -5551,7 +5548,6 @@
"resolved": "https://registry.npmjs.org/eslint-config-prettier/-/eslint-config-prettier-9.1.0.tgz",
"integrity": "sha512-NSWl5BFQWEPi1j4TjVNItzYV7dZXZ+wP6I6ZhrBGpChQhZRUaElihE9uRRkcbRnNb76UMKDF3r+WTmNcGPKsqw==",
"dev": true,
"peer": true,
"bin": {
"eslint-config-prettier": "bin/cli.js"
},
@@ -7374,7 +7370,6 @@
"url": "https://github.com/sponsors/ai"
}
],
"peer": true,
"dependencies": {
"nanoid": "^3.3.11",
"picocolors": "^1.1.1",
@@ -7542,7 +7537,6 @@
"resolved": "https://registry.npmjs.org/prettier/-/prettier-3.2.5.tgz",
"integrity": "sha512-3/GWa9aOC0YeD7LUfvOG2NiDyhOWRvt1k+rcKhOuYnMY24iiCphgneUfJDyFXd6rZCAnuLBv6UeAULtrhT/F4A==",
"dev": true,
"peer": true,
"bin": {
"prettier": "bin/prettier.cjs"
},
@@ -7745,7 +7739,6 @@
"version": "18.2.0",
"resolved": "https://registry.npmjs.org/react/-/react-18.2.0.tgz",
"integrity": "sha512-/3IjMdb2L9QbBdWiW5e3P2/npwMBaU9mHCSCUzNln0ZCYbcfTsGbTJrU/kGemdH2IWmB2ioZ+zkxtmq6g09fGQ==",
"peer": true,
"dependencies": {
"loose-envify": "^1.1.0"
},
@@ -7757,7 +7750,6 @@
"version": "18.2.0",
"resolved": "https://registry.npmjs.org/react-dom/-/react-dom-18.2.0.tgz",
"integrity": "sha512-6IMTriUmvsjHUjNtEDudZfuDQUoWXVxKHhlEGSk81n4YFS+r/Kl99wXiwlVXtPBtJenozv2P+hxDsw9eA7Xo6g==",
"peer": true,
"dependencies": {
"loose-envify": "^1.1.0",
"scheduler": "^0.23.0"
@@ -7795,7 +7787,6 @@
"version": "7.51.1",
"resolved": "https://registry.npmjs.org/react-hook-form/-/react-hook-form-7.51.1.tgz",
"integrity": "sha512-ifnBjl+kW0ksINHd+8C/Gp6a4eZOdWyvRv0UBaByShwU8JbVx5hTcTWEcd5VdybvmPTATkVVXk9npXArHmo56w==",
"peer": true,
"engines": {
"node": ">=12.22.0"
},
@@ -8686,7 +8677,6 @@
"version": "3.4.17",
"resolved": "https://registry.npmjs.org/tailwindcss/-/tailwindcss-3.4.17.tgz",
"integrity": "sha512-w33E2aCvSDP0tW9RZuNXadXlkHXqFzSkQew/aIa2i/Sj8fThxwovwlXHSPXTbAHwEIhBFXAedUhP2tueAKP8Og==",
"peer": true,
"dependencies": {
"@alloc/quick-lru": "^5.2.0",
"arg": "^5.0.2",
@@ -8807,7 +8797,6 @@
"integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==",
"dev": true,
"license": "MIT",
"peer": true,
"engines": {
"node": ">=12"
},
@@ -8927,7 +8916,6 @@
"resolved": "https://registry.npmjs.org/typescript/-/typescript-5.5.3.tgz",
"integrity": "sha512-/hreyEujaB0w76zKo6717l3L0o/qEUtRgdvUBvlkhoWeOVMjMuHNHk0BRBzikzuGDqNmPQbg5ifMEqsHLiIUcQ==",
"dev": true,
"peer": true,
"bin": {
"tsc": "bin/tsc",
"tsserver": "bin/tsserver"
@@ -9088,7 +9076,6 @@
"integrity": "sha512-o5a9xKjbtuhY6Bi5S3+HvbRERmouabWbyUcpXXUA1u+GNUKoROi9byOJ8M0nHbHYHkYICiMlqxkg1KkYmm25Sw==",
"dev": true,
"license": "MIT",
"peer": true,
"dependencies": {
"esbuild": "^0.21.3",
"postcss": "^8.4.43",

View File

@@ -54,6 +54,7 @@
"country-state-city": "^3.2.1",
"cross-spawn": "^7.0.6",
"embla-carousel-react": "^8.0.0",
"@microsoft/fetch-event-source": "^2.0.1",
"express": "^4.21.2",
"fetch-to-curl": "^0.6.0",
"nanoid": "^5.0.7",

View File

@@ -0,0 +1,87 @@
import { fetchEventSource } from "@microsoft/fetch-event-source";
import type { CredentialGetter } from "@/api/AxiosClient";
import { getRuntimeApiKey, runsApiBaseUrl } from "@/util/env";
export type SseJsonPayload = Record<string, unknown>;
type SseClient = {
post: <T extends SseJsonPayload>(path: string, body: unknown) => Promise<T>;
};
export async function fetchJsonSse<T extends SseJsonPayload>(
input: RequestInfo | URL,
init: RequestInit,
): Promise<T> {
const controller = new AbortController();
try {
const parsedPayload = await new Promise<T>((resolve, reject) => {
fetchEventSource(input instanceof URL ? input.toString() : input, {
method: init.method,
headers: init.headers as Record<string, string>,
body: init.body,
signal: controller.signal,
onmessage: (event) => {
if (!event.data || !event.data.trim()) {
return;
}
try {
const payload = JSON.parse(event.data) as T;
resolve(payload);
} catch (error) {
reject(error);
}
},
onerror: (error) => {
reject(error);
},
onopen: async (response) => {
if (!response.ok) {
const errorText = await response.text();
reject(new Error(errorText || "Failed to send request."));
}
},
}).catch(reject);
});
return parsedPayload;
} finally {
controller.abort();
}
}
export async function getSseClient(
credentialGetter: CredentialGetter | null,
): Promise<SseClient> {
const requestHeaders: Record<string, string> = {
Accept: "text/event-stream",
"Content-Type": "application/json",
"x-user-agent": "skyvern-ui",
};
let authToken: string | null = null;
if (credentialGetter) {
authToken = await credentialGetter();
}
if (authToken) {
requestHeaders.Authorization = `Bearer ${authToken}`;
} else {
const apiKey = getRuntimeApiKey();
if (apiKey) {
requestHeaders["X-API-Key"] = apiKey;
}
}
return {
post: <T extends SseJsonPayload>(path: string, body: unknown) => {
return fetchJsonSse<T>(
`${runsApiBaseUrl.replace(/\/$/, "")}/${path.replace(/^\//, "")}`,
{
method: "POST",
headers: requestHeaders,
body: JSON.stringify(body),
},
);
},
};
}

View File

@@ -8,6 +8,7 @@ import { stringify as convertToYAML } from "yaml";
import { useWorkflowHasChangesStore } from "@/store/WorkflowHasChangesStore";
import { WorkflowCreateYAMLRequest } from "@/routes/workflows/types/workflowYamlTypes";
import { toast } from "@/components/ui/use-toast";
import { getSseClient } from "@/api/sse";
interface ChatMessage {
id: string;
@@ -309,35 +310,42 @@ export function WorkflowCopilotChat({
workflowYaml = convertToYAML(requestBody);
}
const client = await getClient(credentialGetter, "sans-api-v1");
const client = await getSseClient(credentialGetter);
const response = await client.post<{
workflow_copilot_chat_id: string;
message: string;
updated_workflow_yaml: string | null;
request_time: string;
response_time: string;
}>(
"/workflow/copilot/chat-post",
{
workflow_copilot_chat_id?: string;
message?: string;
updated_workflow_yaml?: string | null;
request_time?: string;
response_time?: string;
error?: string;
}>("/workflow/copilot/chat-post", {
workflow_permanent_id: workflowPermanentId,
workflow_copilot_chat_id: workflowCopilotChatId,
workflow_run_id: workflowRunId,
message: messageContent,
workflow_yaml: workflowYaml,
},
{
timeout: 300000,
},
);
});
setWorkflowCopilotChatId(response.data.workflow_copilot_chat_id);
if (response.error) {
throw new Error(response.error);
}
if (
!response.workflow_copilot_chat_id ||
!response.message ||
!response.request_time ||
!response.response_time
) {
throw new Error("No response received.");
}
setWorkflowCopilotChatId(response.workflow_copilot_chat_id);
const aiMessage: ChatMessage = {
id: Date.now().toString(),
sender: "ai",
content: response.data.message || "I received your message.",
timestamp: response.data.response_time,
content: response.message || "I received your message.",
timestamp: response.response_time,
};
setMessages((prev) => [
@@ -345,16 +353,16 @@ export function WorkflowCopilotChat({
message.id === userMessageId
? {
...message,
timestamp: response.data.request_time,
timestamp: response.request_time,
}
: message,
),
aiMessage,
]);
if (response.data.updated_workflow_yaml && onWorkflowUpdate) {
if (response.updated_workflow_yaml && onWorkflowUpdate) {
try {
onWorkflowUpdate(response.data.updated_workflow_yaml);
onWorkflowUpdate(response.updated_workflow_yaml);
} catch (updateError) {
console.error("Failed to update workflow:", updateError);
toast({

View File

@@ -2,11 +2,12 @@ import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from typing import Any, AsyncGenerator
import structlog
import yaml
from fastapi import Depends, HTTPException, status
from sse_starlette import EventSourceResponse, JSONServerSentEvent, ServerSentEvent
from skyvern.forge import app
from skyvern.forge.prompts import prompt_engine
@@ -29,6 +30,7 @@ from skyvern.schemas.workflows import LoginBlockYAML, WorkflowCreateYAMLRequest
WORKFLOW_KNOWLEDGE_BASE_PATH = Path("skyvern/forge/prompts/skyvern/workflow_knowledge_base.txt")
CHAT_HISTORY_CONTEXT_MESSAGES = 10
SSE_KEEPALIVE_INTERVAL_SECONDS = 10
LOG = structlog.get_logger()
@@ -236,7 +238,7 @@ async def _process_workflow_yaml(action_data: dict[str, Any]) -> None | str:
async def workflow_copilot_chat_post(
chat_request: WorkflowCopilotChatRequest,
organization: Organization = Depends(org_auth_service.get_current_org),
) -> WorkflowCopilotChatResponse:
) -> EventSourceResponse:
LOG.info(
"Workflow copilot chat request",
workflow_copilot_chat_id=chat_request.workflow_copilot_chat_id,
@@ -292,6 +294,7 @@ async def workflow_copilot_chat_post(
content=chat_request.message,
)
async def event_stream() -> AsyncGenerator[JSONServerSentEvent, None]:
try:
user_response, updated_workflow_yaml, updated_global_llm_context = await copilot_call_llm(
organization.organization_id,
@@ -300,31 +303,6 @@ async def workflow_copilot_chat_post(
global_llm_context,
debug_run_info_text,
)
except HTTPException:
raise
except LLMProviderError as e:
LOG.error(
"LLM provider error",
organization_id=organization.organization_id,
error=str(e),
exc_info=True,
)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to process your request. Please try again.",
)
except Exception as e:
LOG.error(
"Unexpected error in workflow copilot",
organization_id=organization.organization_id,
error=str(e),
exc_info=True,
)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"An error occurred: {str(e)}",
)
assistant_message = await app.DATABASE.create_workflow_copilot_chat_message(
organization_id=chat.organization_id,
workflow_copilot_chat_id=chat.workflow_copilot_chat_id,
@@ -333,12 +311,40 @@ async def workflow_copilot_chat_post(
global_llm_context=updated_global_llm_context,
)
return WorkflowCopilotChatResponse(
response_payload = WorkflowCopilotChatResponse(
workflow_copilot_chat_id=chat.workflow_copilot_chat_id,
message=user_response,
updated_workflow_yaml=updated_workflow_yaml,
request_time=request_started_at,
response_time=assistant_message.created_at,
).model_dump(mode="json")
yield JSONServerSentEvent(response_payload)
except HTTPException as exc:
yield JSONServerSentEvent({"error": exc.detail})
except LLMProviderError as exc:
LOG.error(
"LLM provider error",
organization_id=organization.organization_id,
error=str(exc),
exc_info=True,
)
yield JSONServerSentEvent({"error": "Failed to process your request. Please try again."})
except Exception as exc:
LOG.error(
"Unexpected error in workflow copilot",
organization_id=organization.organization_id,
error=str(exc),
exc_info=True,
)
yield JSONServerSentEvent({"error": "An error occurred. Please try again."})
def ping_message_factory() -> ServerSentEvent:
return ServerSentEvent(comment="keep-alive")
return EventSourceResponse(
event_stream(),
ping=SSE_KEEPALIVE_INTERVAL_SECONDS,
ping_message_factory=ping_message_factory,
)

2
uv.lock generated
View File

@@ -5135,6 +5135,7 @@ dependencies = [
{ name = "requests-toolbelt" },
{ name = "rich", extra = ["jupyter"] },
{ name = "sqlalchemy", extra = ["mypy"] },
{ name = "sse-starlette" },
{ name = "starlette-context" },
{ name = "structlog" },
{ name = "tiktoken" },
@@ -5234,6 +5235,7 @@ requires-dist = [
{ name = "requests-toolbelt", specifier = ">=1.0.0,<2" },
{ name = "rich", extras = ["jupyter"], specifier = ">=13.7.0,<14" },
{ name = "sqlalchemy", extras = ["mypy"], specifier = ">=2.0.29,<3" },
{ name = "sse-starlette", specifier = ">=3.0.3,<4" },
{ name = "starlette-context", specifier = ">=0.3.6,<0.4" },
{ name = "structlog", specifier = ">=23.2.0,<24" },
{ name = "tiktoken", specifier = ">=0.9.0" },