Files
Dorod-Sky/docs/sdk-reference/complete-reference.mdx
Naman bf8c7de8f9 cloud ui docs + cookbooks (#4759)
Co-authored-by: Ritik Sahni <ritiksahni0203@gmail.com>
Co-authored-by: Kunal Mishra <kunalm2345@gmail.com>
2026-02-16 22:14:40 +00:00

594 lines
16 KiB
Plaintext

---
title: Complete Reference (For LLMs)
subtitle: Every method, parameter, and type in one page
slug: sdk-reference/complete-reference
---
## Install and initialize
```bash
pip install skyvern
```
```python
import asyncio
from skyvern import Skyvern
async def main():
client = Skyvern(api_key="YOUR_API_KEY")
result = await client.run_task(
prompt="Get the title of the top post on Hacker News",
url="https://news.ycombinator.com",
wait_for_completion=True,
)
print(result.output)
asyncio.run(main())
```
**Constructor:**
```python
Skyvern(
api_key: str, # Required
base_url: str | None = None, # Override for self-hosted deployments
environment: SkyvernEnvironment = CLOUD,# CLOUD or LOCAL
timeout: float | None = None, # HTTP request timeout (seconds)
)
```
**Local mode** (runs entirely on your machine, reads `.env`):
```python
client = Skyvern.local()
```
All methods are async. Use `await` inside `async def`, with `asyncio.run()` as entry point.
---
## Imports
```python
from skyvern import Skyvern
from skyvern.client import SkyvernEnvironment # CLOUD, LOCAL
from skyvern.client.core import ApiError, RequestOptions # Base error, per-request config
from skyvern.client.errors import ( # HTTP error subclasses
BadRequestError, # 400
ForbiddenError, # 403
NotFoundError, # 404
ConflictError, # 409
UnprocessableEntityError, # 422
)
from skyvern.schemas.runs import RunEngine # skyvern_v1, skyvern_v2, openai_cua, anthropic_cua, ui_tars
from skyvern.schemas.run_blocks import CredentialType # skyvern, bitwarden, onepassword, azure_vault
```
**Important:** `ApiError` lives in `skyvern.client.core`, not `skyvern.client.errors`. The subclasses (`NotFoundError`, etc.) live in `skyvern.client.errors`.
---
## Tasks
### `run_task`
```python
result = await client.run_task(
prompt: str, # Required. Natural language instructions.
url: str | None = None, # Starting page URL.
engine: RunEngine = RunEngine.skyvern_v2, # AI engine.
wait_for_completion: bool = False, # Block until finished.
timeout: float = 1800, # Max wait (seconds). Only with wait_for_completion.
max_steps: int | None = None, # Cap AI steps to limit cost.
data_extraction_schema: dict | str | None = None, # JSON Schema constraining output shape.
browser_session_id: str | None = None, # Run in existing session.
publish_workflow: bool = False, # Save generated code as reusable workflow.
proxy_location: ProxyLocation | None = None,
webhook_url: str | None = None,
error_code_mapping: dict[str, str] | None = None,
totp_identifier: str | None = None,
totp_url: str | None = None,
title: str | None = None,
user_agent: str | None = None,
extra_http_headers: dict[str, str] | None = None,
browser_address: str | None = None,
) -> TaskRunResponse
```
**`TaskRunResponse` fields:**
| Field | Type | Description |
|-------|------|-------------|
| `run_id` | `str` | Unique ID (`tsk_...`). |
| `status` | `str` | `created` \| `queued` \| `running` \| `completed` \| `failed` \| `terminated` \| `timed_out` \| `canceled` |
| `output` | `dict \| list \| None` | Extracted data. `None` until completed. |
| `failure_reason` | `str \| None` | Error description if failed. |
| `downloaded_files` | `list[FileInfo] \| None` | Files downloaded during the run. |
| `recording_url` | `str \| None` | Session recording video URL. |
| `screenshot_urls` | `list[str] \| None` | Final screenshots. |
| `app_url` | `str \| None` | Link to run in Skyvern UI. |
| `step_count` | `int \| None` | Number of AI steps taken. |
| `created_at` | `datetime` | When the run was created. |
| `finished_at` | `datetime \| None` | When the run finished. |
### `get_run`
```python
run = await client.get_run(run_id: str) -> GetRunResponse
```
Returns status and results for any run (task or workflow).
### `cancel_run`
```python
await client.cancel_run(run_id: str)
```
### `get_run_timeline`
```python
timeline = await client.get_run_timeline(run_id: str) -> list[WorkflowRunTimeline]
```
### `get_run_artifacts`
```python
artifacts = await client.get_run_artifacts(
run_id: str,
artifact_type: ArtifactType | list[ArtifactType] | None = None,
) -> list[Artifact]
```
### `get_artifact`
```python
artifact = await client.get_artifact(artifact_id: str) -> Artifact
```
### `retry_run_webhook`
```python
await client.retry_run_webhook(run_id: str)
```
---
## Workflows
### `run_workflow`
```python
result = await client.run_workflow(
workflow_id: str, # Required. Permanent ID (wpid_...).
parameters: dict | None = None, # Input params matching workflow definition.
wait_for_completion: bool = False,
timeout: float = 1800,
run_with: str | None = None, # "code" (cached Playwright) or "agent" (AI).
ai_fallback: bool | None = None, # Fall back to AI if code fails.
browser_session_id: str | None = None,
browser_profile_id: str | None = None, # Load saved browser state.
proxy_location: ProxyLocation | None = None,
max_steps_override: int | None = None,
webhook_url: str | None = None,
title: str | None = None,
template: bool | None = None,
totp_identifier: str | None = None,
totp_url: str | None = None,
user_agent: str | None = None,
extra_http_headers: dict[str, str] | None = None,
browser_address: str | None = None,
) -> WorkflowRunResponse
```
**`WorkflowRunResponse` fields:** Same as `TaskRunResponse` plus `run_with`, `ai_fallback`, `script_run`.
### `create_workflow`
```python
workflow = await client.create_workflow(
json_definition: dict | None = None,
yaml_definition: str | None = None,
folder_id: str | None = None,
) -> Workflow
```
Provide either `json_definition` or `yaml_definition`. Example:
```python
workflow = await client.create_workflow(
json_definition={
"title": "Extract Products",
"workflow_definition": {
"parameters": [
{
"key": "target_url",
"parameter_type": "workflow",
"workflow_parameter_type": "string",
"description": "URL to scrape",
}
],
"blocks": [
{
"block_type": "task",
"label": "extract",
"prompt": "Extract the top 3 products with name and price",
"url": "{{ target_url }}",
}
],
},
},
)
print(workflow.workflow_permanent_id) # wpid_... — use this to run it
```
**`Workflow` fields:** `workflow_id`, `workflow_permanent_id`, `version`, `title`, `workflow_definition`, `status`, `created_at`
### `get_workflows`
```python
workflows = await client.get_workflows(
page: int | None = None,
page_size: int | None = None,
only_saved_tasks: bool | None = None,
only_workflows: bool | None = None,
title: str | None = None,
search_key: str | None = None,
folder_id: str | None = None,
status: WorkflowStatus | list[WorkflowStatus] | None = None,
) -> list[Workflow]
```
### `get_workflow`
```python
workflow = await client.get_workflow(
workflow_permanent_id: str,
version: int | None = None,
template: bool | None = None,
) -> Workflow
```
### `get_workflow_versions`
```python
versions = await client.get_workflow_versions(
workflow_permanent_id: str,
template: bool | None = None,
) -> list[Workflow]
```
### `update_workflow`
```python
updated = await client.update_workflow(
workflow_id: str, # The permanent ID (wpid_...).
json_definition: dict | None = None,
yaml_definition: str | None = None,
) -> Workflow
```
### `delete_workflow`
```python
await client.delete_workflow(workflow_id: str)
```
---
## Browser sessions
A persistent browser instance that stays alive between API calls. Use to chain tasks without losing cookies or login state.
### `create_browser_session`
```python
session = await client.create_browser_session(
timeout: int | None = 60, # Minutes (5-1440).
proxy_location: ProxyLocation | None = None,
extensions: list[Extensions] | None = None, # "ad-blocker", "captcha-solver"
browser_type: PersistentBrowserType | None = None, # "chrome", "msedge"
) -> BrowserSessionResponse
```
**`BrowserSessionResponse` fields:** `browser_session_id`, `status`, `browser_address`, `app_url`, `timeout`, `started_at`, `created_at`
### `get_browser_session`
```python
session = await client.get_browser_session(browser_session_id: str) -> BrowserSessionResponse
```
### `get_browser_sessions`
```python
sessions = await client.get_browser_sessions() -> list[BrowserSessionResponse]
```
### `close_browser_session`
```python
await client.close_browser_session(browser_session_id: str)
```
### Session chaining example
```python
session = await client.create_browser_session()
# Step 1: Log in
await client.run_task(
prompt="Log in with username demo@example.com",
url="https://app.example.com/login",
browser_session_id=session.browser_session_id,
wait_for_completion=True,
)
# Step 2: Extract data (same browser, already logged in)
result = await client.run_task(
prompt="Go to the invoices page and extract all invoice numbers",
browser_session_id=session.browser_session_id,
wait_for_completion=True,
)
print(result.output)
# Clean up
await client.close_browser_session(session.browser_session_id)
```
---
## Browser profiles
A saved snapshot of browser state (cookies, local storage). Persists indefinitely. Create from a completed workflow run, then reuse to skip login.
### `create_browser_profile`
```python
profile = await client.create_browser_profile(
name: str,
description: str | None = None,
workflow_run_id: str | None = None, # Run must have used persist_browser_session=True.
browser_session_id: str | None = None, # One of workflow_run_id or browser_session_id is required.
) -> BrowserProfile
```
**`BrowserProfile` fields:** `browser_profile_id`, `name`, `description`, `created_at`
### `list_browser_profiles`
```python
profiles = await client.list_browser_profiles(
include_deleted: bool | None = None,
) -> list[BrowserProfile]
```
### `get_browser_profile`
```python
profile = await client.get_browser_profile(profile_id: str) -> BrowserProfile
```
### `delete_browser_profile`
```python
await client.delete_browser_profile(profile_id: str)
```
### Profile workflow example
```python
# Step 1: Run a login workflow
run = await client.run_workflow(
workflow_id="wpid_login_flow",
parameters={"username": "demo@example.com"},
wait_for_completion=True,
)
# Step 2: Create a profile from the run
profile = await client.create_browser_profile(
name="demo-account-login",
workflow_run_id=run.run_id,
)
# Step 3: Use the profile in future runs (skip login)
result = await client.run_workflow(
workflow_id="wpid_extract_invoices",
browser_profile_id=profile.browser_profile_id,
wait_for_completion=True,
)
```
---
## Credentials
Store login information securely. Reference by ID instead of passing secrets in code.
### `create_credential`
```python
credential = await client.create_credential(
name: str,
credential_type: CredentialType, # e.g. "password"
credential: dict, # Password: {"username": "...", "password": "..."}
) -> CredentialResponse
```
### `get_credentials`
```python
creds = await client.get_credentials(
page: int | None = None,
page_size: int | None = None,
) -> list[CredentialResponse]
```
### `get_credential`
```python
cred = await client.get_credential(credential_id: str) -> CredentialResponse
```
### `delete_credential`
```python
await client.delete_credential(credential_id: str)
```
### `send_totp_code`
Send a TOTP code to Skyvern during a run that requires 2FA.
```python
await client.send_totp_code(
totp_identifier: str,
content: str, # The TOTP code value.
task_id: str | None = None,
workflow_id: str | None = None,
workflow_run_id: str | None = None,
source: str | None = None,
expired_at: datetime | None = None,
) -> TotpCode
```
---
## Helper methods
### `login`
Automate logging into a website using stored credentials.
```python
from skyvern.schemas.run_blocks import CredentialType
result = await client.login(
credential_type: CredentialType, # Required. skyvern, bitwarden, onepassword, azure_vault.
url: str | None = None,
credential_id: str | None = None, # When using CredentialType.skyvern.
prompt: str | None = None,
browser_session_id: str | None = None,
wait_for_completion: bool = False,
timeout: float = 1800,
# Bitwarden: bitwarden_collection_id, bitwarden_item_id
# 1Password: onepassword_vault_id, onepassword_item_id
# Azure: azure_vault_name, azure_vault_username_key, azure_vault_password_key, azure_vault_totp_secret_key
) -> WorkflowRunResponse
```
### `download_files`
Does **not** support `wait_for_completion`. Returns immediately — poll with `get_run()`.
```python
result = await client.download_files(
navigation_goal: str, # Required. What to download.
url: str | None = None,
browser_session_id: str | None = None,
browser_profile_id: str | None = None,
download_suffix: str | None = None, # Expected extension, e.g. ".pdf"
download_timeout: float | None = None,
max_steps_per_run: int | None = None,
) -> WorkflowRunResponse
```
### `upload_file`
```python
with open("data.csv", "rb") as f:
upload = await client.upload_file(file=f)
print(upload.s3uri) # s3://skyvern-uploads/...
print(upload.presigned_url) # https://...signed download URL
```
Returns `UploadFileResponse` with fields: `s3uri`, `presigned_url`.
---
## Error handling
```python
from skyvern.client.core import ApiError
from skyvern.client.errors import NotFoundError
try:
run = await client.get_run("tsk_nonexistent")
except NotFoundError as e:
print(e.status_code, e.body) # 404
except ApiError as e:
print(e.status_code, e.body) # Any other HTTP error
```
**Error types:** `BadRequestError` (400), `ForbiddenError` (403), `NotFoundError` (404), `ConflictError` (409), `UnprocessableEntityError` (422). All inherit from `ApiError`.
**Completion timeout** raises Python's built-in `TimeoutError`:
```python
try:
result = await client.run_task(
prompt="...", url="...", wait_for_completion=True, timeout=300,
)
except TimeoutError:
print("Task didn't complete in time")
```
**Run failure** is not an exception — check `result.status`:
```python
if result.status == "failed":
print(result.failure_reason)
elif result.status == "completed":
print(result.output)
```
---
## Request options
Override timeout, retries, or headers per-request:
```python
from skyvern.client.core import RequestOptions
result = await client.run_task(
prompt="...",
url="...",
request_options=RequestOptions(
timeout_in_seconds=120,
max_retries=3,
additional_headers={"x-custom-header": "value"},
),
)
```
---
## Polling pattern
When not using `wait_for_completion`:
```python
import asyncio
task = await client.run_task(prompt="...", url="...")
while True:
run = await client.get_run(task.run_id)
if run.status in ("completed", "failed", "terminated", "timed_out", "canceled"):
break
await asyncio.sleep(5)
print(run.output)
```
---
## Key constraints
- `browser_profile_id` works with `run_workflow` only — silently ignored by `run_task`.
- `download_files` does not support `wait_for_completion` — poll manually or use webhooks.
- Only workflow runs with `persist_browser_session=True` produce archives for profile creation.
- Session archiving is async — profile creation may need a short retry delay after a run completes.
- `engine` accepts `RunEngine` enum values: `skyvern_v1`, `skyvern_v2`, `openai_cua`, `anthropic_cua`, `ui_tars`.