From 8b84cae5a4557316494938b95fe419ebcd9e0e4b Mon Sep 17 00:00:00 2001 From: Marc Kelechava Date: Mon, 9 Feb 2026 18:46:45 -0800 Subject: [PATCH] =?UTF-8?q?SKY-7807/7808:=20Workflow=20MCP=20tools=20?= =?UTF-8?q?=E2=80=94=20CRUD=20+=20execution=20(#4675)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- skyvern/cli/core/result.py | 3 + skyvern/cli/mcp_tools/__init__.py | 121 ++- skyvern/cli/mcp_tools/workflow.py | 704 ++++++++++++++++++ .../forge/sdk/api/llm/api_handler_factory.py | 61 +- 4 files changed, 812 insertions(+), 77 deletions(-) create mode 100644 skyvern/cli/mcp_tools/workflow.py diff --git a/skyvern/cli/core/result.py b/skyvern/cli/core/result.py index 67ceb864..437cf48d 100644 --- a/skyvern/cli/core/result.py +++ b/skyvern/cli/core/result.py @@ -15,6 +15,9 @@ class ErrorCode: SDK_ERROR = "SDK_ERROR" TIMEOUT = "TIMEOUT" INVALID_INPUT = "INVALID_INPUT" + WORKFLOW_NOT_FOUND = "WORKFLOW_NOT_FOUND" + RUN_NOT_FOUND = "RUN_NOT_FOUND" + API_ERROR = "API_ERROR" @dataclass diff --git a/skyvern/cli/mcp_tools/__init__.py b/skyvern/cli/mcp_tools/__init__.py index ad85afb5..b4b0cefb 100644 --- a/skyvern/cli/mcp_tools/__init__.py +++ b/skyvern/cli/mcp_tools/__init__.py @@ -1,7 +1,8 @@ """Skyvern MCP Tools. -This module provides MCP (Model Context Protocol) tools for browser automation. -Tools are registered with FastMCP and can be used by AI assistants like Claude. +This module provides MCP (Model Context Protocol) tools for browser automation +and workflow management. Tools are registered with FastMCP and can be used by +AI assistants like Claude. """ from fastmcp import FastMCP @@ -28,6 +29,16 @@ from .session import ( skyvern_session_get, skyvern_session_list, ) +from .workflow import ( + skyvern_workflow_cancel, + skyvern_workflow_create, + skyvern_workflow_delete, + skyvern_workflow_get, + skyvern_workflow_list, + skyvern_workflow_run, + skyvern_workflow_status, + skyvern_workflow_update, +) mcp = FastMCP( "Skyvern", @@ -41,6 +52,9 @@ Reach for Skyvern tools when the user asks you to: - Check the current state of a web page or verify something on a site - Do anything you would otherwise attempt with requests, beautifulsoup, selenium, or playwright - Access website data where you are unsure whether an API endpoint exists +- Create, run, monitor, or manage web automations (Skyvern workflows) +- Set up reusable, parameterized automations that run on Skyvern's cloud +- Check the status of running automations or retrieve their results DO NOT try to scrape websites by guessing API endpoints or writing HTTP requests. Instead, use skyvern_navigate + skyvern_extract to get real data from actual pages. @@ -56,8 +70,58 @@ These tools give you a real browser — use them instead of writing scraping cod | "Fill out this form" | skyvern_act | | "Log in and buy the first item" | skyvern_run_task | | "Is checkout complete?" | skyvern_validate | +| "List my workflows" | skyvern_workflow_list | +| "Create a workflow that monitors prices" | skyvern_workflow_create | +| "Run the login workflow" | skyvern_workflow_run | +| "Is my workflow done?" | skyvern_workflow_status | +| "Set up a reusable automation for this" | Explore with browser tools, then skyvern_workflow_create | | "Write a script to do this" | Skyvern SDK (see below) | +## Getting Started + +**Visiting a website** (extracting data, filling forms, interacting with a page): +1. Create a session with skyvern_session_create +2. Navigate and interact with browser tools +3. Close with skyvern_session_close when done + +**Managing automations** (running, listing, or monitoring workflows): +No browser session needed — use workflow tools directly: +skyvern_workflow_list, skyvern_workflow_run, skyvern_workflow_status, etc. + +**Building a reusable automation** (explore a site, then save as a workflow): +1. **Explore** — Create a browser session, navigate the site, use skyvern_extract and skyvern_screenshot to understand the page structure +2. **Create** — Build a workflow definition and save it with skyvern_workflow_create +3. **Test** — Run the workflow with skyvern_workflow_run and check results with skyvern_workflow_status + +## Workflows vs Scripts + +When the user wants something **persistent, versioned, and managed in Skyvern's dashboard** — create a workflow. +Trigger words: "automation", "workflow", "reusable", "schedule", "monitor", "set up" +→ Use skyvern_workflow_create with a JSON definition (see example below) + +When the user wants **custom Python code** to run in their own environment — write an SDK script. +Trigger words: "script", "code", "function", "program" +→ Use `from skyvern import Skyvern` (see Writing Scripts section) + +### Workflow definition example (JSON, for skyvern_workflow_create): + { + "title": "Price Monitor", + "workflow_definition": { + "parameters": [ + {"parameter_type": "workflow", "key": "url", "workflow_parameter_type": "string"} + ], + "blocks": [ + {"block_type": "task", "label": "extract_prices", "url": "{{url}}", "engine": "skyvern-2.0", + "navigation_goal": "Extract all product names and prices from the page", + "data_extraction_goal": "Get product names and prices as a list", + "data_schema": {"type": "object", "properties": {"products": {"type": "array", + "items": {"type": "object", "properties": {"name": {"type": "string"}, "price": {"type": "string"}}}}}}} + ] + } + } +Use `{{parameter_key}}` to reference workflow parameters in block fields. +To inspect a real workflow for reference, use skyvern_workflow_get on an existing workflow. + ## Writing Scripts and Code When asked to write an automation script, use the Skyvern Python SDK with the **hybrid xpath+prompt pattern** for production-quality scripts. The hybrid form tries the xpath/selector first (fast, @@ -86,12 +150,6 @@ The `resolved_selector` field in responses gives you the xpath the AI resolved t IMPORTANT: NEVER import from skyvern.cli.mcp_tools — those are internal server modules. The public SDK is: from skyvern import Skyvern -## Recommended Workflow -1. **Connect** — Create or connect to a browser session -2. **Explore** — Navigate pages, take screenshots, extract data with AI -3. **Build** — Capture selectors and data schemas to construct deterministic workflows -4. **Test** — Validate workflows via skyvern_run_task - ## Primary Tools (use these first) These are the tools you should reach for by default: @@ -126,28 +184,32 @@ Precision tools support three modes. When unsure, use `intent`. 3. **Selector mode** — deterministic CSS/XPath targeting: `skyvern_click(selector="#submit-btn")` -## Replay Story: From Exploration to Production Scripts +## Replay Story: From Exploration to Production When you use precision tools (skyvern_click, skyvern_type, etc.) with intent mode, the response -includes `resolved_selector` — the xpath/CSS the AI found. Capture these to build hybrid scripts. +includes `resolved_selector` — the xpath/CSS the AI found. Capture these for hybrid scripts or +workflow definitions. **The hybrid pattern** is the recommended default for SDK scripts: await page.click("xpath=//button[@id='submit']", prompt="the Submit button") It tries the selector first (fast, no AI cost), then falls back to AI if the selector breaks. -**Workflow for generating scripts:** -1. Explore: Use skyvern_click(intent="Submit button") during interactive exploration -2. Capture: Note the `resolved_selector` from the response (e.g., "//button[@id='submit']") -3. Script: Write `page.click("xpath=//button[@id='submit']", prompt="Submit button")` - The `sdk_equivalent` field in each tool response shows the correct hybrid call to use in scripts. -Always prefer hybrid xpath+prompt over prompt-only in generated scripts. Note: Currently only skyvern_click returns resolved_selector. Support for skyvern_type and -skyvern_select_option is planned (SKY-7905). For those tools, use the selector you provided -as input, or fall back to prompt-only until SKY-7905 ships. +skyvern_select_option is planned (SKY-7905). -## Getting Started -Create a session with skyvern_session_create, then use browser tools to interact with pages. +## Workflow Management +Use these tools to create, manage, and run Skyvern workflows programmatically. +Workflows are persistent, versioned, multi-step automations that can be parameterized and scheduled. + +- **skyvern_workflow_list** — Find workflows by name or browse all available workflows +- **skyvern_workflow_get** — Get the full definition of a workflow to inspect its blocks and parameters +- **skyvern_workflow_create** — Create a new workflow from a YAML or JSON definition +- **skyvern_workflow_update** — Update an existing workflow's definition (creates a new version) +- **skyvern_workflow_delete** — Delete a workflow (requires force=true confirmation) +- **skyvern_workflow_run** — Execute a workflow with parameters (returns immediately by default, or wait for completion) +- **skyvern_workflow_status** — Check the status and progress of a running or completed workflow run +- **skyvern_workflow_cancel** — Cancel a running workflow """, ) @@ -175,6 +237,16 @@ mcp.tool()(skyvern_select_option) mcp.tool()(skyvern_press_key) mcp.tool()(skyvern_wait) +# -- Workflow management (CRUD + execution, no browser needed) -- +mcp.tool()(skyvern_workflow_list) +mcp.tool()(skyvern_workflow_get) +mcp.tool()(skyvern_workflow_create) +mcp.tool()(skyvern_workflow_update) +mcp.tool()(skyvern_workflow_delete) +mcp.tool()(skyvern_workflow_run) +mcp.tool()(skyvern_workflow_status) +mcp.tool()(skyvern_workflow_cancel) + __all__ = [ "mcp", # Session @@ -198,4 +270,13 @@ __all__ = [ "skyvern_select_option", "skyvern_press_key", "skyvern_wait", + # Workflow management + "skyvern_workflow_list", + "skyvern_workflow_get", + "skyvern_workflow_create", + "skyvern_workflow_update", + "skyvern_workflow_delete", + "skyvern_workflow_run", + "skyvern_workflow_status", + "skyvern_workflow_cancel", ] diff --git a/skyvern/cli/mcp_tools/workflow.py b/skyvern/cli/mcp_tools/workflow.py new file mode 100644 index 00000000..b8ca602d --- /dev/null +++ b/skyvern/cli/mcp_tools/workflow.py @@ -0,0 +1,704 @@ +"""Skyvern MCP workflow tools — CRUD and execution for Skyvern workflows. + +Tools for listing, creating, updating, deleting, running, and monitoring +Skyvern workflows via the Skyvern HTTP API. These tools do not require a +browser session. +""" + +from __future__ import annotations + +import asyncio +import json +from typing import Annotated, Any + +import structlog +from pydantic import Field + +from skyvern.client.errors import NotFoundError +from skyvern.client.types import WorkflowCreateYamlRequest +from skyvern.schemas.runs import ProxyLocation + +from ._common import ErrorCode, Timer, make_error, make_result +from ._session import get_skyvern + +LOG = structlog.get_logger() + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _serialize_workflow(wf: Any) -> dict[str, Any]: + """Pick the fields we expose from a Workflow Pydantic model. + + Uses Any to avoid tight coupling with Fern-generated client types. + """ + return { + "workflow_permanent_id": wf.workflow_permanent_id, + "workflow_id": wf.workflow_id, + "title": wf.title, + "version": wf.version, + "status": str(wf.status) if wf.status else None, + "description": wf.description, + "is_saved_task": wf.is_saved_task, + "folder_id": wf.folder_id, + "created_at": wf.created_at.isoformat() if wf.created_at else None, + "modified_at": wf.modified_at.isoformat() if wf.modified_at else None, + } + + +def _serialize_workflow_full(wf: Any) -> dict[str, Any]: + """Like _serialize_workflow but includes the full definition.""" + data = _serialize_workflow(wf) + if hasattr(wf, "workflow_definition") and wf.workflow_definition is not None: + try: + data["workflow_definition"] = wf.workflow_definition.model_dump(mode="json") + except Exception: + data["workflow_definition"] = str(wf.workflow_definition) + return data + + +def _serialize_run(run: Any) -> dict[str, Any]: + """Pick fields from a run response (GetRunResponse variant or WorkflowRunResponse). + + Uses Any to avoid tight coupling with Fern-generated client types. + """ + data: dict[str, Any] = { + "run_id": run.run_id, + "status": str(run.status) if run.status else None, + } + for field in ( + "run_type", + "step_count", + "failure_reason", + "recording_url", + "app_url", + "browser_session_id", + "run_with", + ): + val = getattr(run, field, None) + if val is not None: + data[field] = str(val) if not isinstance(val, (str, int, bool)) else val + + if hasattr(run, "output") and run.output is not None: + try: + data["output"] = run.output.model_dump(mode="json") if hasattr(run.output, "model_dump") else run.output + except Exception: + data["output"] = str(run.output) + + for ts_field in ("created_at", "modified_at", "started_at", "finished_at", "queued_at"): + val = getattr(run, ts_field, None) + if val is not None: + data[ts_field] = val.isoformat() + + return data + + +def _validate_workflow_id(workflow_id: str, action: str) -> dict[str, Any] | None: + """Validate workflow_id format. Returns a make_result error dict or None if valid.""" + if "/" in workflow_id or "\\" in workflow_id: + return make_result( + action, + ok=False, + error=make_error( + ErrorCode.INVALID_INPUT, + "workflow_id must not contain path separators", + "Provide a valid workflow permanent ID (starts with wpid_)", + ), + ) + if not workflow_id.startswith("wpid_"): + return make_result( + action, + ok=False, + error=make_error( + ErrorCode.INVALID_INPUT, + f"Invalid workflow_id format: {workflow_id!r}", + "Workflow IDs start with wpid_. Use skyvern_workflow_list to find valid IDs.", + ), + ) + return None + + +def _validate_run_id(run_id: str, action: str) -> dict[str, Any] | None: + """Validate run_id format. Returns a make_result error dict or None if valid.""" + if "/" in run_id or "\\" in run_id: + return make_result( + action, + ok=False, + error=make_error( + ErrorCode.INVALID_INPUT, + "run_id must not contain path separators", + "Provide a valid run ID (starts with wr_ or tsk_v2_)", + ), + ) + if not run_id.startswith("wr_") and not run_id.startswith("tsk_v2_"): + return make_result( + action, + ok=False, + error=make_error( + ErrorCode.INVALID_INPUT, + f"Invalid run_id format: {run_id!r}", + "Run IDs start with wr_ (workflow runs) or tsk_v2_ (task runs). Check skyvern_workflow_run output.", + ), + ) + return None + + +async def _get_workflow_by_id(workflow_id: str, version: int | None = None) -> dict[str, Any]: + """Fetch a single workflow by ID via the Skyvern API. + + The Fern-generated client has get_workflows() (list) but no get_workflow(id). + This helper isolates the private client access so the workaround is contained + in one place. Replace with ``skyvern.get_workflow(id)`` when the SDK adds it. + + Raises NotFoundError on 404, or RuntimeError on other HTTP errors, so callers + can use the same ``except NotFoundError`` pattern as all other workflow tools. + """ + skyvern = get_skyvern() + params: dict[str, Any] = {} + if version is not None: + params["version"] = version + # TODO(SKY-7807): Replace with skyvern.get_workflow() when the Fern client adds it. + response = await skyvern._client_wrapper.httpx_client.request( + f"api/v1/workflows/{workflow_id}", + method="GET", + params=params, + ) + if response.status_code == 404: + raise NotFoundError(body={"detail": f"Workflow {workflow_id!r} not found"}) + if response.status_code >= 400: + detail = "" + try: + detail = response.json().get("detail", response.text) + except Exception: + detail = response.text + raise RuntimeError(f"HTTP {response.status_code}: {detail}") + return response.json() + + +def _validate_definition_structure(json_def: WorkflowCreateYamlRequest | None, action: str) -> dict[str, Any] | None: + """Validate required fields in a JSON workflow definition. + + Returns a make_result error dict if validation fails, or None if valid. + Only validates JSON definitions — YAML is validated server-side. + Note: WorkflowCreateYamlRequest already enforces ``title`` and + ``workflow_definition`` as required fields via Pydantic, so this is + a belt-and-suspenders check that produces user-friendly error messages. + """ + if json_def is None: + return None + if not json_def.title: + return make_result( + action, + ok=False, + error=make_error( + ErrorCode.INVALID_INPUT, + "Workflow definition missing 'title' field", + "Add a 'title' field to your workflow definition", + ), + ) + return None + + +def _parse_definition( + definition: str, fmt: str +) -> tuple[WorkflowCreateYamlRequest | None, str | None, dict[str, Any] | None]: + """Parse a workflow definition string. + + Returns (json_definition, yaml_definition, error). + Exactly one of the first two will be set on success, or error on failure. + JSON input is parsed into a WorkflowCreateYamlRequest (the type the SDK expects). + """ + if fmt == "json": + try: + raw = json.loads(definition) + return WorkflowCreateYamlRequest(**raw), None, None + except (json.JSONDecodeError, TypeError) as e: + return ( + None, + None, + make_error( + ErrorCode.INVALID_INPUT, + f"Invalid JSON definition: {e}", + "Provide a valid JSON object for the workflow definition", + ), + ) + except Exception as e: + return ( + None, + None, + make_error( + ErrorCode.INVALID_INPUT, + f"Invalid workflow definition: {e}", + "Check the workflow definition fields (title, workflow_definition with blocks)", + ), + ) + elif fmt == "yaml": + return None, definition, None + else: + # auto: try JSON first, fall back to YAML + try: + raw = json.loads(definition) + return WorkflowCreateYamlRequest(**raw), None, None + except (json.JSONDecodeError, TypeError): + return None, definition, None + except Exception: + # JSON parsed but failed model validation — treat as YAML + return None, definition, None + + +# --------------------------------------------------------------------------- +# SKY-7807: Workflow CRUD +# --------------------------------------------------------------------------- + + +async def skyvern_workflow_list( + search: Annotated[str | None, "Search across workflow titles, folder names, and parameter metadata"] = None, + page: Annotated[int, Field(description="Page number (1-based)", ge=1)] = 1, + page_size: Annotated[int, Field(description="Results per page", ge=1, le=100)] = 10, + only_workflows: Annotated[bool, "Only return multi-step workflows (exclude saved tasks)"] = False, +) -> dict[str, Any]: + """Find and browse available Skyvern workflows. Use when you need to discover what workflows exist, + search for a workflow by name, or list all workflows for an organization.""" + skyvern = get_skyvern() + + with Timer() as timer: + try: + workflows = await skyvern.get_workflows( + search_key=search, + page=page, + page_size=page_size, + only_workflows=only_workflows, + ) + timer.mark("sdk") + except Exception as e: + return make_result( + "skyvern_workflow_list", + ok=False, + timing_ms=timer.timing_ms, + error=make_error(ErrorCode.API_ERROR, str(e), "Check your API key and Skyvern connection"), + ) + + return make_result( + "skyvern_workflow_list", + data={ + "workflows": [_serialize_workflow(wf) for wf in workflows], + "page": page, + "page_size": page_size, + "count": len(workflows), + "has_more": len(workflows) == page_size, + "sdk_equivalent": f"await skyvern.get_workflows(search_key={search!r}, page={page}, page_size={page_size})", + }, + timing_ms=timer.timing_ms, + ) + + +async def skyvern_workflow_get( + workflow_id: Annotated[str, "Workflow permanent ID (starts with wpid_)"], + version: Annotated[int | None, "Specific version to retrieve (latest if omitted)"] = None, +) -> dict[str, Any]: + """Get the full definition of a specific workflow. Use when you need to inspect a workflow's + blocks, parameters, and configuration before running or updating it.""" + if err := _validate_workflow_id(workflow_id, "skyvern_workflow_get"): + return err + + with Timer() as timer: + try: + wf_data = await _get_workflow_by_id(workflow_id, version) + timer.mark("sdk") + except NotFoundError: + return make_result( + "skyvern_workflow_get", + ok=False, + timing_ms=timer.timing_ms, + error=make_error( + ErrorCode.WORKFLOW_NOT_FOUND, + f"Workflow {workflow_id!r} not found", + "Verify the workflow ID with skyvern_workflow_list", + ), + ) + except Exception as e: + return make_result( + "skyvern_workflow_get", + ok=False, + timing_ms=timer.timing_ms, + error=make_error(ErrorCode.API_ERROR, str(e), "Check your API key and workflow ID"), + ) + + version_str = f", version={version}" if version is not None else "" + return make_result( + "skyvern_workflow_get", + data={ + **wf_data, + "sdk_equivalent": f"# No SDK method yet — GET /api/v1/workflows/{workflow_id}{version_str}", + }, + timing_ms=timer.timing_ms, + ) + + +async def skyvern_workflow_create( + definition: Annotated[str, "Workflow definition as a YAML or JSON string"], + format: Annotated[ # noqa: A002 + str, Field(description="Definition format: 'json', 'yaml', or 'auto' (tries JSON first, falls back to YAML)") + ] = "auto", + folder_id: Annotated[str | None, "Folder ID (fld_...) to organize the workflow in"] = None, +) -> dict[str, Any]: + """Create a new Skyvern workflow from a YAML or JSON definition. Use when you need to save + a new automation workflow that can be run repeatedly with different parameters.""" + if format not in ("json", "yaml", "auto"): + return make_result( + "skyvern_workflow_create", + ok=False, + error=make_error( + ErrorCode.INVALID_INPUT, + f"Invalid format: {format!r}", + "Use 'json', 'yaml', or 'auto'", + ), + ) + + json_def, yaml_def, parse_err = _parse_definition(definition, format) + if parse_err is not None: + return make_result("skyvern_workflow_create", ok=False, error=parse_err) + + if err := _validate_definition_structure(json_def, "skyvern_workflow_create"): + return err + + skyvern = get_skyvern() + + with Timer() as timer: + try: + workflow = await skyvern.create_workflow( + json_definition=json_def, + yaml_definition=yaml_def, + folder_id=folder_id, + ) + timer.mark("sdk") + except Exception as e: + LOG.error("workflow_create_failed", error=str(e)) + return make_result( + "skyvern_workflow_create", + ok=False, + timing_ms=timer.timing_ms, + error=make_error( + ErrorCode.API_ERROR, + str(e), + "Check the workflow definition syntax and required fields (title, workflow_definition.blocks)", + ), + ) + + LOG.info("workflow_created", workflow_id=workflow.workflow_permanent_id) + data = _serialize_workflow(workflow) + fmt_label = "json_definition" if json_def is not None else "yaml_definition" + folder_str = f", folder_id={folder_id!r}" if folder_id is not None else "" + data["sdk_equivalent"] = f"await skyvern.create_workflow({fmt_label}={folder_str})" + return make_result("skyvern_workflow_create", data=data, timing_ms=timer.timing_ms) + + +async def skyvern_workflow_update( + workflow_id: Annotated[str, "Workflow permanent ID (wpid_...) to update"], + definition: Annotated[str, "Updated workflow definition as a YAML or JSON string"], + format: Annotated[ # noqa: A002 + str, Field(description="Definition format: 'json', 'yaml', or 'auto'") + ] = "auto", +) -> dict[str, Any]: + """Update an existing workflow's definition. Use when you need to modify a workflow's blocks, + parameters, or configuration. Creates a new version of the workflow.""" + if err := _validate_workflow_id(workflow_id, "skyvern_workflow_update"): + return err + + if format not in ("json", "yaml", "auto"): + return make_result( + "skyvern_workflow_update", + ok=False, + error=make_error( + ErrorCode.INVALID_INPUT, + f"Invalid format: {format!r}", + "Use 'json', 'yaml', or 'auto'", + ), + ) + + json_def, yaml_def, parse_err = _parse_definition(definition, format) + if parse_err is not None: + return make_result("skyvern_workflow_update", ok=False, error=parse_err) + + if err := _validate_definition_structure(json_def, "skyvern_workflow_update"): + return err + + skyvern = get_skyvern() + + with Timer() as timer: + try: + workflow = await skyvern.update_workflow( + workflow_id, + json_definition=json_def, + yaml_definition=yaml_def, + ) + timer.mark("sdk") + except Exception as e: + return make_result( + "skyvern_workflow_update", + ok=False, + timing_ms=timer.timing_ms, + error=make_error( + ErrorCode.API_ERROR, + str(e), + "Check the workflow ID and definition syntax", + ), + ) + + data = _serialize_workflow(workflow) + fmt_label = "json_definition" if json_def is not None else "yaml_definition" + data["sdk_equivalent"] = f"await skyvern.update_workflow({workflow_id!r}, {fmt_label}=)" + return make_result("skyvern_workflow_update", data=data, timing_ms=timer.timing_ms) + + +async def skyvern_workflow_delete( + workflow_id: Annotated[str, "Workflow permanent ID (wpid_...) to delete"], + force: Annotated[bool, "Must be true to confirm deletion — prevents accidental deletes"] = False, +) -> dict[str, Any]: + """Delete a workflow permanently. Use when you need to remove a workflow that is no longer needed. + Requires force=true to prevent accidental deletion.""" + if err := _validate_workflow_id(workflow_id, "skyvern_workflow_delete"): + return err + + if not force: + return make_result( + "skyvern_workflow_delete", + ok=False, + error=make_error( + ErrorCode.INVALID_INPUT, + f"Deletion of workflow {workflow_id!r} requires confirmation", + "Set force=true to confirm deletion. This action is irreversible.", + ), + ) + + skyvern = get_skyvern() + + with Timer() as timer: + try: + await skyvern.delete_workflow(workflow_id) + timer.mark("sdk") + except NotFoundError: + return make_result( + "skyvern_workflow_delete", + ok=False, + timing_ms=timer.timing_ms, + error=make_error( + ErrorCode.WORKFLOW_NOT_FOUND, + f"Workflow {workflow_id!r} not found", + "Verify the workflow ID with skyvern_workflow_list", + ), + ) + except Exception as e: + LOG.error("workflow_delete_failed", workflow_id=workflow_id, error=str(e)) + return make_result( + "skyvern_workflow_delete", + ok=False, + timing_ms=timer.timing_ms, + error=make_error(ErrorCode.API_ERROR, str(e), "Check the workflow ID and your permissions"), + ) + + LOG.info("workflow_deleted", workflow_id=workflow_id) + return make_result( + "skyvern_workflow_delete", + data={ + "workflow_permanent_id": workflow_id, + "deleted": True, + "sdk_equivalent": f"await skyvern.delete_workflow({workflow_id!r})", + }, + timing_ms=timer.timing_ms, + ) + + +# --------------------------------------------------------------------------- +# SKY-7808: Workflow Execution +# --------------------------------------------------------------------------- + + +async def skyvern_workflow_run( + workflow_id: Annotated[str, "Workflow permanent ID (wpid_...) to run"], + parameters: Annotated[str | None, Field(description="JSON string of workflow parameters")] = None, + browser_session_id: Annotated[ + str | None, Field(description="Reuse an existing browser session (pbs_...) to preserve login state") + ] = None, + webhook_url: Annotated[str | None, Field(description="URL for status webhook callbacks after completion")] = None, + proxy_location: Annotated[ + str | None, Field(description="Geographic proxy: RESIDENTIAL, RESIDENTIAL_GB, NONE, etc.") + ] = None, + wait: Annotated[bool, "Wait for the workflow to complete before returning (default: return immediately)"] = False, + timeout_seconds: Annotated[ + int, Field(description="Max wait time in seconds when wait=true (default 300)", ge=10, le=3600) + ] = 300, +) -> dict[str, Any]: + """Run a Skyvern workflow with parameters. Use when you need to execute an automation workflow. + Returns immediately by default (async) — set wait=true to block until completion. + Default timeout is 300s (5 minutes). For longer workflows, increase timeout_seconds + or use wait=false and poll with skyvern_workflow_status.""" + if err := _validate_workflow_id(workflow_id, "skyvern_workflow_run"): + return err + + parsed_params: dict[str, Any] | None = None + if parameters is not None: + try: + parsed_params = json.loads(parameters) + except (json.JSONDecodeError, TypeError) as e: + return make_result( + "skyvern_workflow_run", + ok=False, + error=make_error( + ErrorCode.INVALID_INPUT, + f"Invalid parameters JSON: {e}", + "Provide parameters as a valid JSON object string", + ), + ) + + proxy: ProxyLocation | None = None + if proxy_location is not None: + try: + proxy = ProxyLocation(proxy_location) + except ValueError: + return make_result( + "skyvern_workflow_run", + ok=False, + error=make_error( + ErrorCode.INVALID_INPUT, + f"Invalid proxy_location: {proxy_location!r}", + "Use RESIDENTIAL, RESIDENTIAL_GB, NONE, etc.", + ), + ) + + skyvern = get_skyvern() + + with Timer() as timer: + try: + run = await skyvern.run_workflow( + workflow_id=workflow_id, + parameters=parsed_params, + browser_session_id=browser_session_id, + webhook_url=webhook_url, + proxy_location=proxy, + wait_for_completion=wait, + timeout=timeout_seconds, + ) + timer.mark("sdk") + except asyncio.TimeoutError: + return make_result( + "skyvern_workflow_run", + ok=False, + timing_ms=timer.timing_ms, + error=make_error( + ErrorCode.TIMEOUT, + f"Workflow did not complete within {timeout_seconds}s", + "Increase timeout_seconds or set wait=false and poll with skyvern_workflow_status", + ), + ) + except NotFoundError: + return make_result( + "skyvern_workflow_run", + ok=False, + timing_ms=timer.timing_ms, + error=make_error( + ErrorCode.WORKFLOW_NOT_FOUND, + f"Workflow {workflow_id!r} not found", + "Verify the workflow ID with skyvern_workflow_list", + ), + ) + except Exception as e: + LOG.error("workflow_run_failed", workflow_id=workflow_id, error=str(e)) + return make_result( + "skyvern_workflow_run", + ok=False, + timing_ms=timer.timing_ms, + error=make_error(ErrorCode.API_ERROR, str(e), "Check the workflow ID, parameters, and API key"), + ) + + LOG.info("workflow_run_started", workflow_id=workflow_id, run_id=run.run_id, wait=wait) + data = _serialize_run(run) + params_str = f", parameters={parsed_params}" if parsed_params else "" + wait_str = f", wait_for_completion=True, timeout={timeout_seconds}" if wait else "" + data["sdk_equivalent"] = f"await skyvern.run_workflow(workflow_id={workflow_id!r}{params_str}{wait_str})" + return make_result("skyvern_workflow_run", data=data, timing_ms=timer.timing_ms) + + +async def skyvern_workflow_status( + run_id: Annotated[str, "Run ID to check (wr_... for workflow runs, tsk_v2_... for task runs)"], +) -> dict[str, Any]: + """Check the status and progress of a workflow or task run. Use when you need to monitor + a running workflow, check if it completed, or retrieve its output.""" + if err := _validate_run_id(run_id, "skyvern_workflow_status"): + return err + + skyvern = get_skyvern() + + with Timer() as timer: + try: + run = await skyvern.get_run(run_id) + timer.mark("sdk") + except NotFoundError: + return make_result( + "skyvern_workflow_status", + ok=False, + timing_ms=timer.timing_ms, + error=make_error( + ErrorCode.RUN_NOT_FOUND, + f"Run {run_id!r} not found", + "Verify the run ID — it should start with wr_ or tsk_v2_", + ), + ) + except Exception as e: + return make_result( + "skyvern_workflow_status", + ok=False, + timing_ms=timer.timing_ms, + error=make_error(ErrorCode.API_ERROR, str(e), "Check the run ID and your API key"), + ) + + data = _serialize_run(run) + data["sdk_equivalent"] = f"await skyvern.get_run({run_id!r})" + return make_result("skyvern_workflow_status", data=data, timing_ms=timer.timing_ms) + + +async def skyvern_workflow_cancel( + run_id: Annotated[str, "Run ID to cancel (wr_... for workflow runs, tsk_v2_... for task runs)"], +) -> dict[str, Any]: + """Cancel a running workflow or task. Use when you need to stop a workflow that is taking + too long, is stuck, or is no longer needed.""" + if err := _validate_run_id(run_id, "skyvern_workflow_cancel"): + return err + + skyvern = get_skyvern() + + with Timer() as timer: + try: + await skyvern.cancel_run(run_id) + timer.mark("sdk") + except NotFoundError: + return make_result( + "skyvern_workflow_cancel", + ok=False, + timing_ms=timer.timing_ms, + error=make_error( + ErrorCode.RUN_NOT_FOUND, + f"Run {run_id!r} not found", + "Verify the run ID — it should start with wr_ or tsk_v2_", + ), + ) + except Exception as e: + LOG.error("workflow_cancel_failed", run_id=run_id, error=str(e)) + return make_result( + "skyvern_workflow_cancel", + ok=False, + timing_ms=timer.timing_ms, + error=make_error(ErrorCode.API_ERROR, str(e), "Check the run ID and your API key"), + ) + + LOG.info("workflow_cancelled", run_id=run_id) + return make_result( + "skyvern_workflow_cancel", + data={ + "run_id": run_id, + "cancelled": True, + "sdk_equivalent": f"await skyvern.cancel_run({run_id!r})", + }, + timing_ms=timer.timing_ms, + ) diff --git a/skyvern/forge/sdk/api/llm/api_handler_factory.py b/skyvern/forge/sdk/api/llm/api_handler_factory.py index 64c4b59b..b2ae5544 100644 --- a/skyvern/forge/sdk/api/llm/api_handler_factory.py +++ b/skyvern/forge/sdk/api/llm/api_handler_factory.py @@ -56,55 +56,6 @@ EXTRACT_ACTION_DEFAULT_THINKING_BUDGET = settings.EXTRACT_ACTION_THINKING_BUDGET DEFAULT_THINKING_BUDGET = settings.DEFAULT_THINKING_BUDGET -def _is_stale_client_error(exc: BaseException) -> bool: - """Check if an exception chain contains the httpx 'client has been closed' RuntimeError. - - litellm caches AsyncOpenAI/AsyncAzureOpenAI clients keyed by event loop ID. - In long-lived Temporal worker pods the event loop can change (task completion, - activity recycling), leaving stale clients whose underlying httpx.AsyncClient - is closed. Subsequent requests through those clients raise: - RuntimeError: Cannot send a request, as the client has been closed - which litellm wraps as APIConnectionError or InternalServerError. - """ - cur: BaseException | None = exc - while cur is not None: - if isinstance(cur, RuntimeError) and "client has been closed" in str(cur): - return True - next_exc = cur.__cause__ or cur.__context__ - cur = next_exc if next_exc is not cur else None - return False - - -async def _acompletion_with_stale_client_retry( - acompletion_callable: Any, - **kwargs: Any, -) -> Any: - """Call *acompletion_callable* and retry once if the failure is a stale httpx client. - - On first failure the entire ``litellm.in_memory_llm_clients_cache`` is flushed - so that the retry creates a fresh client bound to the current event loop. - """ - try: - return await acompletion_callable(**kwargs) - except Exception as first_err: - if not _is_stale_client_error(first_err): - raise - - model = kwargs.get("model", "unknown") - LOG.warning( - "Stale httpx client detected – flushing litellm client cache and retrying", - error_type=type(first_err).__name__, - model=model, - ) - try: - litellm.in_memory_llm_clients_cache.flush_cache() - except Exception: - LOG.warning("Failed to flush litellm client cache", exc_info=True) - - # Retry once with a fresh client - return await acompletion_callable(**kwargs) - - def _safe_model_dump_json(response: ModelResponse, indent: int = 2) -> str: """ Call model_dump_json() while suppressing Pydantic serialization warnings. @@ -658,8 +609,7 @@ class LLMAPIHandlerFactory: cache_variant=cache_variant_name, ) request_payload_json = await _log_llm_request_artifact(request_model, True) - response = await _acompletion_with_stale_client_retry( - litellm.acompletion, + response = await litellm.acompletion( model=request_model, messages=active_messages, timeout=settings.LLM_CONFIG_TIMEOUT, @@ -670,8 +620,7 @@ class LLMAPIHandlerFactory: async def _call_router_without_cache() -> tuple[ModelResponse, str]: request_payload_json = await _log_llm_request_artifact(llm_key, False) - response = await _acompletion_with_stale_client_retry( - router.acompletion, + response = await router.acompletion( model=main_model_group, messages=messages, timeout=settings.LLM_CONFIG_TIMEOUT, @@ -1092,8 +1041,7 @@ class LLMAPIHandlerFactory: try: # TODO (kerem): add a retry mechanism to this call (acompletion_with_retries) # TODO (kerem): use litellm fallbacks? https://litellm.vercel.app/docs/tutorials/fallbacks#how-does-completion_with_fallbacks-work - response = await _acompletion_with_stale_client_retry( - litellm.acompletion, + response = await litellm.acompletion( model=model_name, messages=active_messages, drop_params=True, # Drop unsupported parameters gracefully @@ -1679,8 +1627,7 @@ class LLMCaller: if self.llm_key and "UI_TARS" in self.llm_key: return await self._call_ui_tars(messages, tools, timeout, **active_parameters) - return await _acompletion_with_stale_client_retry( - litellm.acompletion, + return await litellm.acompletion( model=self.llm_config.model_name, messages=messages, tools=tools,