SKY-7807/7808: Workflow MCP tools — CRUD + execution (#4675)

This commit is contained in:
Marc Kelechava
2026-02-09 18:46:45 -08:00
committed by GitHub
parent ed9e37ee48
commit 8b84cae5a4
4 changed files with 812 additions and 77 deletions

View File

@@ -15,6 +15,9 @@ class ErrorCode:
SDK_ERROR = "SDK_ERROR"
TIMEOUT = "TIMEOUT"
INVALID_INPUT = "INVALID_INPUT"
WORKFLOW_NOT_FOUND = "WORKFLOW_NOT_FOUND"
RUN_NOT_FOUND = "RUN_NOT_FOUND"
API_ERROR = "API_ERROR"
@dataclass

View File

@@ -1,7 +1,8 @@
"""Skyvern MCP Tools.
This module provides MCP (Model Context Protocol) tools for browser automation.
Tools are registered with FastMCP and can be used by AI assistants like Claude.
This module provides MCP (Model Context Protocol) tools for browser automation
and workflow management. Tools are registered with FastMCP and can be used by
AI assistants like Claude.
"""
from fastmcp import FastMCP
@@ -28,6 +29,16 @@ from .session import (
skyvern_session_get,
skyvern_session_list,
)
from .workflow import (
skyvern_workflow_cancel,
skyvern_workflow_create,
skyvern_workflow_delete,
skyvern_workflow_get,
skyvern_workflow_list,
skyvern_workflow_run,
skyvern_workflow_status,
skyvern_workflow_update,
)
mcp = FastMCP(
"Skyvern",
@@ -41,6 +52,9 @@ Reach for Skyvern tools when the user asks you to:
- Check the current state of a web page or verify something on a site
- Do anything you would otherwise attempt with requests, beautifulsoup, selenium, or playwright
- Access website data where you are unsure whether an API endpoint exists
- Create, run, monitor, or manage web automations (Skyvern workflows)
- Set up reusable, parameterized automations that run on Skyvern's cloud
- Check the status of running automations or retrieve their results
DO NOT try to scrape websites by guessing API endpoints or writing HTTP requests.
Instead, use skyvern_navigate + skyvern_extract to get real data from actual pages.
@@ -56,8 +70,58 @@ These tools give you a real browser — use them instead of writing scraping cod
| "Fill out this form" | skyvern_act |
| "Log in and buy the first item" | skyvern_run_task |
| "Is checkout complete?" | skyvern_validate |
| "List my workflows" | skyvern_workflow_list |
| "Create a workflow that monitors prices" | skyvern_workflow_create |
| "Run the login workflow" | skyvern_workflow_run |
| "Is my workflow done?" | skyvern_workflow_status |
| "Set up a reusable automation for this" | Explore with browser tools, then skyvern_workflow_create |
| "Write a script to do this" | Skyvern SDK (see below) |
## Getting Started
**Visiting a website** (extracting data, filling forms, interacting with a page):
1. Create a session with skyvern_session_create
2. Navigate and interact with browser tools
3. Close with skyvern_session_close when done
**Managing automations** (running, listing, or monitoring workflows):
No browser session needed — use workflow tools directly:
skyvern_workflow_list, skyvern_workflow_run, skyvern_workflow_status, etc.
**Building a reusable automation** (explore a site, then save as a workflow):
1. **Explore** — Create a browser session, navigate the site, use skyvern_extract and skyvern_screenshot to understand the page structure
2. **Create** — Build a workflow definition and save it with skyvern_workflow_create
3. **Test** — Run the workflow with skyvern_workflow_run and check results with skyvern_workflow_status
## Workflows vs Scripts
When the user wants something **persistent, versioned, and managed in Skyvern's dashboard** — create a workflow.
Trigger words: "automation", "workflow", "reusable", "schedule", "monitor", "set up"
→ Use skyvern_workflow_create with a JSON definition (see example below)
When the user wants **custom Python code** to run in their own environment — write an SDK script.
Trigger words: "script", "code", "function", "program"
→ Use `from skyvern import Skyvern` (see Writing Scripts section)
### Workflow definition example (JSON, for skyvern_workflow_create):
{
"title": "Price Monitor",
"workflow_definition": {
"parameters": [
{"parameter_type": "workflow", "key": "url", "workflow_parameter_type": "string"}
],
"blocks": [
{"block_type": "task", "label": "extract_prices", "url": "{{url}}", "engine": "skyvern-2.0",
"navigation_goal": "Extract all product names and prices from the page",
"data_extraction_goal": "Get product names and prices as a list",
"data_schema": {"type": "object", "properties": {"products": {"type": "array",
"items": {"type": "object", "properties": {"name": {"type": "string"}, "price": {"type": "string"}}}}}}}
]
}
}
Use `{{parameter_key}}` to reference workflow parameters in block fields.
To inspect a real workflow for reference, use skyvern_workflow_get on an existing workflow.
## Writing Scripts and Code
When asked to write an automation script, use the Skyvern Python SDK with the **hybrid xpath+prompt
pattern** for production-quality scripts. The hybrid form tries the xpath/selector first (fast,
@@ -86,12 +150,6 @@ The `resolved_selector` field in responses gives you the xpath the AI resolved t
IMPORTANT: NEVER import from skyvern.cli.mcp_tools — those are internal server modules.
The public SDK is: from skyvern import Skyvern
## Recommended Workflow
1. **Connect** — Create or connect to a browser session
2. **Explore** — Navigate pages, take screenshots, extract data with AI
3. **Build** — Capture selectors and data schemas to construct deterministic workflows
4. **Test** — Validate workflows via skyvern_run_task
## Primary Tools (use these first)
These are the tools you should reach for by default:
@@ -126,28 +184,32 @@ Precision tools support three modes. When unsure, use `intent`.
3. **Selector mode** — deterministic CSS/XPath targeting:
`skyvern_click(selector="#submit-btn")`
## Replay Story: From Exploration to Production Scripts
## Replay Story: From Exploration to Production
When you use precision tools (skyvern_click, skyvern_type, etc.) with intent mode, the response
includes `resolved_selector` — the xpath/CSS the AI found. Capture these to build hybrid scripts.
includes `resolved_selector` — the xpath/CSS the AI found. Capture these for hybrid scripts or
workflow definitions.
**The hybrid pattern** is the recommended default for SDK scripts:
await page.click("xpath=//button[@id='submit']", prompt="the Submit button")
It tries the selector first (fast, no AI cost), then falls back to AI if the selector breaks.
**Workflow for generating scripts:**
1. Explore: Use skyvern_click(intent="Submit button") during interactive exploration
2. Capture: Note the `resolved_selector` from the response (e.g., "//button[@id='submit']")
3. Script: Write `page.click("xpath=//button[@id='submit']", prompt="Submit button")`
The `sdk_equivalent` field in each tool response shows the correct hybrid call to use in scripts.
Always prefer hybrid xpath+prompt over prompt-only in generated scripts.
Note: Currently only skyvern_click returns resolved_selector. Support for skyvern_type and
skyvern_select_option is planned (SKY-7905). For those tools, use the selector you provided
as input, or fall back to prompt-only until SKY-7905 ships.
skyvern_select_option is planned (SKY-7905).
## Getting Started
Create a session with skyvern_session_create, then use browser tools to interact with pages.
## Workflow Management
Use these tools to create, manage, and run Skyvern workflows programmatically.
Workflows are persistent, versioned, multi-step automations that can be parameterized and scheduled.
- **skyvern_workflow_list** — Find workflows by name or browse all available workflows
- **skyvern_workflow_get** — Get the full definition of a workflow to inspect its blocks and parameters
- **skyvern_workflow_create** — Create a new workflow from a YAML or JSON definition
- **skyvern_workflow_update** — Update an existing workflow's definition (creates a new version)
- **skyvern_workflow_delete** — Delete a workflow (requires force=true confirmation)
- **skyvern_workflow_run** — Execute a workflow with parameters (returns immediately by default, or wait for completion)
- **skyvern_workflow_status** — Check the status and progress of a running or completed workflow run
- **skyvern_workflow_cancel** — Cancel a running workflow
""",
)
@@ -175,6 +237,16 @@ mcp.tool()(skyvern_select_option)
mcp.tool()(skyvern_press_key)
mcp.tool()(skyvern_wait)
# -- Workflow management (CRUD + execution, no browser needed) --
mcp.tool()(skyvern_workflow_list)
mcp.tool()(skyvern_workflow_get)
mcp.tool()(skyvern_workflow_create)
mcp.tool()(skyvern_workflow_update)
mcp.tool()(skyvern_workflow_delete)
mcp.tool()(skyvern_workflow_run)
mcp.tool()(skyvern_workflow_status)
mcp.tool()(skyvern_workflow_cancel)
__all__ = [
"mcp",
# Session
@@ -198,4 +270,13 @@ __all__ = [
"skyvern_select_option",
"skyvern_press_key",
"skyvern_wait",
# Workflow management
"skyvern_workflow_list",
"skyvern_workflow_get",
"skyvern_workflow_create",
"skyvern_workflow_update",
"skyvern_workflow_delete",
"skyvern_workflow_run",
"skyvern_workflow_status",
"skyvern_workflow_cancel",
]

View File

@@ -0,0 +1,704 @@
"""Skyvern MCP workflow tools — CRUD and execution for Skyvern workflows.
Tools for listing, creating, updating, deleting, running, and monitoring
Skyvern workflows via the Skyvern HTTP API. These tools do not require a
browser session.
"""
from __future__ import annotations
import asyncio
import json
from typing import Annotated, Any
import structlog
from pydantic import Field
from skyvern.client.errors import NotFoundError
from skyvern.client.types import WorkflowCreateYamlRequest
from skyvern.schemas.runs import ProxyLocation
from ._common import ErrorCode, Timer, make_error, make_result
from ._session import get_skyvern
LOG = structlog.get_logger()
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _serialize_workflow(wf: Any) -> dict[str, Any]:
"""Pick the fields we expose from a Workflow Pydantic model.
Uses Any to avoid tight coupling with Fern-generated client types.
"""
return {
"workflow_permanent_id": wf.workflow_permanent_id,
"workflow_id": wf.workflow_id,
"title": wf.title,
"version": wf.version,
"status": str(wf.status) if wf.status else None,
"description": wf.description,
"is_saved_task": wf.is_saved_task,
"folder_id": wf.folder_id,
"created_at": wf.created_at.isoformat() if wf.created_at else None,
"modified_at": wf.modified_at.isoformat() if wf.modified_at else None,
}
def _serialize_workflow_full(wf: Any) -> dict[str, Any]:
    """Serialize a workflow including its full block/parameter definition.

    Same fields as :func:`_serialize_workflow`, plus ``workflow_definition``
    when present. Falls back to ``str()`` if the definition model cannot be
    dumped to JSON-compatible primitives.
    """
    data = _serialize_workflow(wf)
    if hasattr(wf, "workflow_definition"):
        definition = wf.workflow_definition
        if definition is not None:
            try:
                data["workflow_definition"] = definition.model_dump(mode="json")
            except Exception:
                data["workflow_definition"] = str(definition)
    return data
def _serialize_run(run: Any) -> dict[str, Any]:
"""Pick fields from a run response (GetRunResponse variant or WorkflowRunResponse).
Uses Any to avoid tight coupling with Fern-generated client types.
"""
data: dict[str, Any] = {
"run_id": run.run_id,
"status": str(run.status) if run.status else None,
}
for field in (
"run_type",
"step_count",
"failure_reason",
"recording_url",
"app_url",
"browser_session_id",
"run_with",
):
val = getattr(run, field, None)
if val is not None:
data[field] = str(val) if not isinstance(val, (str, int, bool)) else val
if hasattr(run, "output") and run.output is not None:
try:
data["output"] = run.output.model_dump(mode="json") if hasattr(run.output, "model_dump") else run.output
except Exception:
data["output"] = str(run.output)
for ts_field in ("created_at", "modified_at", "started_at", "finished_at", "queued_at"):
val = getattr(run, ts_field, None)
if val is not None:
data[ts_field] = val.isoformat()
return data
def _validate_workflow_id(workflow_id: str, action: str) -> dict[str, Any] | None:
    """Return a make_result error payload if *workflow_id* is malformed, else None.

    Rejects path separators (the ID is interpolated into a URL path) and IDs
    that lack the ``wpid_`` prefix used by workflow permanent IDs.
    """
    if "/" in workflow_id or "\\" in workflow_id:
        err = make_error(
            ErrorCode.INVALID_INPUT,
            "workflow_id must not contain path separators",
            "Provide a valid workflow permanent ID (starts with wpid_)",
        )
        return make_result(action, ok=False, error=err)
    if workflow_id.startswith("wpid_"):
        return None
    err = make_error(
        ErrorCode.INVALID_INPUT,
        f"Invalid workflow_id format: {workflow_id!r}",
        "Workflow IDs start with wpid_. Use skyvern_workflow_list to find valid IDs.",
    )
    return make_result(action, ok=False, error=err)
def _validate_run_id(run_id: str, action: str) -> dict[str, Any] | None:
    """Return a make_result error payload if *run_id* is malformed, else None.

    Rejects path separators (the ID is used in URL paths) and IDs without the
    ``wr_`` (workflow run) or ``tsk_v2_`` (task run) prefix.
    """
    if "/" in run_id or "\\" in run_id:
        err = make_error(
            ErrorCode.INVALID_INPUT,
            "run_id must not contain path separators",
            "Provide a valid run ID (starts with wr_ or tsk_v2_)",
        )
        return make_result(action, ok=False, error=err)
    if run_id.startswith(("wr_", "tsk_v2_")):
        return None
    err = make_error(
        ErrorCode.INVALID_INPUT,
        f"Invalid run_id format: {run_id!r}",
        "Run IDs start with wr_ (workflow runs) or tsk_v2_ (task runs). Check skyvern_workflow_run output.",
    )
    return make_result(action, ok=False, error=err)
async def _get_workflow_by_id(workflow_id: str, version: int | None = None) -> dict[str, Any]:
    """Fetch a single workflow by ID via the Skyvern API.

    The Fern-generated client has get_workflows() (list) but no get_workflow(id).
    This helper isolates the private client access so the workaround is contained
    in one place. Replace with ``skyvern.get_workflow(id)`` when the SDK adds it.

    Raises NotFoundError on 404, or RuntimeError on other HTTP errors, so callers
    can use the same ``except NotFoundError`` pattern as all other workflow tools.
    """
    skyvern = get_skyvern()
    query: dict[str, Any] = {} if version is None else {"version": version}
    # TODO(SKY-7807): Replace with skyvern.get_workflow() when the Fern client adds it.
    response = await skyvern._client_wrapper.httpx_client.request(
        f"api/v1/workflows/{workflow_id}",
        method="GET",
        params=query,
    )
    status = response.status_code
    if status == 404:
        raise NotFoundError(body={"detail": f"Workflow {workflow_id!r} not found"})
    if status >= 400:
        # Prefer the structured "detail" field; fall back to the raw body when
        # the response is not JSON (or not an object).
        try:
            detail = response.json().get("detail", response.text)
        except Exception:
            detail = response.text
        raise RuntimeError(f"HTTP {status}: {detail}")
    return response.json()
def _validate_definition_structure(json_def: WorkflowCreateYamlRequest | None, action: str) -> dict[str, Any] | None:
    """Validate required fields in a JSON workflow definition.

    Returns a make_result error dict if validation fails, or None if valid.
    Only validates JSON definitions — YAML is validated server-side.

    Note: WorkflowCreateYamlRequest already enforces ``title`` and
    ``workflow_definition`` as required fields via Pydantic, so this is
    a belt-and-suspenders check that produces user-friendly error messages.
    """
    # YAML path (json_def is None) and any definition with a non-empty title pass.
    if json_def is None or json_def.title:
        return None
    return make_result(
        action,
        ok=False,
        error=make_error(
            ErrorCode.INVALID_INPUT,
            "Workflow definition missing 'title' field",
            "Add a 'title' field to your workflow definition",
        ),
    )
def _parse_definition(
    definition: str, fmt: str
) -> tuple[WorkflowCreateYamlRequest | None, str | None, dict[str, Any] | None]:
    """Parse a workflow definition string.

    Returns (json_definition, yaml_definition, error).
    Exactly one of the first two will be set on success, or error on failure.
    JSON input is parsed into a WorkflowCreateYamlRequest (the type the SDK expects).
    """
    if fmt == "yaml":
        # YAML is passed through verbatim; the server validates it.
        return None, definition, None
    if fmt == "json":
        try:
            return WorkflowCreateYamlRequest(**json.loads(definition)), None, None
        except (json.JSONDecodeError, TypeError) as e:
            return (
                None,
                None,
                make_error(
                    ErrorCode.INVALID_INPUT,
                    f"Invalid JSON definition: {e}",
                    "Provide a valid JSON object for the workflow definition",
                ),
            )
        except Exception as e:
            # JSON parsed but the Pydantic model rejected it.
            return (
                None,
                None,
                make_error(
                    ErrorCode.INVALID_INPUT,
                    f"Invalid workflow definition: {e}",
                    "Check the workflow definition fields (title, workflow_definition with blocks)",
                ),
            )
    # auto: try JSON first; on any failure (parse error OR model validation
    # error) treat the input as YAML and let the server validate it.
    try:
        return WorkflowCreateYamlRequest(**json.loads(definition)), None, None
    except Exception:
        return None, definition, None
# ---------------------------------------------------------------------------
# SKY-7807: Workflow CRUD
# ---------------------------------------------------------------------------
async def skyvern_workflow_list(
    search: Annotated[str | None, "Search across workflow titles, folder names, and parameter metadata"] = None,
    page: Annotated[int, Field(description="Page number (1-based)", ge=1)] = 1,
    page_size: Annotated[int, Field(description="Results per page", ge=1, le=100)] = 10,
    only_workflows: Annotated[bool, "Only return multi-step workflows (exclude saved tasks)"] = False,
) -> dict[str, Any]:
    """Find and browse available Skyvern workflows. Use when you need to discover what workflows exist,
    search for a workflow by name, or list all workflows for an organization."""
    skyvern = get_skyvern()
    with Timer() as timer:
        try:
            workflows = await skyvern.get_workflows(
                search_key=search,
                page=page,
                page_size=page_size,
                only_workflows=only_workflows,
            )
            timer.mark("sdk")
        except Exception as e:
            return make_result(
                "skyvern_workflow_list",
                ok=False,
                timing_ms=timer.timing_ms,
                error=make_error(ErrorCode.API_ERROR, str(e), "Check your API key and Skyvern connection"),
            )
    serialized = [_serialize_workflow(wf) for wf in workflows]
    payload = {
        "workflows": serialized,
        "page": page,
        "page_size": page_size,
        "count": len(workflows),
        # Heuristic: a full page suggests more results may exist. The list
        # endpoint does not return a total, so this can be a false positive
        # when the result count is an exact multiple of page_size.
        "has_more": len(workflows) == page_size,
        "sdk_equivalent": f"await skyvern.get_workflows(search_key={search!r}, page={page}, page_size={page_size})",
    }
    return make_result("skyvern_workflow_list", data=payload, timing_ms=timer.timing_ms)
async def skyvern_workflow_get(
    workflow_id: Annotated[str, "Workflow permanent ID (starts with wpid_)"],
    version: Annotated[int | None, "Specific version to retrieve (latest if omitted)"] = None,
) -> dict[str, Any]:
    """Get the full definition of a specific workflow. Use when you need to inspect a workflow's
    blocks, parameters, and configuration before running or updating it."""
    validation_error = _validate_workflow_id(workflow_id, "skyvern_workflow_get")
    if validation_error:
        return validation_error
    with Timer() as timer:
        try:
            wf_data = await _get_workflow_by_id(workflow_id, version)
            timer.mark("sdk")
        except NotFoundError:
            return make_result(
                "skyvern_workflow_get",
                ok=False,
                timing_ms=timer.timing_ms,
                error=make_error(
                    ErrorCode.WORKFLOW_NOT_FOUND,
                    f"Workflow {workflow_id!r} not found",
                    "Verify the workflow ID with skyvern_workflow_list",
                ),
            )
        except Exception as e:
            return make_result(
                "skyvern_workflow_get",
                ok=False,
                timing_ms=timer.timing_ms,
                error=make_error(ErrorCode.API_ERROR, str(e), "Check your API key and workflow ID"),
            )
    version_suffix = "" if version is None else f", version={version}"
    payload = dict(wf_data)
    payload["sdk_equivalent"] = f"# No SDK method yet — GET /api/v1/workflows/{workflow_id}{version_suffix}"
    return make_result("skyvern_workflow_get", data=payload, timing_ms=timer.timing_ms)
async def skyvern_workflow_create(
    definition: Annotated[str, "Workflow definition as a YAML or JSON string"],
    format: Annotated[  # noqa: A002
        str, Field(description="Definition format: 'json', 'yaml', or 'auto' (tries JSON first, falls back to YAML)")
    ] = "auto",
    folder_id: Annotated[str | None, "Folder ID (fld_...) to organize the workflow in"] = None,
) -> dict[str, Any]:
    """Create a new Skyvern workflow from a YAML or JSON definition. Use when you need to save
    a new automation workflow that can be run repeatedly with different parameters."""
    if format not in ("json", "yaml", "auto"):
        bad_format = make_error(
            ErrorCode.INVALID_INPUT,
            f"Invalid format: {format!r}",
            "Use 'json', 'yaml', or 'auto'",
        )
        return make_result("skyvern_workflow_create", ok=False, error=bad_format)
    json_def, yaml_def, parse_err = _parse_definition(definition, format)
    if parse_err is not None:
        return make_result("skyvern_workflow_create", ok=False, error=parse_err)
    structure_err = _validate_definition_structure(json_def, "skyvern_workflow_create")
    if structure_err:
        return structure_err
    skyvern = get_skyvern()
    with Timer() as timer:
        try:
            workflow = await skyvern.create_workflow(
                json_definition=json_def,
                yaml_definition=yaml_def,
                folder_id=folder_id,
            )
            timer.mark("sdk")
        except Exception as e:
            LOG.error("workflow_create_failed", error=str(e))
            return make_result(
                "skyvern_workflow_create",
                ok=False,
                timing_ms=timer.timing_ms,
                error=make_error(
                    ErrorCode.API_ERROR,
                    str(e),
                    "Check the workflow definition syntax and required fields (title, workflow_definition.blocks)",
                ),
            )
    LOG.info("workflow_created", workflow_id=workflow.workflow_permanent_id)
    data = _serialize_workflow(workflow)
    which = "json_definition" if json_def is not None else "yaml_definition"
    folder_suffix = "" if folder_id is None else f", folder_id={folder_id!r}"
    data["sdk_equivalent"] = f"await skyvern.create_workflow({which}=<definition>{folder_suffix})"
    return make_result("skyvern_workflow_create", data=data, timing_ms=timer.timing_ms)
async def skyvern_workflow_update(
    workflow_id: Annotated[str, "Workflow permanent ID (wpid_...) to update"],
    definition: Annotated[str, "Updated workflow definition as a YAML or JSON string"],
    format: Annotated[  # noqa: A002
        str, Field(description="Definition format: 'json', 'yaml', or 'auto'")
    ] = "auto",
) -> dict[str, Any]:
    """Update an existing workflow's definition. Use when you need to modify a workflow's blocks,
    parameters, or configuration. Creates a new version of the workflow."""
    validation_error = _validate_workflow_id(workflow_id, "skyvern_workflow_update")
    if validation_error:
        return validation_error
    if format not in ("json", "yaml", "auto"):
        bad_format = make_error(
            ErrorCode.INVALID_INPUT,
            f"Invalid format: {format!r}",
            "Use 'json', 'yaml', or 'auto'",
        )
        return make_result("skyvern_workflow_update", ok=False, error=bad_format)
    json_def, yaml_def, parse_err = _parse_definition(definition, format)
    if parse_err is not None:
        return make_result("skyvern_workflow_update", ok=False, error=parse_err)
    structure_err = _validate_definition_structure(json_def, "skyvern_workflow_update")
    if structure_err:
        return structure_err
    skyvern = get_skyvern()
    with Timer() as timer:
        try:
            workflow = await skyvern.update_workflow(
                workflow_id,
                json_definition=json_def,
                yaml_definition=yaml_def,
            )
            timer.mark("sdk")
        except Exception as e:
            return make_result(
                "skyvern_workflow_update",
                ok=False,
                timing_ms=timer.timing_ms,
                error=make_error(
                    ErrorCode.API_ERROR,
                    str(e),
                    "Check the workflow ID and definition syntax",
                ),
            )
    data = _serialize_workflow(workflow)
    which = "json_definition" if json_def is not None else "yaml_definition"
    data["sdk_equivalent"] = f"await skyvern.update_workflow({workflow_id!r}, {which}=<definition>)"
    return make_result("skyvern_workflow_update", data=data, timing_ms=timer.timing_ms)
async def skyvern_workflow_delete(
    workflow_id: Annotated[str, "Workflow permanent ID (wpid_...) to delete"],
    force: Annotated[bool, "Must be true to confirm deletion — prevents accidental deletes"] = False,
) -> dict[str, Any]:
    """Delete a workflow permanently. Use when you need to remove a workflow that is no longer needed.
    Requires force=true to prevent accidental deletion."""
    validation_error = _validate_workflow_id(workflow_id, "skyvern_workflow_delete")
    if validation_error:
        return validation_error
    # Deletion is irreversible, so require an explicit confirmation flag.
    if not force:
        confirm_err = make_error(
            ErrorCode.INVALID_INPUT,
            f"Deletion of workflow {workflow_id!r} requires confirmation",
            "Set force=true to confirm deletion. This action is irreversible.",
        )
        return make_result("skyvern_workflow_delete", ok=False, error=confirm_err)
    skyvern = get_skyvern()
    with Timer() as timer:
        try:
            await skyvern.delete_workflow(workflow_id)
            timer.mark("sdk")
        except NotFoundError:
            return make_result(
                "skyvern_workflow_delete",
                ok=False,
                timing_ms=timer.timing_ms,
                error=make_error(
                    ErrorCode.WORKFLOW_NOT_FOUND,
                    f"Workflow {workflow_id!r} not found",
                    "Verify the workflow ID with skyvern_workflow_list",
                ),
            )
        except Exception as e:
            LOG.error("workflow_delete_failed", workflow_id=workflow_id, error=str(e))
            return make_result(
                "skyvern_workflow_delete",
                ok=False,
                timing_ms=timer.timing_ms,
                error=make_error(ErrorCode.API_ERROR, str(e), "Check the workflow ID and your permissions"),
            )
    LOG.info("workflow_deleted", workflow_id=workflow_id)
    payload = {
        "workflow_permanent_id": workflow_id,
        "deleted": True,
        "sdk_equivalent": f"await skyvern.delete_workflow({workflow_id!r})",
    }
    return make_result("skyvern_workflow_delete", data=payload, timing_ms=timer.timing_ms)
# ---------------------------------------------------------------------------
# SKY-7808: Workflow Execution
# ---------------------------------------------------------------------------
async def skyvern_workflow_run(
    workflow_id: Annotated[str, "Workflow permanent ID (wpid_...) to run"],
    parameters: Annotated[str | None, Field(description="JSON string of workflow parameters")] = None,
    browser_session_id: Annotated[
        str | None, Field(description="Reuse an existing browser session (pbs_...) to preserve login state")
    ] = None,
    webhook_url: Annotated[str | None, Field(description="URL for status webhook callbacks after completion")] = None,
    proxy_location: Annotated[
        str | None, Field(description="Geographic proxy: RESIDENTIAL, RESIDENTIAL_GB, NONE, etc.")
    ] = None,
    wait: Annotated[bool, "Wait for the workflow to complete before returning (default: return immediately)"] = False,
    timeout_seconds: Annotated[
        int, Field(description="Max wait time in seconds when wait=true (default 300)", ge=10, le=3600)
    ] = 300,
) -> dict[str, Any]:
    """Run a Skyvern workflow with parameters. Use when you need to execute an automation workflow.
    Returns immediately by default (async) — set wait=true to block until completion.
    Default timeout is 300s (5 minutes). For longer workflows, increase timeout_seconds
    or use wait=false and poll with skyvern_workflow_status."""
    if err := _validate_workflow_id(workflow_id, "skyvern_workflow_run"):
        return err
    # Parse parameters up front so malformed input fails fast with a clear
    # INVALID_INPUT error instead of an opaque SDK/API failure.
    parsed_params: dict[str, Any] | None = None
    if parameters is not None:
        try:
            parsed_params = json.loads(parameters)
        except (json.JSONDecodeError, TypeError) as e:
            return make_result(
                "skyvern_workflow_run",
                ok=False,
                error=make_error(
                    ErrorCode.INVALID_INPUT,
                    f"Invalid parameters JSON: {e}",
                    "Provide parameters as a valid JSON object string",
                ),
            )
        # json.loads accepts any top-level JSON value (list, string, number, ...)
        # but workflow parameters must be a key->value mapping. Reject non-object
        # payloads here rather than forwarding them to the API.
        if not isinstance(parsed_params, dict):
            return make_result(
                "skyvern_workflow_run",
                ok=False,
                error=make_error(
                    ErrorCode.INVALID_INPUT,
                    f"parameters must be a JSON object, got {type(parsed_params).__name__}",
                    "Provide parameters as a valid JSON object string",
                ),
            )
    # Validate the proxy location against the ProxyLocation enum.
    proxy: ProxyLocation | None = None
    if proxy_location is not None:
        try:
            proxy = ProxyLocation(proxy_location)
        except ValueError:
            return make_result(
                "skyvern_workflow_run",
                ok=False,
                error=make_error(
                    ErrorCode.INVALID_INPUT,
                    f"Invalid proxy_location: {proxy_location!r}",
                    "Use RESIDENTIAL, RESIDENTIAL_GB, NONE, etc.",
                ),
            )
    skyvern = get_skyvern()
    with Timer() as timer:
        try:
            run = await skyvern.run_workflow(
                workflow_id=workflow_id,
                parameters=parsed_params,
                browser_session_id=browser_session_id,
                webhook_url=webhook_url,
                proxy_location=proxy,
                wait_for_completion=wait,
                timeout=timeout_seconds,
            )
            timer.mark("sdk")
        except asyncio.TimeoutError:
            # Only reachable when wait=true: the run may still be executing
            # server-side, so suggest polling rather than treating it as failed.
            return make_result(
                "skyvern_workflow_run",
                ok=False,
                timing_ms=timer.timing_ms,
                error=make_error(
                    ErrorCode.TIMEOUT,
                    f"Workflow did not complete within {timeout_seconds}s",
                    "Increase timeout_seconds or set wait=false and poll with skyvern_workflow_status",
                ),
            )
        except NotFoundError:
            return make_result(
                "skyvern_workflow_run",
                ok=False,
                timing_ms=timer.timing_ms,
                error=make_error(
                    ErrorCode.WORKFLOW_NOT_FOUND,
                    f"Workflow {workflow_id!r} not found",
                    "Verify the workflow ID with skyvern_workflow_list",
                ),
            )
        except Exception as e:
            LOG.error("workflow_run_failed", workflow_id=workflow_id, error=str(e))
            return make_result(
                "skyvern_workflow_run",
                ok=False,
                timing_ms=timer.timing_ms,
                error=make_error(ErrorCode.API_ERROR, str(e), "Check the workflow ID, parameters, and API key"),
            )
    LOG.info("workflow_run_started", workflow_id=workflow_id, run_id=run.run_id, wait=wait)
    data = _serialize_run(run)
    params_str = f", parameters={parsed_params}" if parsed_params else ""
    wait_str = f", wait_for_completion=True, timeout={timeout_seconds}" if wait else ""
    data["sdk_equivalent"] = f"await skyvern.run_workflow(workflow_id={workflow_id!r}{params_str}{wait_str})"
    return make_result("skyvern_workflow_run", data=data, timing_ms=timer.timing_ms)
async def skyvern_workflow_status(
    run_id: Annotated[str, "Run ID to check (wr_... for workflow runs, tsk_v2_... for task runs)"],
) -> dict[str, Any]:
    """Check the status and progress of a workflow or task run. Use when you need to monitor
    a running workflow, check if it completed, or retrieve its output."""
    validation_error = _validate_run_id(run_id, "skyvern_workflow_status")
    if validation_error:
        return validation_error
    skyvern = get_skyvern()
    with Timer() as timer:
        try:
            run = await skyvern.get_run(run_id)
            timer.mark("sdk")
        except NotFoundError:
            return make_result(
                "skyvern_workflow_status",
                ok=False,
                timing_ms=timer.timing_ms,
                error=make_error(
                    ErrorCode.RUN_NOT_FOUND,
                    f"Run {run_id!r} not found",
                    "Verify the run ID — it should start with wr_ or tsk_v2_",
                ),
            )
        except Exception as e:
            return make_result(
                "skyvern_workflow_status",
                ok=False,
                timing_ms=timer.timing_ms,
                error=make_error(ErrorCode.API_ERROR, str(e), "Check the run ID and your API key"),
            )
    payload = _serialize_run(run)
    payload["sdk_equivalent"] = f"await skyvern.get_run({run_id!r})"
    return make_result("skyvern_workflow_status", data=payload, timing_ms=timer.timing_ms)
async def skyvern_workflow_cancel(
    run_id: Annotated[str, "Run ID to cancel (wr_... for workflow runs, tsk_v2_... for task runs)"],
) -> dict[str, Any]:
    """Cancel a running workflow or task. Use when you need to stop a workflow that is taking
    too long, is stuck, or is no longer needed."""
    validation_error = _validate_run_id(run_id, "skyvern_workflow_cancel")
    if validation_error:
        return validation_error
    skyvern = get_skyvern()
    with Timer() as timer:
        try:
            await skyvern.cancel_run(run_id)
            timer.mark("sdk")
        except NotFoundError:
            return make_result(
                "skyvern_workflow_cancel",
                ok=False,
                timing_ms=timer.timing_ms,
                error=make_error(
                    ErrorCode.RUN_NOT_FOUND,
                    f"Run {run_id!r} not found",
                    "Verify the run ID — it should start with wr_ or tsk_v2_",
                ),
            )
        except Exception as e:
            LOG.error("workflow_cancel_failed", run_id=run_id, error=str(e))
            return make_result(
                "skyvern_workflow_cancel",
                ok=False,
                timing_ms=timer.timing_ms,
                error=make_error(ErrorCode.API_ERROR, str(e), "Check the run ID and your API key"),
            )
    LOG.info("workflow_cancelled", run_id=run_id)
    payload = {
        "run_id": run_id,
        "cancelled": True,
        "sdk_equivalent": f"await skyvern.cancel_run({run_id!r})",
    }
    return make_result("skyvern_workflow_cancel", data=payload, timing_ms=timer.timing_ms)

View File

@@ -56,55 +56,6 @@ EXTRACT_ACTION_DEFAULT_THINKING_BUDGET = settings.EXTRACT_ACTION_THINKING_BUDGET
DEFAULT_THINKING_BUDGET = settings.DEFAULT_THINKING_BUDGET
def _is_stale_client_error(exc: BaseException) -> bool:
"""Check if an exception chain contains the httpx 'client has been closed' RuntimeError.
litellm caches AsyncOpenAI/AsyncAzureOpenAI clients keyed by event loop ID.
In long-lived Temporal worker pods the event loop can change (task completion,
activity recycling), leaving stale clients whose underlying httpx.AsyncClient
is closed. Subsequent requests through those clients raise:
RuntimeError: Cannot send a request, as the client has been closed
which litellm wraps as APIConnectionError or InternalServerError.
"""
cur: BaseException | None = exc
while cur is not None:
if isinstance(cur, RuntimeError) and "client has been closed" in str(cur):
return True
next_exc = cur.__cause__ or cur.__context__
cur = next_exc if next_exc is not cur else None
return False
async def _acompletion_with_stale_client_retry(
    acompletion_callable: Any,
    **kwargs: Any,
) -> Any:
    """Call *acompletion_callable* and retry once if the failure is a stale httpx client.

    On first failure the entire ``litellm.in_memory_llm_clients_cache`` is flushed
    so that the retry creates a fresh client bound to the current event loop.
    """
    try:
        return await acompletion_callable(**kwargs)
    except Exception as first_err:
        # Only the specific "client has been closed" failure is retryable;
        # everything else propagates unchanged.
        if not _is_stale_client_error(first_err):
            raise
        model = kwargs.get("model", "unknown")
        LOG.warning(
            "Stale httpx client detected flushing litellm client cache and retrying",
            error_type=type(first_err).__name__,
            model=model,
        )
        # Flushing is best-effort: even if it fails, retrying may still pick up
        # a fresh client, so the failure is logged and swallowed.
        try:
            litellm.in_memory_llm_clients_cache.flush_cache()
        except Exception:
            LOG.warning("Failed to flush litellm client cache", exc_info=True)
        # Retry once with a fresh client
        return await acompletion_callable(**kwargs)
def _safe_model_dump_json(response: ModelResponse, indent: int = 2) -> str:
"""
Call model_dump_json() while suppressing Pydantic serialization warnings.
@@ -658,8 +609,7 @@ class LLMAPIHandlerFactory:
cache_variant=cache_variant_name,
)
request_payload_json = await _log_llm_request_artifact(request_model, True)
response = await _acompletion_with_stale_client_retry(
litellm.acompletion,
response = await litellm.acompletion(
model=request_model,
messages=active_messages,
timeout=settings.LLM_CONFIG_TIMEOUT,
@@ -670,8 +620,7 @@ class LLMAPIHandlerFactory:
async def _call_router_without_cache() -> tuple[ModelResponse, str]:
request_payload_json = await _log_llm_request_artifact(llm_key, False)
response = await _acompletion_with_stale_client_retry(
router.acompletion,
response = await router.acompletion(
model=main_model_group,
messages=messages,
timeout=settings.LLM_CONFIG_TIMEOUT,
@@ -1092,8 +1041,7 @@ class LLMAPIHandlerFactory:
try:
# TODO (kerem): add a retry mechanism to this call (acompletion_with_retries)
# TODO (kerem): use litellm fallbacks? https://litellm.vercel.app/docs/tutorials/fallbacks#how-does-completion_with_fallbacks-work
response = await _acompletion_with_stale_client_retry(
litellm.acompletion,
response = await litellm.acompletion(
model=model_name,
messages=active_messages,
drop_params=True, # Drop unsupported parameters gracefully
@@ -1679,8 +1627,7 @@ class LLMCaller:
if self.llm_key and "UI_TARS" in self.llm_key:
return await self._call_ui_tars(messages, tools, timeout, **active_parameters)
return await _acompletion_with_stale_client_retry(
litellm.acompletion,
return await litellm.acompletion(
model=self.llm_config.model_name,
messages=messages,
tools=tools,