Workflow Copilot: convert YAML -> workflow definition on BE side (#4461)
This commit is contained in:
committed by
GitHub
parent
d05e817dcc
commit
09f2903c18
@@ -7,14 +7,16 @@ import { ReloadIcon, Cross2Icon } from "@radix-ui/react-icons";
|
||||
import { stringify as convertToYAML } from "yaml";
|
||||
import { useWorkflowHasChangesStore } from "@/store/WorkflowHasChangesStore";
|
||||
import { WorkflowCreateYAMLRequest } from "@/routes/workflows/types/workflowYamlTypes";
|
||||
import { WorkflowDefinition } from "@/routes/workflows/types/workflowTypes";
|
||||
import { toast } from "@/components/ui/use-toast";
|
||||
import { getSseClient } from "@/api/sse";
|
||||
import {
|
||||
WorkflowCopilotChatHistoryResponse,
|
||||
WorkflowCopilotProcessingUpdate,
|
||||
WorkflowCopilotStreamError,
|
||||
WorkflowCopilotStreamResponse,
|
||||
WorkflowCopilotStreamErrorUpdate,
|
||||
WorkflowCopilotStreamResponseUpdate,
|
||||
WorkflowCopilotChatSender,
|
||||
WorkflowCopilotChatRequest,
|
||||
} from "./workflowCopilotTypes";
|
||||
|
||||
interface ChatMessage {
|
||||
@@ -26,8 +28,8 @@ interface ChatMessage {
|
||||
|
||||
type WorkflowCopilotSsePayload =
|
||||
| WorkflowCopilotProcessingUpdate
|
||||
| WorkflowCopilotStreamResponse
|
||||
| WorkflowCopilotStreamError;
|
||||
| WorkflowCopilotStreamResponseUpdate
|
||||
| WorkflowCopilotStreamErrorUpdate;
|
||||
|
||||
const formatChatTimestamp = (value: string) => {
|
||||
let normalizedValue = value.replace(/\.(\d{3})\d*/, ".$1");
|
||||
@@ -65,7 +67,7 @@ const MessageItem = memo(({ message }: { message: ChatMessage }) => {
|
||||
});
|
||||
|
||||
interface WorkflowCopilotChatProps {
|
||||
onWorkflowUpdate?: (workflowYaml: string) => void;
|
||||
onWorkflowUpdate?: (workflow: WorkflowDefinition) => void;
|
||||
isOpen?: boolean;
|
||||
onClose?: () => void;
|
||||
onMessageCountChange?: (count: number) => void;
|
||||
@@ -317,8 +319,18 @@ export function WorkflowCopilotChat({
|
||||
|
||||
try {
|
||||
const saveData = getSaveData();
|
||||
const workflowId = saveData?.workflow.workflow_id;
|
||||
let workflowYaml = "";
|
||||
|
||||
if (!workflowId) {
|
||||
toast({
|
||||
title: "Missing workflow",
|
||||
description: "Workflow ID is required to chat.",
|
||||
variant: "destructive",
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (saveData) {
|
||||
const extraHttpHeaders: Record<string, string> = {};
|
||||
if (saveData.settings.extraHttpHeaders) {
|
||||
@@ -397,7 +409,9 @@ export function WorkflowCopilotChat({
|
||||
);
|
||||
};
|
||||
|
||||
const handleResponse = (response: WorkflowCopilotStreamResponse) => {
|
||||
const handleResponse = (
|
||||
response: WorkflowCopilotStreamResponseUpdate,
|
||||
) => {
|
||||
setWorkflowCopilotChatId(response.workflow_copilot_chat_id);
|
||||
|
||||
const aiMessage: ChatMessage = {
|
||||
@@ -409,9 +423,9 @@ export function WorkflowCopilotChat({
|
||||
|
||||
setMessages((prev) => [...prev, aiMessage]);
|
||||
|
||||
if (response.updated_workflow_yaml && onWorkflowUpdate) {
|
||||
if (response.updated_workflow && onWorkflowUpdate) {
|
||||
try {
|
||||
onWorkflowUpdate(response.updated_workflow_yaml);
|
||||
onWorkflowUpdate(response.updated_workflow as WorkflowDefinition);
|
||||
} catch (updateError) {
|
||||
console.error("Failed to update workflow:", updateError);
|
||||
toast({
|
||||
@@ -424,7 +438,7 @@ export function WorkflowCopilotChat({
|
||||
}
|
||||
};
|
||||
|
||||
const handleError = (payload: WorkflowCopilotStreamError) => {
|
||||
const handleError = (payload: WorkflowCopilotStreamErrorUpdate) => {
|
||||
const errorMessage: ChatMessage = {
|
||||
id: Date.now().toString(),
|
||||
sender: "ai",
|
||||
@@ -437,12 +451,13 @@ export function WorkflowCopilotChat({
|
||||
await client.postStreaming<WorkflowCopilotSsePayload>(
|
||||
"/workflow/copilot/chat-post",
|
||||
{
|
||||
workflow_id: workflowId,
|
||||
workflow_permanent_id: workflowPermanentId,
|
||||
workflow_copilot_chat_id: workflowCopilotChatId,
|
||||
workflow_run_id: workflowRunId,
|
||||
message: messageContent,
|
||||
workflow_yaml: workflowYaml,
|
||||
},
|
||||
} as WorkflowCopilotChatRequest,
|
||||
(payload) => {
|
||||
switch (payload.type) {
|
||||
case "processing_update":
|
||||
|
||||
@@ -20,6 +20,7 @@ export interface WorkflowCopilotChatMessage {
|
||||
|
||||
export interface WorkflowCopilotChatRequest {
|
||||
workflow_permanent_id: string;
|
||||
workflow_id: string;
|
||||
workflow_copilot_chat_id?: string | null;
|
||||
workflow_run_id?: string | null;
|
||||
message: string;
|
||||
@@ -48,15 +49,15 @@ export interface WorkflowCopilotProcessingUpdate {
|
||||
timestamp: string;
|
||||
}
|
||||
|
||||
export interface WorkflowCopilotStreamResponse {
|
||||
export interface WorkflowCopilotStreamResponseUpdate {
|
||||
type: "response";
|
||||
workflow_copilot_chat_id: string;
|
||||
message: string;
|
||||
updated_workflow_yaml?: string | null;
|
||||
updated_workflow?: Record<string, unknown> | null;
|
||||
response_time: string;
|
||||
}
|
||||
|
||||
export interface WorkflowCopilotStreamError {
|
||||
export interface WorkflowCopilotStreamErrorUpdate {
|
||||
type: "error";
|
||||
error: string;
|
||||
}
|
||||
|
||||
@@ -87,11 +87,7 @@ import { getWorkflowErrors, getElements } from "./workflowEditorUtils";
|
||||
import { WorkflowHeader } from "./WorkflowHeader";
|
||||
import { WorkflowHistoryPanel } from "./panels/WorkflowHistoryPanel";
|
||||
import { WorkflowVersion } from "../hooks/useWorkflowVersionsQuery";
|
||||
import {
|
||||
WorkflowApiResponse,
|
||||
WorkflowBlock,
|
||||
WorkflowSettings,
|
||||
} from "../types/workflowTypes";
|
||||
import { WorkflowApiResponse, WorkflowSettings } from "../types/workflowTypes";
|
||||
import { ProxyLocation } from "@/api/types";
|
||||
import {
|
||||
nodeAdderNode,
|
||||
@@ -100,16 +96,10 @@ import {
|
||||
generateNodeLabel,
|
||||
layout,
|
||||
startNode,
|
||||
upgradeWorkflowBlocksV1toV2,
|
||||
} from "./workflowEditorUtils";
|
||||
import { constructCacheKeyValue, getInitialParameters } from "./utils";
|
||||
import { WorkflowCopilotChat } from "../copilot/WorkflowCopilotChat";
|
||||
import { WorkflowCopilotButton } from "../copilot/WorkflowCopilotButton";
|
||||
import { parse as parseYAML } from "yaml";
|
||||
import {
|
||||
BlockYAML,
|
||||
WorkflowCreateYAMLRequest,
|
||||
} from "../types/workflowYamlTypes";
|
||||
import "./workspace-styles.css";
|
||||
|
||||
const Constants = {
|
||||
@@ -1664,68 +1654,36 @@ function Workspace({
|
||||
onClose={() => setIsCopilotOpen(false)}
|
||||
onMessageCountChange={setCopilotMessageCount}
|
||||
buttonRef={copilotButtonRef}
|
||||
onWorkflowUpdate={(workflowYaml) => {
|
||||
onWorkflowUpdate={(workflowData) => {
|
||||
try {
|
||||
const parsedYaml = parseYAML(
|
||||
workflowYaml,
|
||||
) as WorkflowCreateYAMLRequest;
|
||||
const saveData = workflowChangesStore.getSaveData?.();
|
||||
|
||||
const settings: WorkflowSettings = {
|
||||
proxyLocation:
|
||||
parsedYaml.proxy_location || ProxyLocation.Residential,
|
||||
webhookCallbackUrl: parsedYaml.webhook_callback_url || "",
|
||||
saveData?.settings.proxyLocation || ProxyLocation.Residential,
|
||||
webhookCallbackUrl: saveData?.settings.webhookCallbackUrl || "",
|
||||
persistBrowserSession:
|
||||
parsedYaml.persist_browser_session ?? false,
|
||||
model: parsedYaml.model ?? null,
|
||||
maxScreenshotScrolls: parsedYaml.max_screenshot_scrolls || 3,
|
||||
extraHttpHeaders: parsedYaml.extra_http_headers
|
||||
? JSON.stringify(parsedYaml.extra_http_headers)
|
||||
: null,
|
||||
runWith: parsedYaml.run_with ?? null,
|
||||
scriptCacheKey: parsedYaml.cache_key ?? null,
|
||||
aiFallback: parsedYaml.ai_fallback ?? true,
|
||||
runSequentially: parsedYaml.run_sequentially ?? false,
|
||||
sequentialKey: parsedYaml.sequential_key ?? null,
|
||||
finallyBlockLabel:
|
||||
parsedYaml.workflow_definition?.finally_block_label ?? null,
|
||||
saveData?.settings.persistBrowserSession ?? false,
|
||||
model: saveData?.settings.model ?? null,
|
||||
maxScreenshotScrolls:
|
||||
saveData?.settings.maxScreenshotScrolls || 3,
|
||||
extraHttpHeaders: saveData?.settings.extraHttpHeaders ?? null,
|
||||
runWith: saveData?.settings.runWith ?? null,
|
||||
scriptCacheKey: saveData?.settings.scriptCacheKey ?? null,
|
||||
aiFallback: saveData?.settings.aiFallback ?? true,
|
||||
runSequentially: saveData?.settings.runSequentially ?? false,
|
||||
sequentialKey: saveData?.settings.sequentialKey ?? null,
|
||||
finallyBlockLabel: workflowData?.finally_block_label ?? null,
|
||||
};
|
||||
|
||||
// Convert YAML blocks to internal format
|
||||
// YAML has parameter_keys (array of strings), internal format has parameters (array of objects)
|
||||
let blocks = (parsedYaml.workflow_definition?.blocks || []).map(
|
||||
(block: BlockYAML) => {
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
const convertedBlock = { ...block } as any;
|
||||
|
||||
// Convert parameter_keys to parameters format
|
||||
if ("parameter_keys" in block) {
|
||||
convertedBlock.parameters = (block.parameter_keys || []).map(
|
||||
(key: string) => ({
|
||||
key,
|
||||
parameter_type: "workflow",
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
return convertedBlock;
|
||||
},
|
||||
) as WorkflowBlock[];
|
||||
|
||||
// Auto-upgrade v1 workflows to v2 by assigning sequential next_block_label values
|
||||
const workflowVersion =
|
||||
parsedYaml.workflow_definition?.version ?? 1;
|
||||
if (workflowVersion < 2) {
|
||||
blocks = upgradeWorkflowBlocksV1toV2(blocks);
|
||||
}
|
||||
|
||||
const elements = getElements(blocks, settings, true);
|
||||
const elements = getElements(workflowData.blocks, settings, true);
|
||||
|
||||
setNodes(elements.nodes);
|
||||
setEdges(elements.edges);
|
||||
|
||||
const initialParameters = getInitialParameters({
|
||||
workflow_definition: {
|
||||
parameters: parsedYaml.workflow_definition?.parameters || [],
|
||||
parameters: workflowData.parameters,
|
||||
},
|
||||
} as WorkflowApiResponse);
|
||||
useWorkflowParametersStore
|
||||
@@ -1734,11 +1692,14 @@ function Workspace({
|
||||
|
||||
workflowChangesStore.setHasChanges(true);
|
||||
} catch (error) {
|
||||
console.error("Failed to parse and apply workflow YAML", error);
|
||||
console.log("YAML:", workflowYaml);
|
||||
console.error(
|
||||
"Failed to parse and apply workflow",
|
||||
error,
|
||||
workflowData,
|
||||
);
|
||||
toast({
|
||||
title: "Update failed",
|
||||
description: "Failed to parse workflow YAML. Please try again.",
|
||||
description: "Failed to apply workflow update. Please try again.",
|
||||
variant: "destructive",
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ import time
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, AsyncGenerator
|
||||
from typing import AsyncGenerator
|
||||
|
||||
import structlog
|
||||
import yaml
|
||||
@@ -29,7 +29,13 @@ from skyvern.forge.sdk.schemas.workflow_copilot import (
|
||||
WorkflowCopilotStreamResponseUpdate,
|
||||
)
|
||||
from skyvern.forge.sdk.services import org_auth_service
|
||||
from skyvern.schemas.workflows import LoginBlockYAML, WorkflowCreateYAMLRequest
|
||||
from skyvern.forge.sdk.workflow.models.parameter import ParameterType
|
||||
from skyvern.forge.sdk.workflow.models.workflow import WorkflowDefinition
|
||||
from skyvern.forge.sdk.workflow.workflow_definition_converter import convert_workflow_definition
|
||||
from skyvern.schemas.workflows import (
|
||||
LoginBlockYAML,
|
||||
WorkflowCreateYAMLRequest,
|
||||
)
|
||||
|
||||
WORKFLOW_KNOWLEDGE_BASE_PATH = Path("skyvern/forge/prompts/skyvern/workflow_knowledge_base.txt")
|
||||
CHAT_HISTORY_CONTEXT_MESSAGES = 10
|
||||
@@ -88,7 +94,7 @@ async def copilot_call_llm(
|
||||
chat_history: list[WorkflowCopilotChatHistoryMessage],
|
||||
global_llm_context: str | None,
|
||||
debug_run_info_text: str,
|
||||
) -> tuple[str, str | None, str | None]:
|
||||
) -> tuple[str, WorkflowDefinition | None, str | None]:
|
||||
current_datetime = datetime.now(timezone.utc).isoformat()
|
||||
|
||||
chat_history_text = ""
|
||||
@@ -177,8 +183,8 @@ async def copilot_call_llm(
|
||||
global_llm_context = str(global_llm_context)
|
||||
|
||||
if action_type == "REPLACE_WORKFLOW":
|
||||
workflow_yaml = await _process_workflow_yaml(action_data)
|
||||
return user_response, workflow_yaml, global_llm_context
|
||||
updated_workflow = await _process_workflow_yaml(chat_request.workflow_id, action_data.get("workflow_yaml", ""))
|
||||
return user_response, updated_workflow, global_llm_context
|
||||
elif action_type == "REPLY":
|
||||
return user_response, None, global_llm_context
|
||||
elif action_type == "ASK_QUESTION":
|
||||
@@ -192,17 +198,11 @@ async def copilot_call_llm(
|
||||
return "I received your request but I'm not sure how to help. Could you rephrase?", None, None
|
||||
|
||||
|
||||
async def _process_workflow_yaml(action_data: dict[str, Any]) -> None | str:
|
||||
workflow_yaml = action_data.get("workflow_yaml", "")
|
||||
|
||||
async def _process_workflow_yaml(workflow_id: str, workflow_yaml: str) -> WorkflowDefinition:
|
||||
try:
|
||||
parsed_yaml = yaml.safe_load(workflow_yaml)
|
||||
except yaml.YAMLError as e:
|
||||
LOG.error(
|
||||
"Invalid YAML from LLM",
|
||||
error=str(e),
|
||||
yaml=f"\n{str(e)}\n{workflow_yaml}",
|
||||
)
|
||||
LOG.error("Invalid YAML from LLM", yaml=workflow_yaml, exc_info=True)
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"LLM generated invalid YAML: {str(e)}",
|
||||
@@ -216,25 +216,28 @@ async def _process_workflow_yaml(action_data: dict[str, Any]) -> None | str:
|
||||
for block in blocks:
|
||||
block["title"] = block.get("title", "")
|
||||
|
||||
workflow = WorkflowCreateYAMLRequest.model_validate(parsed_yaml)
|
||||
workflow_yaml_request = WorkflowCreateYAMLRequest.model_validate(parsed_yaml)
|
||||
|
||||
# Post-processing
|
||||
for block in workflow.workflow_definition.blocks:
|
||||
for block in workflow_yaml_request.workflow_definition.blocks:
|
||||
if isinstance(block, LoginBlockYAML) and not block.navigation_goal:
|
||||
block.navigation_goal = DEFAULT_LOGIN_PROMPT
|
||||
|
||||
workflow_yaml = yaml.safe_dump(workflow.model_dump(mode="json"), sort_keys=False)
|
||||
except Exception as e:
|
||||
LOG.error(
|
||||
"YAML from LLM does not conform to Skyvern workflow schema",
|
||||
error=str(e),
|
||||
yaml=f"\n{str(e)}\n{workflow_yaml}",
|
||||
workflow_yaml_request.workflow_definition.parameters = [
|
||||
p for p in workflow_yaml_request.workflow_definition.parameters if p.parameter_type != ParameterType.OUTPUT
|
||||
]
|
||||
|
||||
updated_workflow = convert_workflow_definition(
|
||||
workflow_definition_yaml=workflow_yaml_request.workflow_definition,
|
||||
workflow_id=workflow_id,
|
||||
)
|
||||
except Exception as e:
|
||||
LOG.error("YAML from LLM does not conform to Skyvern workflow schema", yaml=workflow_yaml, exc_info=True)
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"LLM generated YAML that doesn't match workflow schema: {str(e)}",
|
||||
)
|
||||
return workflow_yaml
|
||||
return updated_workflow
|
||||
|
||||
|
||||
@base_router.post("/workflow/copilot/chat-post", include_in_schema=False)
|
||||
@@ -314,7 +317,7 @@ async def workflow_copilot_chat_post(
|
||||
)
|
||||
return
|
||||
|
||||
user_response, updated_workflow_yaml, updated_global_llm_context = await copilot_call_llm(
|
||||
user_response, updated_workflow, updated_global_llm_context = await copilot_call_llm(
|
||||
organization.organization_id,
|
||||
chat_request,
|
||||
convert_to_history_messages(chat_messages[-CHAT_HISTORY_CONTEXT_MESSAGES:]),
|
||||
@@ -349,7 +352,7 @@ async def workflow_copilot_chat_post(
|
||||
type=WorkflowCopilotStreamMessageType.RESPONSE,
|
||||
workflow_copilot_chat_id=chat.workflow_copilot_chat_id,
|
||||
message=user_response,
|
||||
updated_workflow_yaml=updated_workflow_yaml,
|
||||
updated_workflow=updated_workflow.model_dump(mode="json") if updated_workflow else None,
|
||||
response_time=assistant_message.created_at,
|
||||
).model_dump(mode="json"),
|
||||
)
|
||||
|
||||
@@ -33,6 +33,7 @@ class WorkflowCopilotChatMessage(BaseModel):
|
||||
|
||||
class WorkflowCopilotChatRequest(BaseModel):
|
||||
workflow_permanent_id: str = Field(..., description="Workflow permanent ID for the chat")
|
||||
workflow_id: str = Field(..., description="Workflow permanent ID for the chat")
|
||||
workflow_copilot_chat_id: str | None = Field(None, description="The chat ID to send the message to")
|
||||
workflow_run_id: str | None = Field(None, description="The workflow run ID to use for the context")
|
||||
message: str = Field(..., description="The message that user sends")
|
||||
@@ -70,7 +71,7 @@ class WorkflowCopilotStreamResponseUpdate(BaseModel):
|
||||
)
|
||||
workflow_copilot_chat_id: str = Field(..., description="The chat ID")
|
||||
message: str = Field(..., description="The message sent to the user")
|
||||
updated_workflow_yaml: str | None = Field(None, description="The updated workflow yaml")
|
||||
updated_workflow: dict | None = Field(None, description="The updated workflow")
|
||||
response_time: datetime = Field(..., description="When the assistant message was created")
|
||||
|
||||
|
||||
|
||||
@@ -2981,14 +2981,10 @@ class WorkflowService:
|
||||
self,
|
||||
workflow_id: str,
|
||||
workflow_definition_yaml: WorkflowDefinitionYAML,
|
||||
title: str,
|
||||
organization_id: str,
|
||||
) -> WorkflowDefinition:
|
||||
workflow_definition = convert_workflow_definition(
|
||||
workflow_id=workflow_id,
|
||||
workflow_definition_yaml=workflow_definition_yaml,
|
||||
title=title,
|
||||
organization_id=organization_id,
|
||||
workflow_id=workflow_id,
|
||||
)
|
||||
|
||||
await app.DATABASE.save_workflow_definition_parameters(workflow_definition.parameters)
|
||||
@@ -3080,8 +3076,6 @@ class WorkflowService:
|
||||
workflow_definition = await self.make_workflow_definition(
|
||||
potential_workflow.workflow_id,
|
||||
request.workflow_definition,
|
||||
request.title,
|
||||
organization_id,
|
||||
)
|
||||
|
||||
updated_workflow = await self.update_workflow_definition(
|
||||
|
||||
@@ -85,10 +85,8 @@ LOG = structlog.get_logger()
|
||||
|
||||
|
||||
def convert_workflow_definition(
|
||||
workflow_id: str,
|
||||
workflow_definition_yaml: WorkflowDefinitionYAML,
|
||||
title: str,
|
||||
organization_id: str,
|
||||
workflow_id: str,
|
||||
) -> WorkflowDefinition:
|
||||
# Create parameters from the request
|
||||
parameters: dict[str, PARAMETER_TYPE] = {}
|
||||
@@ -311,11 +309,9 @@ def convert_workflow_definition(
|
||||
)
|
||||
|
||||
LOG.info(
|
||||
f"Created workflow from request, title: {title}",
|
||||
"Created workflow from request",
|
||||
parameter_keys=[parameter.key for parameter in parameters.values()],
|
||||
block_labels=[block.label for block in blocks],
|
||||
organization_id=organization_id,
|
||||
title=title,
|
||||
workflow_id=workflow_id,
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user