Add Python SDK reference docs with LLM-optimized complete reference (#4713)

This commit is contained in:
Naman
2026-02-12 18:52:50 +05:30
committed by GitHub
parent 03bf353e0e
commit d1447b8ee1
11 changed files with 2069 additions and 2 deletions

View File

@@ -117,6 +117,25 @@
}
]
},
{
"tab": "Python SDK",
"groups": [
{
"group": "Python SDK",
"pages": [
"sdk-reference/overview",
"sdk-reference/tasks",
"sdk-reference/workflows",
"sdk-reference/browser-sessions",
"sdk-reference/browser-profiles",
"sdk-reference/credentials",
"sdk-reference/helpers",
"sdk-reference/error-handling",
"sdk-reference/complete-reference"
]
}
]
},
{
"tab": "API Reference",
"openapi": "api-reference/openapi.json"

View File

@@ -290,7 +290,7 @@ A saved snapshot of browser state. Unlike sessions, profiles persist indefinitel
```python
# Create a profile from a completed run
profile = await skyvern.browser_profiles.create_browser_profile(
profile = await skyvern.create_browser_profile(
name="my-authenticated-profile",
workflow_run_id=run.run_id
)
@@ -350,10 +350,12 @@ Skyvern supports multiple AI engines for task execution:
Specify the engine when running a task:
```python
from skyvern.schemas.runs import RunEngine
result = await skyvern.run_task(
prompt="Extract pricing data",
url="https://example.com",
engine="skyvern-2.0"
engine=RunEngine.skyvern_v2
)
```

View File

@@ -0,0 +1,128 @@
---
title: Browser Profiles
subtitle: Save and reuse browser state across runs
slug: sdk-reference/browser-profiles
---
A browser profile is a snapshot of browser state — cookies, local storage, session data. Create a profile from a completed run, then load it into future workflow runs to skip login and setup steps.
For conceptual background, see [Browser Profiles](/optimization/browser-profiles).
---
## `create_browser_profile`
Create a profile from a completed workflow run.
```python
profile = await client.create_browser_profile(
name="production-login",
workflow_run_id="wr_abc123",
)
print(profile.browser_profile_id) # bpf_abc123
```
### Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `name` | `str` | Yes | Display name for the profile. |
| `description` | `str` | No | Optional description. |
| `workflow_run_id` | `str` | No | The workflow run ID to snapshot. The run must have used `persist_browser_session=True`. |
| `browser_session_id` | `str` | No | The browser session ID to snapshot. |
### Returns `BrowserProfile`
| Field | Type | Description |
|-------|------|-------------|
| `browser_profile_id` | `str` | Unique ID. Starts with `bpf_`. |
| `name` | `str` | Profile name. |
| `description` | `str \| None` | Profile description. |
| `created_at` | `datetime` | When the profile was created. |
### Example: Create a profile from a login workflow
```python
# Step 1: Run a workflow with persist_browser_session
run = await client.run_workflow(
workflow_id="wpid_login_flow",
parameters={"username": "demo@example.com"},
wait_for_completion=True,
)
# Step 2: Create a profile from the run
profile = await client.create_browser_profile(
name="demo-account-login",
workflow_run_id=run.run_id,
)
# Step 3: Use the profile in future runs (skip login)
result = await client.run_workflow(
workflow_id="wpid_extract_invoices",
browser_profile_id=profile.browser_profile_id,
wait_for_completion=True,
)
```
<Info>
Session archiving is asynchronous. If `create_browser_profile` fails immediately after a workflow completes, wait a few seconds and retry.
</Info>
---
## `list_browser_profiles`
List all browser profiles.
```python
profiles = await client.list_browser_profiles()
for p in profiles:
print(f"{p.name} ({p.browser_profile_id})")
```
### Parameters
| Parameter | Type | Required | Default | Description |
|-----------|------|----------|---------|-------------|
| `include_deleted` | `bool` | No | `None` | Include soft-deleted profiles in the results. |
### Returns `list[BrowserProfile]`
---
## `get_browser_profile`
Get a single profile by ID.
```python
profile = await client.get_browser_profile("bpf_abc123")
print(profile.name)
```
### Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `profile_id` | `str` | Yes | The browser profile ID. |
### Returns `BrowserProfile`
---
## `delete_browser_profile`
Delete a browser profile.
```python
await client.delete_browser_profile("bpf_abc123")
```
### Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `profile_id` | `str` | Yes | The browser profile ID to delete. |
<Warning>
`browser_profile_id` only works with `run_workflow`, not `run_task`. If you pass it to `run_task`, it will be silently ignored.
</Warning>

View File

@@ -0,0 +1,119 @@
---
title: Browser Sessions
subtitle: Maintain live browser state between API calls
slug: sdk-reference/browser-sessions
---
A browser session is a persistent browser instance that stays alive between API calls. Use sessions to chain multiple tasks in the same browser without losing cookies, local storage, or login state.
For conceptual background, see [Browser Sessions](/optimization/browser-sessions).
---
## `create_browser_session`
Spin up a new cloud browser session.
```python
session = await client.create_browser_session(timeout=60)
print(session.browser_session_id) # pbs_abc123
```
### Parameters
| Parameter | Type | Required | Default | Description |
|-----------|------|----------|---------|-------------|
| `timeout` | `int` | No | `60` | Session timeout in minutes (51440). Timer starts after the session is ready. |
| `proxy_location` | `ProxyLocation` | No | `None` | Route browser traffic through a geographic proxy. |
| `extensions` | `list[Extensions]` | No | `None` | Browser extensions to install. Options: `"ad-blocker"`, `"captcha-solver"`. |
| `browser_type` | `PersistentBrowserType` | No | `None` | Browser type. Options: `"chrome"`, `"msedge"`. |
### Returns `BrowserSessionResponse`
| Field | Type | Description |
|-------|------|-------------|
| `browser_session_id` | `str` | Unique ID. Starts with `pbs_`. |
| `status` | `str \| None` | Current session status. |
| `browser_address` | `str \| None` | CDP address for connecting to the browser. |
| `app_url` | `str \| None` | Link to the live browser view in the Cloud UI. |
| `timeout` | `int \| None` | Configured timeout in minutes. |
| `started_at` | `datetime \| None` | When the session became ready. |
| `created_at` | `datetime` | When the session was requested. |
### Example: Chain multiple tasks in one session
```python
session = await client.create_browser_session()
# Step 1: Log in
await client.run_task(
prompt="Log in with username demo@example.com",
url="https://app.example.com/login",
browser_session_id=session.browser_session_id,
wait_for_completion=True,
)
# Step 2: Extract data (same browser, already logged in)
result = await client.run_task(
prompt="Go to the invoices page and extract all invoice numbers",
browser_session_id=session.browser_session_id,
wait_for_completion=True,
)
print(result.output)
# Clean up
await client.close_browser_session(session.browser_session_id)
```
---
## `get_browser_session`
Get the status and details of a session.
```python
session = await client.get_browser_session("pbs_abc123")
print(session.status, session.browser_address)
```
### Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `browser_session_id` | `str` | Yes | The session ID. |
### Returns `BrowserSessionResponse`
---
## `get_browser_sessions`
List all active browser sessions.
```python
sessions = await client.get_browser_sessions()
for s in sessions:
print(f"{s.browser_session_id} — {s.status}")
```
### Returns `list[BrowserSessionResponse]`
---
## `close_browser_session`
Close a browser session and release its resources.
```python
await client.close_browser_session("pbs_abc123")
```
### Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `browser_session_id` | `str` | Yes | The session ID to close. |
<Warning>
Closing a session is irreversible. Any unsaved state (cookies, local storage) is lost unless you created a [browser profile](/sdk-reference/browser-profiles) from it.
</Warning>

View File

@@ -0,0 +1,593 @@
---
title: Complete Reference (For LLMs)
subtitle: Every method, parameter, and type in one page
slug: sdk-reference/complete-reference
---
## Install and initialize
```bash
pip install skyvern
```
```python
import asyncio
from skyvern import Skyvern
async def main():
client = Skyvern(api_key="YOUR_API_KEY")
result = await client.run_task(
prompt="Get the title of the top post on Hacker News",
url="https://news.ycombinator.com",
wait_for_completion=True,
)
print(result.output)
asyncio.run(main())
```
**Constructor:**
```python
Skyvern(
api_key: str, # Required
base_url: str | None = None, # Override for self-hosted deployments
environment: SkyvernEnvironment = CLOUD,# CLOUD or LOCAL
timeout: float | None = None, # HTTP request timeout (seconds)
)
```
**Local mode** (runs entirely on your machine, reads `.env`):
```python
client = Skyvern.local()
```
All methods are async. Use `await` inside `async def`, with `asyncio.run()` as entry point.
---
## Imports
```python
from skyvern import Skyvern
from skyvern.client import SkyvernEnvironment # CLOUD, LOCAL
from skyvern.client.core import ApiError, RequestOptions # Base error, per-request config
from skyvern.client.errors import ( # HTTP error subclasses
BadRequestError, # 400
ForbiddenError, # 403
NotFoundError, # 404
ConflictError, # 409
UnprocessableEntityError, # 422
)
from skyvern.schemas.runs import RunEngine # skyvern_v1, skyvern_v2, openai_cua, anthropic_cua, ui_tars
from skyvern.schemas.run_blocks import CredentialType # skyvern, bitwarden, onepassword, azure_vault
```
**Important:** `ApiError` lives in `skyvern.client.core`, not `skyvern.client.errors`. The subclasses (`NotFoundError`, etc.) live in `skyvern.client.errors`.
---
## Tasks
### `run_task`
```python
result = await client.run_task(
prompt: str, # Required. Natural language instructions.
url: str | None = None, # Starting page URL.
engine: RunEngine = RunEngine.skyvern_v2, # AI engine.
wait_for_completion: bool = False, # Block until finished.
timeout: float = 1800, # Max wait (seconds). Only with wait_for_completion.
max_steps: int | None = None, # Cap AI steps to limit cost.
data_extraction_schema: dict | str | None = None, # JSON Schema constraining output shape.
browser_session_id: str | None = None, # Run in existing session.
publish_workflow: bool = False, # Save generated code as reusable workflow.
proxy_location: ProxyLocation | None = None,
webhook_url: str | None = None,
error_code_mapping: dict[str, str] | None = None,
totp_identifier: str | None = None,
totp_url: str | None = None,
title: str | None = None,
user_agent: str | None = None,
extra_http_headers: dict[str, str] | None = None,
browser_address: str | None = None,
) -> TaskRunResponse
```
**`TaskRunResponse` fields:**
| Field | Type | Description |
|-------|------|-------------|
| `run_id` | `str` | Unique ID (`tsk_...`). |
| `status` | `str` | `created` \| `queued` \| `running` \| `completed` \| `failed` \| `terminated` \| `timed_out` \| `canceled` |
| `output` | `dict \| list \| None` | Extracted data. `None` until completed. |
| `failure_reason` | `str \| None` | Error description if failed. |
| `downloaded_files` | `list[FileInfo] \| None` | Files downloaded during the run. |
| `recording_url` | `str \| None` | Session recording video URL. |
| `screenshot_urls` | `list[str] \| None` | Final screenshots. |
| `app_url` | `str \| None` | Link to run in Skyvern UI. |
| `step_count` | `int \| None` | Number of AI steps taken. |
| `created_at` | `datetime` | When the run was created. |
| `finished_at` | `datetime \| None` | When the run finished. |
### `get_run`
```python
run = await client.get_run(run_id: str) -> GetRunResponse
```
Returns status and results for any run (task or workflow).
### `cancel_run`
```python
await client.cancel_run(run_id: str)
```
### `get_run_timeline`
```python
timeline = await client.get_run_timeline(run_id: str) -> list[WorkflowRunTimeline]
```
### `get_run_artifacts`
```python
artifacts = await client.get_run_artifacts(
run_id: str,
artifact_type: ArtifactType | list[ArtifactType] | None = None,
) -> list[Artifact]
```
### `get_artifact`
```python
artifact = await client.get_artifact(artifact_id: str) -> Artifact
```
### `retry_run_webhook`
```python
await client.retry_run_webhook(run_id: str)
```
---
## Workflows
### `run_workflow`
```python
result = await client.run_workflow(
workflow_id: str, # Required. Permanent ID (wpid_...).
parameters: dict | None = None, # Input params matching workflow definition.
wait_for_completion: bool = False,
timeout: float = 1800,
run_with: str | None = None, # "code" (cached Playwright) or "agent" (AI).
ai_fallback: bool | None = None, # Fall back to AI if code fails.
browser_session_id: str | None = None,
browser_profile_id: str | None = None, # Load saved browser state.
proxy_location: ProxyLocation | None = None,
max_steps_override: int | None = None,
webhook_url: str | None = None,
title: str | None = None,
template: bool | None = None,
totp_identifier: str | None = None,
totp_url: str | None = None,
user_agent: str | None = None,
extra_http_headers: dict[str, str] | None = None,
browser_address: str | None = None,
) -> WorkflowRunResponse
```
**`WorkflowRunResponse` fields:** Same as `TaskRunResponse` plus `run_with`, `ai_fallback`, `script_run`.
### `create_workflow`
```python
workflow = await client.create_workflow(
json_definition: dict | None = None,
yaml_definition: str | None = None,
folder_id: str | None = None,
) -> Workflow
```
Provide either `json_definition` or `yaml_definition`. Example:
```python
workflow = await client.create_workflow(
json_definition={
"title": "Extract Products",
"workflow_definition": {
"parameters": [
{
"key": "target_url",
"parameter_type": "workflow",
"workflow_parameter_type": "string",
"description": "URL to scrape",
}
],
"blocks": [
{
"block_type": "task",
"label": "extract",
"prompt": "Extract the top 3 products with name and price",
"url": "{{ target_url }}",
}
],
},
},
)
print(workflow.workflow_permanent_id) # wpid_... — use this to run it
```
**`Workflow` fields:** `workflow_id`, `workflow_permanent_id`, `version`, `title`, `workflow_definition`, `status`, `created_at`
### `get_workflows`
```python
workflows = await client.get_workflows(
page: int | None = None,
page_size: int | None = None,
only_saved_tasks: bool | None = None,
only_workflows: bool | None = None,
title: str | None = None,
search_key: str | None = None,
folder_id: str | None = None,
status: WorkflowStatus | list[WorkflowStatus] | None = None,
) -> list[Workflow]
```
### `get_workflow`
```python
workflow = await client.get_workflow(
workflow_permanent_id: str,
version: int | None = None,
template: bool | None = None,
) -> Workflow
```
### `get_workflow_versions`
```python
versions = await client.get_workflow_versions(
workflow_permanent_id: str,
template: bool | None = None,
) -> list[Workflow]
```
### `update_workflow`
```python
updated = await client.update_workflow(
workflow_id: str, # The version ID (not permanent ID).
json_definition: dict | None = None,
yaml_definition: str | None = None,
) -> Workflow
```
### `delete_workflow`
```python
await client.delete_workflow(workflow_id: str)
```
---
## Browser sessions
A persistent browser instance that stays alive between API calls. Use to chain tasks without losing cookies or login state.
### `create_browser_session`
```python
session = await client.create_browser_session(
timeout: int | None = 60, # Minutes (5-1440).
proxy_location: ProxyLocation | None = None,
extensions: list[Extensions] | None = None, # "ad-blocker", "captcha-solver"
browser_type: PersistentBrowserType | None = None, # "chrome", "msedge"
) -> BrowserSessionResponse
```
**`BrowserSessionResponse` fields:** `browser_session_id`, `status`, `browser_address`, `app_url`, `timeout`, `started_at`, `created_at`
### `get_browser_session`
```python
session = await client.get_browser_session(browser_session_id: str) -> BrowserSessionResponse
```
### `get_browser_sessions`
```python
sessions = await client.get_browser_sessions() -> list[BrowserSessionResponse]
```
### `close_browser_session`
```python
await client.close_browser_session(browser_session_id: str)
```
### Session chaining example
```python
session = await client.create_browser_session()
# Step 1: Log in
await client.run_task(
prompt="Log in with username demo@example.com",
url="https://app.example.com/login",
browser_session_id=session.browser_session_id,
wait_for_completion=True,
)
# Step 2: Extract data (same browser, already logged in)
result = await client.run_task(
prompt="Go to the invoices page and extract all invoice numbers",
browser_session_id=session.browser_session_id,
wait_for_completion=True,
)
print(result.output)
# Clean up
await client.close_browser_session(session.browser_session_id)
```
---
## Browser profiles
A saved snapshot of browser state (cookies, local storage). Persists indefinitely. Create from a completed workflow run, then reuse to skip login.
### `create_browser_profile`
```python
profile = await client.create_browser_profile(
name: str,
description: str | None = None,
workflow_run_id: str | None = None, # Run must have used persist_browser_session=True.
browser_session_id: str | None = None,
) -> BrowserProfile
```
**`BrowserProfile` fields:** `browser_profile_id`, `name`, `description`, `created_at`
### `list_browser_profiles`
```python
profiles = await client.list_browser_profiles(
include_deleted: bool | None = None,
) -> list[BrowserProfile]
```
### `get_browser_profile`
```python
profile = await client.get_browser_profile(profile_id: str) -> BrowserProfile
```
### `delete_browser_profile`
```python
await client.delete_browser_profile(profile_id: str)
```
### Profile workflow example
```python
# Step 1: Run a login workflow
run = await client.run_workflow(
workflow_id="wpid_login_flow",
parameters={"username": "demo@example.com"},
wait_for_completion=True,
)
# Step 2: Create a profile from the run
profile = await client.create_browser_profile(
name="demo-account-login",
workflow_run_id=run.run_id,
)
# Step 3: Use the profile in future runs (skip login)
result = await client.run_workflow(
workflow_id="wpid_extract_invoices",
browser_profile_id=profile.browser_profile_id,
wait_for_completion=True,
)
```
---
## Credentials
Store login information securely. Reference by ID instead of passing secrets in code.
### `create_credential`
```python
credential = await client.create_credential(
name: str,
credential_type: CredentialType, # e.g. "password"
credential: dict, # Password: {"username": "...", "password": "..."}
) -> CredentialResponse
```
### `get_credentials`
```python
creds = await client.get_credentials(
page: int | None = None,
page_size: int | None = None,
) -> list[CredentialResponse]
```
### `get_credential`
```python
cred = await client.get_credential(credential_id: str) -> CredentialResponse
```
### `delete_credential`
```python
await client.delete_credential(credential_id: str)
```
### `send_totp_code`
Send a TOTP code to Skyvern during a run that requires 2FA.
```python
await client.send_totp_code(
totp_identifier: str,
content: str, # The TOTP code value.
task_id: str | None = None,
workflow_id: str | None = None,
workflow_run_id: str | None = None,
source: str | None = None,
expired_at: datetime | None = None,
) -> TotpCode
```
---
## Helper methods
### `login`
Automate logging into a website using stored credentials.
```python
from skyvern.schemas.run_blocks import CredentialType
result = await client.login(
credential_type: CredentialType, # Required. skyvern, bitwarden, onepassword, azure_vault.
url: str | None = None,
credential_id: str | None = None, # When using CredentialType.skyvern.
prompt: str | None = None,
browser_session_id: str | None = None,
wait_for_completion: bool = False,
timeout: float = 1800,
# Bitwarden: bitwarden_collection_id, bitwarden_item_id
# 1Password: onepassword_vault_id, onepassword_item_id
# Azure: azure_vault_name, azure_vault_username_key, azure_vault_password_key, azure_vault_totp_secret_key
) -> WorkflowRunResponse
```
### `download_files`
Does **not** support `wait_for_completion`. Returns immediately — poll with `get_run()`.
```python
result = await client.download_files(
navigation_goal: str, # Required. What to download.
url: str | None = None,
browser_session_id: str | None = None,
browser_profile_id: str | None = None,
download_suffix: str | None = None, # Expected extension, e.g. ".pdf"
download_timeout: float | None = None,
max_steps_per_run: int | None = None,
) -> WorkflowRunResponse
```
### `upload_file`
```python
with open("data.csv", "rb") as f:
upload = await client.upload_file(file=f)
print(upload.s3uri) # s3://skyvern-uploads/...
print(upload.presigned_url) # https://...signed download URL
```
Returns `UploadFileResponse` with fields: `s3uri`, `presigned_url`.
---
## Error handling
```python
from skyvern.client.core import ApiError
from skyvern.client.errors import NotFoundError
try:
run = await client.get_run("tsk_nonexistent")
except NotFoundError as e:
print(e.status_code, e.body) # 404
except ApiError as e:
print(e.status_code, e.body) # Any other HTTP error
```
**Error types:** `BadRequestError` (400), `ForbiddenError` (403), `NotFoundError` (404), `ConflictError` (409), `UnprocessableEntityError` (422). All inherit from `ApiError`.
**Completion timeout** raises Python's built-in `TimeoutError`:
```python
try:
result = await client.run_task(
prompt="...", url="...", wait_for_completion=True, timeout=300,
)
except TimeoutError:
print("Task didn't complete in time")
```
**Run failure** is not an exception — check `result.status`:
```python
if result.status == "failed":
print(result.failure_reason)
elif result.status == "completed":
print(result.output)
```
---
## Request options
Override timeout, retries, or headers per-request:
```python
from skyvern.client.core import RequestOptions
result = await client.run_task(
prompt="...",
url="...",
request_options=RequestOptions(
timeout_in_seconds=120,
max_retries=3,
additional_headers={"x-custom-header": "value"},
),
)
```
---
## Polling pattern
When not using `wait_for_completion`:
```python
import asyncio
task = await client.run_task(prompt="...", url="...")
while True:
run = await client.get_run(task.run_id)
if run.status in ("completed", "failed", "terminated", "timed_out", "canceled"):
break
await asyncio.sleep(5)
print(run.output)
```
---
## Key constraints
- `browser_profile_id` works with `run_workflow` only — silently ignored by `run_task`.
- `download_files` does not support `wait_for_completion` — poll manually or use webhooks.
- Only workflow runs with `persist_browser_session=True` produce archives for profile creation.
- Session archiving is async — profile creation may need a short retry delay after a run completes.
- `engine` accepts `RunEngine` enum values: `skyvern_v1`, `skyvern_v2`, `openai_cua`, `anthropic_cua`, `ui_tars`.

View File

@@ -0,0 +1,119 @@
---
title: Credentials
subtitle: Store and manage authentication credentials securely
slug: sdk-reference/credentials
---
Credentials let you store login information (username/password, TOTP secrets) securely in Skyvern's vault. Reference them by ID in tasks and workflows instead of passing secrets in your code.
---
## `create_credential`
Store a new credential.
```python
credential = await client.create_credential(
name="my-app-login",
credential_type="password",
credential={
"username": "demo@example.com",
"password": "s3cur3-p4ss",
},
)
print(credential.credential_id)
```
### Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `name` | `str` | Yes | Display name for the credential. |
| `credential_type` | `CredentialType` | Yes | Type of credential. |
| `credential` | `CreateCredentialRequestCredential` | Yes | The credential data. Shape depends on `credential_type`. |
### Returns `CredentialResponse`
---
## `get_credentials`
List all credentials. Credential values are never returned — only metadata.
```python
creds = await client.get_credentials()
for c in creds:
print(f"{c.name} ({c.credential_id})")
```
### Parameters
| Parameter | Type | Required | Default | Description |
|-----------|------|----------|---------|-------------|
| `page` | `int` | No | `None` | Page number. |
| `page_size` | `int` | No | `None` | Results per page. |
### Returns `list[CredentialResponse]`
---
## `get_credential`
Get a single credential's metadata by ID.
```python
cred = await client.get_credential("cred_abc123")
print(cred.name, cred.credential_type)
```
### Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `credential_id` | `str` | Yes | The credential ID. |
### Returns `CredentialResponse`
---
## `delete_credential`
Delete a credential.
```python
await client.delete_credential("cred_abc123")
```
### Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `credential_id` | `str` | Yes | The credential ID to delete. |
---
## `send_totp_code`
Send a TOTP (time-based one-time password) code to Skyvern during a run that requires 2FA. Call this when your webhook or polling detects that Skyvern is waiting for a TOTP code.
```python
await client.send_totp_code(
totp_identifier="demo@example.com",
content="123456",
)
```
### Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `totp_identifier` | `str` | Yes | The identifier matching the `totp_identifier` used in the task/workflow. |
| `content` | `str` | Yes | The TOTP code value. |
| `task_id` | `str` | No | Associate with a specific task run. |
| `workflow_id` | `str` | No | Associate with a specific workflow. |
| `workflow_run_id` | `str` | No | Associate with a specific workflow run. |
| `source` | `str` | No | Source of the TOTP code. |
| `expired_at` | `datetime` | No | When this code expires. |
| `type` | `OtpType` | No | OTP type. |
### Returns `TotpCode`

View File

@@ -0,0 +1,168 @@
---
title: Error Handling
subtitle: Handle API errors, timeouts, and configure retries
slug: sdk-reference/error-handling
---
The SDK raises typed exceptions for API errors. All errors extend `ApiError` and include the HTTP status code, response headers, and body.
---
## Error types
| Exception | Status Code | When it's raised |
|-----------|-------------|------------------|
| `BadRequestError` | 400 | Invalid request parameters. |
| `ForbiddenError` | 403 | Invalid or missing API key. |
| `NotFoundError` | 404 | Resource (run, workflow, session) not found. |
| `ConflictError` | 409 | Resource conflict (e.g., duplicate creation). |
| `UnprocessableEntityError` | 422 | Request validation failed. |
| `ApiError` | Any | Base class for all API errors. Catch this as a fallback. |
The specific error classes live in `skyvern.client.errors`. The base `ApiError` class lives in `skyvern.client.core`:
```python
from skyvern.client.core import ApiError
from skyvern.client.errors import (
BadRequestError,
ForbiddenError,
NotFoundError,
ConflictError,
UnprocessableEntityError,
)
```
---
## Catching errors
```python
from skyvern import Skyvern
from skyvern.client.core import ApiError
from skyvern.client.errors import NotFoundError
client = Skyvern(api_key="YOUR_API_KEY")
try:
run = await client.get_run("tsk_nonexistent")
except NotFoundError as e:
print(f"Run not found: {e.body}")
except ApiError as e:
print(f"API error {e.status_code}: {e.body}")
```
### Error properties
Every error has these attributes:
| Property | Type | Description |
|----------|------|-------------|
| `status_code` | `int \| None` | HTTP status code. |
| `body` | `Any` | Response body (usually a dict with error details). |
| `headers` | `dict[str, str] \| None` | Response headers. |
---
## Timeouts
Two different timeouts apply:
### HTTP request timeout
Controls how long the SDK waits for the HTTP response from the Skyvern API. Set it in the constructor or per-request:
```python
# Global timeout (applies to all requests)
client = Skyvern(api_key="YOUR_API_KEY", timeout=30.0)
# Per-request timeout
from skyvern.client.core import RequestOptions
result = await client.get_run(
"tsk_abc123",
request_options=RequestOptions(timeout_in_seconds=10),
)
```
### Completion timeout
Controls how long `wait_for_completion` polls before giving up. This is separate from the HTTP timeout:
```python
try:
result = await client.run_task(
prompt="Extract data",
url="https://example.com",
wait_for_completion=True,
timeout=300, # Give up after 5 minutes
)
except TimeoutError:
print("Task didn't complete in time")
```
The completion timeout raises Python's built-in `TimeoutError` (via `asyncio.timeout`), not `ApiError`.
---
## Retries
Configure automatic retries for transient failures using `RequestOptions`:
```python
from skyvern.client.core import RequestOptions
result = await client.run_task(
prompt="Extract product data",
url="https://example.com/products",
request_options=RequestOptions(max_retries=3),
)
```
Retries apply to the HTTP request level (network errors, 5xx responses). They do not retry the entire task if it fails at the AI level — use `get_run` to check the status and re-run if needed.
---
## Run failure vs API errors
There are two distinct failure modes:
**API error** — The HTTP request itself failed. The SDK raises an exception.
```python
from skyvern.client.core import ApiError
try:
result = await client.run_task(prompt="...")
except ApiError as e:
print(f"API call failed: {e.status_code}")
```
**Run failure** — The API call succeeded, but the task/workflow failed during execution. No exception is raised. Check the `status` field:
```python
result = await client.run_task(
prompt="Fill out the form",
url="https://example.com",
wait_for_completion=True,
)
if result.status == "failed":
print(f"Task failed: {result.failure_reason}")
elif result.status == "timed_out":
print(f"Task exceeded step limit after {result.step_count} steps")
elif result.status == "completed":
print(f"Success: {result.output}")
```
### Run statuses
| Status | Description |
|--------|-------------|
| `created` | Run initialized, not yet queued. |
| `queued` | Waiting for an available browser. |
| `running` | AI is executing. |
| `completed` | Finished successfully. |
| `failed` | Encountered an error during execution. |
| `terminated` | Manually stopped. |
| `timed_out` | Exceeded step limit (`max_steps`). |
| `canceled` | Canceled before starting. |

View File

@@ -0,0 +1,159 @@
---
title: Helper Methods
subtitle: High-level methods for common automation patterns
slug: sdk-reference/helpers
---
These methods wrap common multi-step patterns into single API calls. Under the hood, they create and run specialized workflows.
---
## `login`
Automate logging into a website using stored credentials. This creates a login workflow, executes it, and optionally waits for completion.
```python
from skyvern.schemas.run_blocks import CredentialType
result = await client.login(
credential_type=CredentialType.skyvern,
credential_id="cred_abc123",
url="https://app.example.com/login",
wait_for_completion=True,
)
print(result.status)
```
### Parameters
| Parameter | Type | Required | Default | Description |
|-----------|------|----------|---------|-------------|
| `credential_type` | `CredentialType` | Yes | — | How credentials are stored. Options: `skyvern`, `bitwarden`, `onepassword`, `azure_vault`. |
| `url` | `str` | No | `None` | The login page URL. |
| `credential_id` | `str` | No | `None` | The Skyvern credential ID (when using `CredentialType.skyvern`). |
| `prompt` | `str` | No | `None` | Additional instructions for the AI during login. |
| `browser_session_id` | `str` | No | `None` | Run login inside an existing browser session. |
| `browser_address` | `str` | No | `None` | Connect to a browser at this CDP address. |
| `proxy_location` | `ProxyLocation` | No | `None` | Route browser traffic through a geographic proxy. |
| `webhook_url` | `str` | No | `None` | URL to receive a POST when the login finishes. |
| `totp_identifier` | `str` | No | `None` | Identifier for TOTP verification. |
| `totp_url` | `str` | No | `None` | URL to receive TOTP codes. |
| `wait_for_completion` | `bool` | No | `False` | Block until the login finishes. |
| `timeout` | `float` | No | `1800` | Max wait time in seconds. |
| `extra_http_headers` | `dict[str, str]` | No | `None` | Additional HTTP headers. |
| `max_screenshot_scrolling_times` | `int` | No | `None` | Number of screenshot scrolls. |
**Bitwarden-specific parameters:**
| Parameter | Type | Description |
|-----------|------|-------------|
| `bitwarden_collection_id` | `str` | Bitwarden collection ID. |
| `bitwarden_item_id` | `str` | Bitwarden item ID. |
**1Password-specific parameters:**
| Parameter | Type | Description |
|-----------|------|-------------|
| `onepassword_vault_id` | `str` | 1Password vault ID. |
| `onepassword_item_id` | `str` | 1Password item ID. |
**Azure Key Vault-specific parameters:**
| Parameter | Type | Description |
|-----------|------|-------------|
| `azure_vault_name` | `str` | Azure Key Vault name. |
| `azure_vault_username_key` | `str` | Secret name for the username. |
| `azure_vault_password_key` | `str` | Secret name for the password. |
| `azure_vault_totp_secret_key` | `str` | Secret name for the TOTP secret. |
### Returns `WorkflowRunResponse`
### Example: Login then extract data
```python
from skyvern.schemas.run_blocks import CredentialType
session = await client.create_browser_session()
# Login
await client.login(
credential_type=CredentialType.skyvern,
credential_id="cred_abc123",
url="https://app.example.com/login",
browser_session_id=session.browser_session_id,
wait_for_completion=True,
)
# Now extract data from the authenticated session
result = await client.run_task(
prompt="Go to the billing page and extract all invoices",
browser_session_id=session.browser_session_id,
wait_for_completion=True,
)
print(result.output)
```
---
## `download_files`
Navigate to a page and download files. Unlike `run_task` and `run_workflow`, this method does **not** support `wait_for_completion` — it returns immediately with a `run_id`. Poll with `get_run()` or use a webhook to know when the download finishes.
```python
result = await client.download_files(
navigation_goal="Download the latest monthly report PDF",
url="https://app.example.com/reports",
)
# Poll for completion
import asyncio
while True:
run = await client.get_run(result.run_id)
if run.status in ("completed", "failed", "terminated", "timed_out", "canceled"):
break
await asyncio.sleep(5)
for f in run.downloaded_files:
print(f.name)
```
### Parameters
| Parameter | Type | Required | Default | Description |
|-----------|------|----------|---------|-------------|
| `navigation_goal` | `str` | Yes | — | Natural language description of what to download. |
| `url` | `str` | No | `None` | Starting page URL. |
| `browser_session_id` | `str` | No | `None` | Run inside an existing browser session. |
| `browser_profile_id` | `str` | No | `None` | Load a browser profile. |
| `proxy_location` | `ProxyLocation` | No | `None` | Route through a geographic proxy. |
| `webhook_url` | `str` | No | `None` | URL to receive a POST when the download finishes. |
| `download_suffix` | `str` | No | `None` | Expected file extension to wait for (e.g., `".pdf"`). |
| `download_timeout` | `float` | No | `None` | Max time to wait for the download in seconds. |
| `max_steps_per_run` | `int` | No | `None` | Cap AI steps. |
| `extra_http_headers` | `dict[str, str]` | No | `None` | Additional HTTP headers. |
### Returns `WorkflowRunResponse`
The `downloaded_files` field contains the list of files that were downloaded.
---
## `upload_file`
Upload a file to Skyvern's storage. Returns a presigned URL and S3 URI you can reference in tasks and workflows.
```python
with open("data.csv", "rb") as f:
upload = await client.upload_file(file=f)
print(upload.s3uri) # s3://skyvern-uploads/...
print(upload.presigned_url) # https://...signed download URL
```
### Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `file` | `File` | Yes | The file to upload. Accepts file objects, byte streams, or paths. |
| `file_storage_type` | `FileStorageType` | No | Storage backend type. |
### Returns `UploadFileResponse`

View File

@@ -0,0 +1,189 @@
---
title: Python SDK
subtitle: Install, authenticate, and configure the Skyvern Python client
slug: sdk-reference/overview
---
The `skyvern` package wraps the Skyvern REST API in a typed, async Python client.
```bash
pip install skyvern
```
<Note>
Requires Python 3.11+. If you hit version errors, use `pipx install skyvern` to install in an isolated environment.
</Note>
---
## Initialize the client
The `Skyvern` class is async — all methods are coroutines. Wrap calls in an `async` function and use `asyncio.run()` as the entry point:
```python
import asyncio
from skyvern import Skyvern
async def main():
client = Skyvern(api_key="YOUR_API_KEY")
result = await client.run_task(
prompt="Get the title of the top post on Hacker News",
url="https://news.ycombinator.com",
wait_for_completion=True,
)
print(result.output)
asyncio.run(main())
```
If you're inside a framework that already runs an event loop (FastAPI, Django ASGI), `await` directly without `asyncio.run()`.
### Constructor parameters
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `api_key` | `str` | — | **Required.** Your Skyvern API key. Get one at [app.skyvern.com/settings](https://app.skyvern.com/settings/api-keys). |
| `environment` | `SkyvernEnvironment` | `CLOUD` | Target environment. Options: `CLOUD`, `STAGING`, `LOCAL`. |
| `base_url` | `str \| None` | `None` | Override the API base URL. Use this for self-hosted deployments. |
| `timeout` | `float \| None` | `None` | HTTP request timeout in seconds. |
| `follow_redirects` | `bool` | `True` | Whether to follow HTTP redirects. |
| `httpx_client` | `AsyncClient \| None` | `None` | Provide your own httpx client for custom TLS, proxying, or connection pooling. |
---
## Environments
The SDK ships with two built-in environment URLs:
```python
from skyvern.client import SkyvernEnvironment
```
| Environment | URL | When to use |
|-------------|-----|-------------|
| `SkyvernEnvironment.CLOUD` | `https://api.skyvern.com` | Skyvern Cloud (default) |
| `SkyvernEnvironment.LOCAL` | `http://localhost:8000` | Local server started with `skyvern run server` |
For a self-hosted instance at a custom URL, pass `base_url` instead:
```python
client = Skyvern(
api_key="YOUR_API_KEY",
base_url="https://skyvern.your-company.com",
)
```
---
## Local mode
Run Skyvern entirely on your machine — no cloud, no network calls. `Skyvern.local()` reads your `.env` file, boots the engine in-process, and connects the client to it.
**Prerequisite:** Run `skyvern quickstart` once to create the `.env` file with your database connection and LLM API keys.
```python
from skyvern import Skyvern
client = Skyvern.local()
result = await client.run_task(
prompt="Get the title of the top post",
url="https://news.ycombinator.com",
wait_for_completion=True,
)
```
If you configured headful mode during `skyvern quickstart`, a Chromium window opens on your machine so you can watch the AI work.
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `llm_config` | `LLMConfig \| LLMRouterConfig \| None` | `None` | Override the LLM. If omitted, uses `LLM_KEY` from `.env`. |
| `settings` | `dict \| None` | `None` | Override `.env` settings at runtime. Example: `{"MAX_STEPS_PER_RUN": 100}` |
---
## `wait_for_completion`
By default, `run_task` and `run_workflow` return immediately after the run is queued — you get a `run_id` and need to poll `get_run()` yourself. Pass `wait_for_completion=True` to have the SDK poll automatically until the run reaches a terminal state (`completed`, `failed`, `terminated`, `timed_out`, or `canceled`):
```python
# Returns only after the task finishes (up to 30 min by default)
result = await client.run_task(
prompt="Fill out the contact form",
url="https://example.com/contact",
wait_for_completion=True,
timeout=600, # give up after 10 minutes
)
# Without wait_for_completion — returns immediately
task = await client.run_task(
prompt="Fill out the contact form",
url="https://example.com/contact",
)
print(task.run_id) # poll with client.get_run(task.run_id)
```
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `wait_for_completion` | `bool` | `False` | Poll until the run finishes. |
| `timeout` | `float` | `1800` | Maximum wait time in seconds. Raises Python's `TimeoutError` if exceeded. |
Supported on `run_task`, `run_workflow`, and `login`.
---
## Request options
Every method accepts an optional `request_options` parameter for per-request overrides of timeout, retries, and headers:
```python
from skyvern.client.core import RequestOptions
result = await client.run_task(
prompt="Extract data",
url="https://example.com",
request_options=RequestOptions(
timeout_in_seconds=120,
max_retries=3,
additional_headers={"x-custom-header": "value"},
),
)
```
These override the client-level defaults for that single call only.
---
## Next steps
<CardGroup cols={2}>
<Card
title="Tasks"
icon="play"
href="/sdk-reference/tasks"
>
Run browser automations with `run_task`
</Card>
<Card
title="Workflows"
icon="diagram-project"
href="/sdk-reference/workflows"
>
Create and run multi-step automations
</Card>
<Card
title="Browser Sessions"
icon="browser"
href="/sdk-reference/browser-sessions"
>
Maintain live browser state between calls
</Card>
<Card
title="Error Handling"
icon="triangle-exclamation"
href="/sdk-reference/error-handling"
>
Handle errors and configure retries
</Card>
</CardGroup>

View File

@@ -0,0 +1,297 @@
---
title: Tasks
subtitle: Run single browser automations with natural language
slug: sdk-reference/tasks
---
A task is a single browser automation. You describe what you want in natural language — Skyvern opens a browser, navigates to the URL, and executes the instructions with AI.
For when to use tasks vs workflows, see [Run a Task](/running-automations/run-a-task).
---
## `run_task`
Start a browser automation. Skyvern opens a cloud browser, navigates to the URL, and executes your prompt with AI.
```python
result = await client.run_task(
prompt="Get the title of the top post",
url="https://news.ycombinator.com",
wait_for_completion=True,
)
print(result.output)
```
### Parameters
| Parameter | Type | Required | Default | Description |
|-----------|------|----------|---------|-------------|
| `prompt` | `str` | Yes | — | Natural language instructions for what the AI should do. |
| `url` | `str` | No | `None` | Starting page URL. If omitted, the AI navigates from a blank page. |
| `engine` | `RunEngine` | No | `skyvern_v2` | AI engine. Options: `skyvern_v2`, `skyvern_v1`, `openai_cua`, `anthropic_cua`, `ui_tars`. |
| `wait_for_completion` | `bool` | No | `False` | Block until the run finishes. |
| `timeout` | `float` | No | `1800` | Max wait time in seconds when `wait_for_completion=True`. |
| `max_steps` | `int` | No | `None` | Cap the number of AI steps to limit cost. Run terminates with `timed_out` if hit. |
| `data_extraction_schema` | `dict \| str` | No | `None` | JSON schema or Pydantic model name constraining the output shape. |
| `proxy_location` | `ProxyLocation` | No | `None` | Route the browser through a geographic proxy. |
| `browser_session_id` | `str` | No | `None` | Run inside an existing [browser session](/optimization/browser-sessions). |
| `publish_workflow` | `bool` | No | `False` | Save the generated code as a reusable workflow. Only works with `skyvern_v2`. |
| `webhook_url` | `str` | No | `None` | URL to receive a POST when the run finishes. |
| `error_code_mapping` | `dict[str, str]` | No | `None` | Map custom error codes to failure reasons. |
| `totp_identifier` | `str` | No | `None` | Identifier for TOTP verification. |
| `totp_url` | `str` | No | `None` | URL to receive TOTP codes. |
| `title` | `str` | No | `None` | Display name for this run in the dashboard. |
| `model` | `dict` | No | `None` | Override the output model definition. |
| `user_agent` | `str` | No | `None` | Custom User-Agent header for the browser. |
| `extra_http_headers` | `dict[str, str]` | No | `None` | Additional HTTP headers injected into every browser request. |
| `include_action_history_in_verification` | `bool` | No | `None` | Include action history when verifying task completion. |
| `max_screenshot_scrolls` | `int` | No | `None` | Number of scrolls for post-action screenshots. Useful for lazy-loaded content. |
| `browser_address` | `str` | No | `None` | Connect to a browser at this CDP address instead of spinning up a new one. |
### Returns `TaskRunResponse`
| Field | Type | Description |
|-------|------|-------------|
| `run_id` | `str` | Unique identifier. Starts with `tsk_` for task runs. |
| `status` | `str` | `created`, `queued`, `running`, `completed`, `failed`, `terminated`, `timed_out`, or `canceled`. |
| `output` | `dict \| None` | Extracted data from the run. Shape depends on your prompt or `data_extraction_schema`. |
| `downloaded_files` | `list[FileInfo] \| None` | Files downloaded during the run. |
| `recording_url` | `str \| None` | URL to the session recording video. |
| `screenshot_urls` | `list[str] \| None` | Final screenshots (most recent first). |
| `failure_reason` | `str \| None` | Error description if the run failed. |
| `app_url` | `str \| None` | Link to view this run in the Cloud UI. |
| `step_count` | `int \| None` | Number of AI steps taken. |
| `script_run` | `ScriptRunResponse \| None` | Code execution result if the run used generated code. |
| `created_at` | `datetime` | When the run was created. |
| `finished_at` | `datetime \| None` | When the run finished. |
### Examples
**Extract structured data:**
```python
result = await client.run_task(
prompt="Extract the name, price, and rating of the top 3 products",
url="https://example.com/products",
data_extraction_schema={
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {"type": "string"},
"price": {"type": "string"},
"rating": {"type": "number"},
},
},
},
wait_for_completion=True,
)
print(result.output)
# [{"name": "Widget A", "price": "$29.99", "rating": 4.5}, ...]
```
**Run inside an existing browser session:**
```python
session = await client.create_browser_session()
result = await client.run_task(
prompt="Log in and download the latest invoice",
url="https://app.example.com/login",
browser_session_id=session.browser_session_id,
wait_for_completion=True,
)
```
**Limit cost with max_steps:**
```python
result = await client.run_task(
prompt="Fill out the contact form",
url="https://example.com/contact",
max_steps=10,
wait_for_completion=True,
)
```
**Use a lighter engine:**
```python
from skyvern.schemas.runs import RunEngine
result = await client.run_task(
prompt="Get the page title",
url="https://example.com",
engine=RunEngine.skyvern_v1,
wait_for_completion=True,
)
```
**Publish as a reusable workflow:**
```python
result = await client.run_task(
prompt="Fill out the contact form with the provided data",
url="https://example.com/contact",
publish_workflow=True,
wait_for_completion=True,
)
# The generated workflow is saved and can be re-triggered via run_workflow
```
---
## `get_run`
Get the current status and results of any run (task or workflow).
```python
run = await client.get_run("tsk_v2_486305187432193504")
print(run.status, run.output)
```
### Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `run_id` | `str` | Yes | The run ID returned by `run_task` or `run_workflow`. |
### Returns `GetRunResponse`
A discriminated union based on `run_type`. All variants share the same core fields as `TaskRunResponse` above, plus a `run_type` field (`task_v1`, `task_v2`, `openai_cua`, `anthropic_cua`, `ui_tars`, `workflow_run`).
Workflow run responses additionally include `run_with` and `ai_fallback` fields.
---
## `cancel_run`
Cancel a running or queued run.
```python
await client.cancel_run("tsk_v2_486305187432193504")
```
### Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `run_id` | `str` | Yes | The run ID to cancel. |
The run transitions to `canceled` status. If the run has already finished, this is a no-op.
---
## `get_run_timeline`
Get the step-by-step timeline of a run. Each entry represents one AI action with screenshots and reasoning.
```python
timeline = await client.get_run_timeline("tsk_v2_486305187432193504")
for step in timeline:
print(f"Step {step.order}: {step.type} — {step.status}")
```
### Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `run_id` | `str` | Yes | The run ID. |
### Returns `list[WorkflowRunTimeline]`
Each timeline entry contains step details including type, status, order, and associated artifacts.
---
## `get_run_artifacts`
Get all artifacts (screenshots, recordings, generated code, etc.) for a run.
```python
artifacts = await client.get_run_artifacts("tsk_v2_486305187432193504")
for artifact in artifacts:
print(f"{artifact.artifact_type}: {artifact.uri}")
```
Filter by type to get specific artifacts:
```python
# Get only the generated Playwright scripts
scripts = await client.get_run_artifacts(
"tsk_v2_486305187432193504",
artifact_type=["script_file"],
)
```
### Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `run_id` | `str` | Yes | The run ID. |
| `artifact_type` | `ArtifactType \| list[ArtifactType]` | No | Filter by artifact type. |
### Returns `list[Artifact]`
---
## `get_artifact`
Get a single artifact by ID.
```python
artifact = await client.get_artifact("art_486305187432193504")
print(artifact.uri)
```
### Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `artifact_id` | `str` | Yes | The artifact ID. |
### Returns `Artifact`
---
## `retry_run_webhook`
Re-send the webhook notification for a completed run. Useful if your webhook endpoint was down when the run finished.
```python
await client.retry_run_webhook("tsk_v2_486305187432193504")
```
### Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `run_id` | `str` | Yes | The run ID. |
---
## Polling pattern
If you don't use `wait_for_completion`, poll `get_run` manually:
```python
import asyncio
task = await client.run_task(
prompt="Extract product data",
url="https://example.com/products",
)
while True:
run = await client.get_run(task.run_id)
if run.status in ("completed", "failed", "terminated", "timed_out", "canceled"):
break
await asyncio.sleep(5)
print(run.output)
```
<Tip>
For production, prefer `wait_for_completion=True` or [webhooks](/going-to-production/webhooks) over manual polling.
</Tip>

View File

@@ -0,0 +1,274 @@
---
title: Workflows
subtitle: Create and run multi-step browser automations
slug: sdk-reference/workflows
---
A workflow chains multiple steps (blocks) into a single automation. Workflows support loops, conditionals, data passing between steps, and code-based re-execution.
For conceptual background, see [Build a Workflow](/multi-step-automations/build-a-workflow).
---
## `run_workflow`
Execute a workflow by its permanent ID. Skyvern opens a cloud browser and runs each block in sequence.
```python
result = await client.run_workflow(
workflow_id="wpid_abc123",
wait_for_completion=True,
)
print(result.output)
```
### Parameters
| Parameter | Type | Required | Default | Description |
|-----------|------|----------|---------|-------------|
| `workflow_id` | `str` | Yes | — | The workflow's permanent ID (`wpid_...`). |
| `parameters` | `dict` | No | `None` | Input parameters defined in the workflow. Keys must match parameter names. |
| `wait_for_completion` | `bool` | No | `False` | Block until the workflow finishes. |
| `timeout` | `float` | No | `1800` | Max wait time in seconds when `wait_for_completion=True`. |
| `run_with` | `str` | No | `None` | Force execution mode: `"code"` (use cached Playwright code) or `"agent"` (use AI). |
| `ai_fallback` | `bool` | No | `None` | Fall back to AI if the cached code fails. |
| `browser_session_id` | `str` | No | `None` | Run inside an existing [browser session](/optimization/browser-sessions). |
| `browser_profile_id` | `str` | No | `None` | Load a [browser profile](/optimization/browser-profiles) (cookies, storage) into the session. |
| `proxy_location` | `ProxyLocation` | No | `None` | Route the browser through a geographic proxy. |
| `max_steps_override` | `int` | No | `None` | Cap total AI steps across all blocks. |
| `webhook_url` | `str` | No | `None` | URL to receive a POST when the run finishes. |
| `title` | `str` | No | `None` | Display name for this run in the dashboard. |
| `totp_identifier` | `str` | No | `None` | Identifier for TOTP verification. |
| `totp_url` | `str` | No | `None` | URL to receive TOTP codes. |
| `template` | `bool` | No | `None` | Run a template workflow. |
| `user_agent` | `str` | No | `None` | Custom User-Agent header for the browser. |
| `extra_http_headers` | `dict[str, str]` | No | `None` | Additional HTTP headers injected into every browser request. |
| `max_screenshot_scrolls` | `int` | No | `None` | Number of scrolls for post-action screenshots. |
| `browser_address` | `str` | No | `None` | Connect to a browser at this CDP address. |
### Returns `WorkflowRunResponse`
| Field | Type | Description |
|-------|------|-------------|
| `run_id` | `str` | Unique identifier. Starts with `wr_` for workflow runs. |
| `status` | `str` | `created`, `queued`, `running`, `completed`, `failed`, `terminated`, `timed_out`, or `canceled`. |
| `output` | `dict \| None` | Extracted data from the workflow's output block. |
| `downloaded_files` | `list[FileInfo] \| None` | Files downloaded during the run. |
| `recording_url` | `str \| None` | URL to the session recording. |
| `failure_reason` | `str \| None` | Error description if the run failed. |
| `app_url` | `str \| None` | Link to view this run in the Cloud UI. |
| `step_count` | `int \| None` | Total AI steps taken across all blocks. |
| `run_with` | `str \| None` | Whether the run used `"code"` or `"agent"`. |
| `ai_fallback` | `bool \| None` | Whether AI fallback was configured. |
| `script_run` | `ScriptRunResponse \| None` | Code execution result. Contains `ai_fallback_triggered` if code was used. |
### Examples
**Pass parameters to a workflow:**
```python
result = await client.run_workflow(
workflow_id="wpid_invoice_extraction",
parameters={
"company_name": "Acme Corp",
"date_range": "2025-01-01 to 2025-12-31",
},
wait_for_completion=True,
)
```
**Run with cached code (skip AI, use generated Playwright scripts):**
```python
result = await client.run_workflow(
workflow_id="wpid_daily_report",
run_with="code",
ai_fallback=True, # Fall back to AI if code fails
wait_for_completion=True,
)
```
**Run with a browser profile (skip login):**
```python
result = await client.run_workflow(
workflow_id="wpid_daily_report",
browser_profile_id="bpf_abc123",
wait_for_completion=True,
)
```
---
## `create_workflow`
Create a new workflow from a JSON or YAML definition.
```python
workflow = await client.create_workflow(
json_definition={
"blocks": [
{
"block_type": "task",
"label": "extract_data",
"prompt": "Extract the top 3 products",
"url": "https://example.com/products",
}
],
"parameters": [],
},
)
print(workflow.workflow_permanent_id)
```
### Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `json_definition` | `WorkflowCreateYamlRequest` | No | Workflow definition as a JSON object. |
| `yaml_definition` | `str` | No | Workflow definition as a YAML string. |
| `folder_id` | `str` | No | Folder to organize the workflow in. |
You must provide either `json_definition` or `yaml_definition`.
### Returns `Workflow`
| Field | Type | Description |
|-------|------|-------------|
| `workflow_id` | `str` | Unique ID for this version. |
| `workflow_permanent_id` | `str` | Stable ID across all versions. Use this to run workflows. |
| `version` | `int` | Version number. |
| `title` | `str` | Workflow title. |
| `workflow_definition` | `WorkflowDefinition` | The full definition including blocks and parameters. |
| `status` | `str \| None` | Workflow status. |
| `created_at` | `datetime` | When the workflow was created. |
---
## `get_workflows`
List all workflows. Supports filtering and pagination.
```python
workflows = await client.get_workflows()
for wf in workflows:
print(f"{wf.title} ({wf.workflow_permanent_id})")
```
### Parameters
| Parameter | Type | Required | Default | Description |
|-----------|------|----------|---------|-------------|
| `page` | `int` | No | `None` | Page number for pagination. |
| `page_size` | `int` | No | `None` | Number of results per page. |
| `only_saved_tasks` | `bool` | No | `None` | Only return saved tasks. |
| `only_workflows` | `bool` | No | `None` | Only return workflows (not saved tasks). |
| `only_templates` | `bool` | No | `None` | Only return templates. |
| `title` | `str` | No | `None` | Filter by exact title. |
| `search_key` | `str` | No | `None` | Search by title. |
| `folder_id` | `str` | No | `None` | Filter by folder. |
| `status` | `WorkflowStatus \| list[WorkflowStatus]` | No | `None` | Filter by status. |
### Returns `list[Workflow]`
---
## `get_workflow`
<Info>
Requires `skyvern` version 1.1.0 or later. Run `pip install --upgrade skyvern` to update.
</Info>
Get a specific workflow by its permanent ID.
```python
workflow = await client.get_workflow("wpid_abc123")
print(workflow.title, f"v{workflow.version}")
```
### Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `workflow_permanent_id` | `str` | Yes | The workflow's permanent ID. |
| `version` | `int` | No | Specific version to retrieve. Defaults to latest. |
| `template` | `bool` | No | Whether to fetch a template workflow. |
### Returns `Workflow`
---
## `get_workflow_versions`
<Info>
Requires `skyvern` version 1.1.0 or later. Run `pip install --upgrade skyvern` to update.
</Info>
List all versions of a workflow.
```python
versions = await client.get_workflow_versions("wpid_abc123")
for v in versions:
print(f"v{v.version} — {v.modified_at}")
```
### Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `workflow_permanent_id` | `str` | Yes | The workflow's permanent ID. |
| `template` | `bool` | No | Whether to fetch template versions. |
### Returns `list[Workflow]`
---
## `update_workflow`
Update an existing workflow's definition.
```python
updated = await client.update_workflow(
"wf_abc123",
json_definition={
"blocks": [
{
"block_type": "task",
"label": "extract_data",
"prompt": "Extract the top 5 products",
"url": "https://example.com/products",
}
],
"parameters": [],
},
)
print(f"Updated to v{updated.version}")
```
### Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `workflow_id` | `str` | Yes | The workflow version ID (not the permanent ID). |
| `json_definition` | `WorkflowCreateYamlRequest` | No | Updated workflow definition as JSON. |
| `yaml_definition` | `str` | No | Updated workflow definition as YAML. |
### Returns `Workflow`
Creates a new version of the workflow.
---
## `delete_workflow`
Delete a workflow.
```python
await client.delete_workflow("wf_abc123")
```
### Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `workflow_id` | `str` | Yes | The workflow version ID to delete. |