Merge SkyvernSdk and Skyvern extending Fern client (#3987)

Co-authored-by: Shuchang Zheng <wintonzheng0325@gmail.com>
This commit is contained in:
Stanislav Novosad
2025-11-12 15:44:53 -07:00
committed by GitHub
parent d88ca1ca27
commit e1595abf84
14 changed files with 360 additions and 699 deletions

View File

@@ -3,10 +3,9 @@ from typing import Any
if typing.TYPE_CHECKING:
from skyvern.library.skyvern import Skyvern # noqa: E402
from skyvern.library.skyvern_sdk import SkyvernSdk # noqa: E402
# noinspection PyUnresolvedReferences
__all__ = ["Skyvern", "SkyvernSdk"]
__all__ = ["Skyvern"]
def __getattr__(name: str) -> Any:
@@ -16,9 +15,4 @@ def __getattr__(name: str) -> Any:
globals()["Skyvern"] = Skyvern
return Skyvern
if name == "SkyvernSdk":
from skyvern.library.skyvern_sdk import SkyvernSdk # noqa: PLC0415
globals()["SkyvernSdk"] = SkyvernSdk
return SkyvernSdk
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

View File

@@ -1,23 +1,27 @@
import httpx
from httpx import ASGITransport
from skyvern.client import AsyncSkyvern, SkyvernEnvironment
from skyvern.config import settings
from skyvern.forge.api_app import app
def create_embedded_server(
api_key: str,
open_api_key: str | None,
) -> AsyncSkyvern:
settings.BROWSER_LOGS_ENABLED = False
openai_api_key: str | None,
) -> httpx.AsyncClient:
class EmbeddedServerTransport(httpx.AsyncBaseTransport):
def __init__(self) -> None:
self._transport: ASGITransport | None = None
if open_api_key:
settings.OPENAI_API_KEY = open_api_key
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
if self._transport is None:
from skyvern.config import settings # noqa: PLC0415
from skyvern.forge.api_app import app # noqa: PLC0415
transport = ASGITransport(app=app)
return AsyncSkyvern(
environment=SkyvernEnvironment.LOCAL,
api_key=api_key,
httpx_client=httpx.AsyncClient(transport=transport, base_url="http://skyvern-embedded"),
)
settings.BROWSER_LOGS_ENABLED = False
if openai_api_key:
settings.OPENAI_API_KEY = openai_api_key
self._transport = ASGITransport(app=app)
response = await self._transport.handle_async_request(request)
return response
return httpx.AsyncClient(transport=EmbeddedServerTransport(), base_url="http://skyvern-embedded")

View File

@@ -1,299 +1,172 @@
import asyncio
import typing
from typing import Any
import os
from typing import Any, overload
import httpx
from dotenv import load_dotenv
from playwright.async_api import Playwright, async_playwright
from skyvern.client import AsyncSkyvern
from skyvern.client.core.pydantic_utilities import parse_obj_as
from skyvern.client.environment import SkyvernEnvironment
from skyvern.client.types.get_run_response import GetRunResponse
from skyvern.client import AsyncSkyvern, BrowserSessionResponse, SkyvernEnvironment
from skyvern.client.types.task_run_response import TaskRunResponse
from skyvern.client.types.workflow_run_response import WorkflowRunResponse
from skyvern.config import settings
from skyvern.forge import app
from skyvern.forge.sdk.core import security, skyvern_context
from skyvern.forge.sdk.core.hashing import generate_url_hash
from skyvern.forge.sdk.core.skyvern_context import SkyvernContext
from skyvern.forge.sdk.db.enums import OrganizationAuthTokenType
from skyvern.forge.sdk.schemas.organizations import Organization
from skyvern.forge.sdk.schemas.task_v2 import TaskV2, TaskV2Request, TaskV2Status
from skyvern.forge.sdk.schemas.tasks import CreateTaskResponse, Task, TaskRequest, TaskResponse, TaskStatus
from skyvern.forge.sdk.services.local_org_auth_token_service import SKYVERN_LOCAL_DOMAIN, SKYVERN_LOCAL_ORG
from skyvern.forge.sdk.services.org_auth_token_service import API_KEY_LIFETIME
from skyvern.forge.sdk.workflow.models.workflow import WorkflowRunStatus
from skyvern.library.constants import DEFAULT_AGENT_HEARTBEAT_INTERVAL, DEFAULT_AGENT_TIMEOUT
from skyvern.library.constants import DEFAULT_AGENT_HEARTBEAT_INTERVAL, DEFAULT_AGENT_TIMEOUT, DEFAULT_CDP_PORT
from skyvern.library.embedded_server_factory import create_embedded_server
from skyvern.library.skyvern_browser import SkyvernBrowser
from skyvern.schemas.run_blocks import CredentialType
from skyvern.schemas.runs import CUA_ENGINES, ProxyLocation, RunEngine, RunStatus, RunType
from skyvern.services import run_service, task_v1_service, task_v2_service
from skyvern.utils import migrate_db
from skyvern.utils.env_paths import resolve_backend_env_path
from skyvern.schemas.runs import ProxyLocation, RunEngine, RunStatus
class Skyvern(AsyncSkyvern):
"""Main entry point for the Skyvern SDK.
This class provides methods to launch and connect to browsers (both local and cloud-hosted),
and access the Skyvern API client for task and workflow management. It combines browser
automation capabilities with AI-powered task execution.
Example:
```python
# Initialize with remote environment and API key
skyvern = Skyvern(environment=SkyvernEnvironment.CLOUD, api_key="your-api-key")
# Or in embedded mode (run `skyvern quickstart` first):
skyvern = Skyvern()
# Launch a local browser
browser = await skyvern.launch_local_browser(headless=False)
page = await browser.get_working_page()
# Or use a cloud browser (works only in cloud environment)
browser = await skyvern.use_cloud_browser()
page = await browser.get_working_page()
# Execute AI-powered tasks
await page.run.run_task("Fill out the form and submit it")
```
You can also mix AI-powered tasks with direct browser control in the same session:
```python
# Create credentials via API
credential = await skyvern.create_credential(
name="my_user",
credential_type="password",
credential=NonEmptyPasswordCredential(username="user@example.com", password="my_password"),
)
# Get a browser page
browser = await skyvern.launch_cloud_browser()
page = await browser.get_working_page()
# Navigate manually
await page.goto("https://example.com")
# Use AI to handle login
await page.run.login(
credential_type=CredentialType.skyvern,
credential_id=credential.credential_id,
)
# Continue with manual browser control
await page.click("#invoices-button")
await page.fill("#search", "my invoice")
await page.screenshot(path="screenshot.png", full_page=True)
```
"""
@overload
def __init__(
self,
*,
environment: SkyvernEnvironment,
api_key: str,
base_url: str | None = None,
api_key: str | None = None,
cdp_url: str | None = None,
browser_path: str | None = None,
browser_type: str | None = None,
environment: SkyvernEnvironment = SkyvernEnvironment.CLOUD,
timeout: float | None = None,
follow_redirects: bool | None = True,
httpx_client: httpx.AsyncClient | None = None,
) -> None:
super().__init__(
base_url=base_url,
api_key=api_key,
environment=environment,
timeout=timeout,
follow_redirects=follow_redirects,
httpx_client=httpx_client,
)
if base_url is None and api_key is None:
env_path = resolve_backend_env_path()
if not env_path.exists():
raise Exception("No .env file found. Please run 'skyvern init' first to set up your environment.")
"""Remote mode: Connect to Skyvern Cloud or self-hosted instance.
load_dotenv(env_path)
migrate_db()
Args:
environment: The Skyvern environment to connect to. Use SkyvernEnvironment.CLOUD
for Skyvern Cloud or SkyvernEnvironment.PRODUCTION/STAGING for self-hosted
instances.
api_key: API key for authenticating with Skyvern.
Can be found on the settings page: https://app.skyvern.com/settings
base_url: Override the base URL for the Skyvern API. If not provided, uses the default URL for
the specified environment.
timeout: Timeout in seconds for API requests. If not provided, uses the default timeout.
follow_redirects: Whether to automatically follow HTTP redirects. Defaults to True.
httpx_client: Custom httpx AsyncClient for making API requests.
If not provided, a default client will be created.
"""
...
self._api_key = api_key
self._cdp_url = cdp_url
self._browser_path = browser_path
self._browser_type = browser_type
if browser_path:
# TODO validate browser_path
# Supported Browsers: Google Chrome, Brave Browser, Microsoft Edge, Firefox
if "Chrome" in browser_path or "Brave" in browser_path or "Edge" in browser_path:
self._cdp_url = "http://127.0.0.1:9222"
settings.BROWSER_TYPE = "cdp-connect"
settings.BROWSER_REMOTE_DEBUGGING_URL = self._cdp_url
settings.CHROME_EXECUTABLE_PATH = browser_path
else:
raise ValueError(
f"Unsupported browser or invalid path: {browser_path}. "
"Here's a list of supported browsers Skyvern can connect to: Google Chrome, Brave Browser, Microsoft Edge, Firefox."
)
elif cdp_url:
self._cdp_url = cdp_url
settings.BROWSER_TYPE = "cdp-connect"
settings.BROWSER_REMOTE_DEBUGGING_URL = self._cdp_url
elif base_url is None and api_key is None:
if not browser_type:
# if "BROWSER_TYPE" not in os.environ:
# raise Exception("browser type is missing")
browser_type = "chromium-headful"
self._browser_type = browser_type
settings.BROWSER_TYPE = browser_type
elif api_key:
self._api_key = api_key
else:
raise ValueError("Initializing Skyvern failed: api_key must be provided")
async def get_organization(self) -> Organization:
organization = await app.DATABASE.get_organization_by_domain(SKYVERN_LOCAL_DOMAIN)
if not organization:
organization = await app.DATABASE.create_organization(
organization_name=SKYVERN_LOCAL_ORG,
domain=SKYVERN_LOCAL_DOMAIN,
max_steps_per_run=10,
max_retries_per_step=3,
)
api_key = security.create_access_token(
organization.organization_id,
expires_delta=API_KEY_LIFETIME,
)
# generate OrganizationAutoToken
await app.DATABASE.create_org_auth_token(
organization_id=organization.organization_id,
token=api_key,
token_type=OrganizationAuthTokenType.api,
)
return organization
async def _run_task(
@overload
def __init__(
self,
organization: Organization,
task: Task,
max_steps: int | None = None,
engine: RunEngine = RunEngine.skyvern_v1,
*,
openai_api_key: str | None = None,
) -> None:
org_auth_token = await app.DATABASE.get_valid_org_auth_token(
organization_id=organization.organization_id,
token_type=OrganizationAuthTokenType.api.value,
)
"""Embedded mode: Run Skyvern locally in-process.
step = await app.DATABASE.create_step(
task.task_id,
order=0,
retry_index=0,
organization_id=organization.organization_id,
)
updated_task = await app.DATABASE.update_task(
task.task_id,
status=TaskStatus.running,
organization_id=organization.organization_id,
)
try:
context: skyvern_context.SkyvernContext | None = skyvern_context.current()
current_run_id = context.run_id if context and context.run_id else task.task_id
skyvern_context.set(
SkyvernContext(
organization_id=organization.organization_id,
organization_name=organization.organization_name,
task_id=task.task_id,
run_id=current_run_id,
max_steps_override=max_steps,
)
)
To use this mode, run `skyvern quickstart` first.
step, _, _ = await app.agent.execute_step(
organization=organization,
task=updated_task,
step=step,
api_key=org_auth_token.token if org_auth_token else None,
engine=engine,
)
finally:
skyvern_context.reset()
Args:
openai_api_key: Optional OpenAI API key override for LLM operations.
If not provided, the one from the .env file will be used.
"""
...
async def _run_task_v2(self, organization: Organization, task_v2: TaskV2) -> None:
# mark task v2 as queued
await app.DATABASE.update_task_v2(
task_v2_id=task_v2.observer_cruise_id,
status=TaskV2Status.queued,
organization_id=organization.organization_id,
)
assert task_v2.workflow_run_id
await app.DATABASE.update_workflow_run(
workflow_run_id=task_v2.workflow_run_id,
status=WorkflowRunStatus.queued,
)
await task_v2_service.run_task_v2(
organization=organization,
task_v2_id=task_v2.observer_cruise_id,
)
async def create_task_v1(
def __init__(
self,
task_request: TaskRequest,
) -> CreateTaskResponse:
organization = await self.get_organization()
*,
environment: SkyvernEnvironment | None = None,
openai_api_key: str | None = None,
base_url: str | None = None,
api_key: str | None = None,
timeout: float | None = None,
follow_redirects: bool | None = True,
httpx_client: httpx.AsyncClient | None = None,
):
if environment is None:
if httpx_client is not None:
raise ValueError("httpx_client is not supported in embedded mode")
created_task = await app.agent.create_task(task_request, organization.organization_id)
if not os.path.exists(".env"):
raise ValueError("Please run `skyvern quickstart` to set up your local Skyvern environment")
asyncio.create_task(self._run_task(organization, created_task, max_steps=task_request.max_steps_per_run))
return CreateTaskResponse(task_id=created_task.task_id)
load_dotenv(".env")
api_key = os.getenv("SKYVERN_API_KEY")
if not api_key:
raise ValueError("SKYVERN_API_KEY is not set. Provide api_key or set SKYVERN_API_KEY in .env file.")
async def get_task(
self,
task_id: str,
) -> TaskResponse | None:
organization = await self.get_organization()
task = await app.DATABASE.get_task(task_id, organization.organization_id)
if task is None:
return None
latest_step = await app.DATABASE.get_latest_step(task_id, organization_id=organization.organization_id)
if not latest_step:
return await app.agent.build_task_response(task=task)
failure_reason: str | None = None
if task.status == TaskStatus.failed and (task.failure_reason):
failure_reason = ""
if task.failure_reason:
failure_reason += task.failure_reason or ""
if latest_step.output is not None and latest_step.output.actions_and_results is not None:
action_results_string: list[str] = []
for action, results in latest_step.output.actions_and_results:
if len(results) == 0:
continue
if results[-1].success:
continue
action_results_string.append(f"{action.action_type} action failed.")
if len(action_results_string) > 0:
failure_reason += "(Exceptions: " + str(action_results_string) + ")"
return await app.agent.build_task_response(
task=task, last_step=latest_step, failure_reason=failure_reason, need_browser_log=True
)
async def run_task_v1(
self,
task_request: TaskRequest,
timeout_seconds: int = 600,
) -> TaskResponse:
created_task = await self.create_task_v1(task_request)
async with asyncio.timeout(timeout_seconds):
while True:
task_response = await self.get_task(created_task.task_id)
assert task_response is not None
if task_response.status.is_final():
return task_response
await asyncio.sleep(1)
async def observer_task_v_2(self, task_request: TaskV2Request) -> TaskV2:
organization = await self.get_organization()
task_v2 = await task_v2_service.initialize_task_v2(
organization=organization,
user_prompt=task_request.user_prompt,
user_url=str(task_request.url) if task_request.url else None,
totp_identifier=task_request.totp_identifier,
totp_verification_url=task_request.totp_verification_url,
webhook_callback_url=task_request.webhook_callback_url,
proxy_location=task_request.proxy_location,
publish_workflow=task_request.publish_workflow,
)
if not task_v2.workflow_run_id:
raise Exception("Task v2 missing workflow run id")
asyncio.create_task(self._run_task_v2(organization, task_v2))
return task_v2
async def get_observer_task_v_2(self, task_id: str) -> TaskV2 | None:
organization = await self.get_organization()
return await app.DATABASE.get_task_v2(task_id, organization.organization_id)
async def run_observer_task_v_2(self, task_request: TaskV2Request, timeout_seconds: int = 600) -> TaskV2:
task_v2 = await self.observer_task_v_2(task_request)
async with asyncio.timeout(timeout_seconds):
while True:
refreshed_task_v2 = await self.get_observer_task_v_2(task_v2.observer_cruise_id)
assert refreshed_task_v2 is not None
if refreshed_task_v2.status.is_final():
return refreshed_task_v2
await asyncio.sleep(1)
############### officially supported interfaces ###############
async def get_run(self, run_id: str) -> GetRunResponse | None:
if not self._api_key:
organization = await self.get_organization()
get_run_internal_resp = await run_service.get_run_response(
run_id, organization_id=organization.organization_id
)
if not get_run_internal_resp:
return None
return typing.cast(
GetRunResponse,
parse_obj_as(
type_=GetRunResponse, # type: ignore
object_=get_run_internal_resp.model_dump(),
super().__init__(
environment=SkyvernEnvironment.LOCAL,
api_key=api_key,
timeout=timeout,
follow_redirects=follow_redirects,
httpx_client=create_embedded_server(
openai_api_key=openai_api_key,
),
)
else:
if not api_key:
raise ValueError(f"Missing api_key for {environment.name}")
return await super().get_run(run_id)
super().__init__(
base_url=base_url,
environment=environment,
api_key=api_key,
timeout=timeout,
follow_redirects=follow_redirects,
httpx_client=httpx_client,
)
self._environment = environment
self._api_key = api_key
self._playwright: Playwright | None = None
async def run_task(
self,
@@ -313,98 +186,32 @@ class Skyvern(AsyncSkyvern):
timeout: float = DEFAULT_AGENT_TIMEOUT,
browser_session_id: str | None = None,
user_agent: str | None = None,
extra_http_headers: dict[str, str] | None = None,
publish_workflow: bool | None = None,
include_action_history_in_verification: bool | None = None,
max_screenshot_scrolls: int | None = None,
browser_address: str | None = None,
) -> TaskRunResponse:
if not self._api_key:
if engine == RunEngine.skyvern_v1 or engine in CUA_ENGINES:
data_extraction_goal = None
navigation_goal = prompt
navigation_payload = None
organization = await self.get_organization()
task_generation = await task_v1_service.generate_task(
user_prompt=prompt,
organization=organization,
)
url = url or task_generation.url
navigation_goal = task_generation.navigation_goal or prompt
navigation_payload = task_generation.navigation_payload
data_extraction_goal = task_generation.data_extraction_goal
data_extraction_schema = data_extraction_schema or task_generation.extracted_information_schema
task_request = TaskRequest(
title=title or task_generation.suggested_title,
url=url,
model=model,
navigation_goal=navigation_goal,
navigation_payload=navigation_payload,
data_extraction_goal=data_extraction_goal,
extracted_information_schema=data_extraction_schema,
error_code_mapping=error_code_mapping,
proxy_location=proxy_location,
totp_identifier=totp_identifier,
totp_verification_url=totp_url,
browser_session_id=browser_session_id,
)
created_task = await app.agent.create_task(task_request, organization.organization_id)
url_hash = generate_url_hash(task_request.url)
await app.DATABASE.create_task_run(
task_run_type=RunType.task_v1,
organization_id=organization.organization_id,
run_id=created_task.task_id,
title=task_request.title,
url=task_request.url,
url_hash=url_hash,
)
try:
await self._run_task(organization, created_task, engine=engine)
run_obj = await self.get_run(run_id=created_task.task_id)
except Exception:
# TODO: better error handling and logging
run_obj = await self.get_run(run_id=created_task.task_id)
if not run_obj:
raise Exception("Failed to get the task run after creating the task.")
return from_run_to_task_run_response(run_obj)
elif engine == RunEngine.skyvern_v2:
# initialize task v2
organization = await self.get_organization()
task_v2 = await task_v2_service.initialize_task_v2(
organization=organization,
user_prompt=prompt,
user_url=url,
totp_identifier=totp_identifier,
totp_verification_url=totp_url,
webhook_callback_url=webhook_url,
proxy_location=proxy_location,
publish_workflow=False,
extracted_information_schema=data_extraction_schema,
error_code_mapping=error_code_mapping,
create_task_run=True,
model=model,
)
await self._run_task_v2(organization, task_v2)
run_obj = await self.get_run(run_id=task_v2.observer_cruise_id)
if not run_obj:
raise Exception("Failed to get the task run after creating the task.")
return from_run_to_task_run_response(run_obj)
else:
raise ValueError("Local mode is not supported for this method")
task_run = await super().run_task(
prompt=prompt,
engine=engine,
model=model,
url=url,
webhook_url=webhook_url,
totp_identifier=totp_identifier,
totp_url=totp_url,
title=title,
error_code_mapping=error_code_mapping,
data_extraction_schema=data_extraction_schema,
proxy_location=proxy_location,
max_steps=max_steps,
browser_session_id=browser_session_id,
user_agent=user_agent,
extra_http_headers=extra_http_headers,
publish_workflow=publish_workflow,
include_action_history_in_verification=include_action_history_in_verification,
max_screenshot_scrolls=max_screenshot_scrolls,
browser_address=browser_address,
)
if wait_for_completion:
@@ -427,13 +234,17 @@ class Skyvern(AsyncSkyvern):
totp_url: str | None = None,
totp_identifier: str | None = None,
browser_session_id: str | None = None,
max_steps_override: int | None = None,
user_agent: str | None = None,
browser_profile_id: str | None = None,
max_screenshot_scrolls: int | None = None,
extra_http_headers: dict[str, str] | None = None,
browser_address: str | None = None,
ai_fallback: bool | None = None,
run_with: str | None = None,
wait_for_completion: bool = False,
timeout: float = DEFAULT_AGENT_TIMEOUT,
) -> WorkflowRunResponse:
if not self._api_key:
raise ValueError(
"Local mode is not supported for run_workflow. Please instantiate Skyvern with an API key like this: Skyvern(api_key='your-api-key')"
)
workflow_run = await super().run_workflow(
workflow_id=workflow_id,
parameters=parameters,
@@ -444,6 +255,14 @@ class Skyvern(AsyncSkyvern):
totp_url=totp_url,
totp_identifier=totp_identifier,
browser_session_id=browser_session_id,
max_steps_override=max_steps_override,
user_agent=user_agent,
browser_profile_id=browser_profile_id,
max_screenshot_scrolls=max_screenshot_scrolls,
extra_http_headers=extra_http_headers,
browser_address=browser_address,
ai_fallback=ai_fallback,
run_with=run_with,
)
if wait_for_completion:
async with asyncio.timeout(timeout):
@@ -470,14 +289,16 @@ class Skyvern(AsyncSkyvern):
totp_identifier: str | None = None,
totp_url: str | None = None,
browser_session_id: str | None = None,
browser_address: str | None = None,
extra_http_headers: dict[str, str] | None = None,
max_screenshot_scrolling_times: int | None = None,
azure_vault_name: str | None = None,
azure_vault_username_key: str | None = None,
azure_vault_password_key: str | None = None,
azure_vault_totp_secret_key: str | None = None,
wait_for_completion: bool = False,
timeout: float = DEFAULT_AGENT_TIMEOUT,
) -> None:
if not self._api_key:
raise ValueError(
"Local mode is not supported for login. Please instantiate Skyvern with an API key like this: Skyvern(api_key='your-api-key')"
)
) -> WorkflowRunResponse:
workflow_run = await super().login(
credential_type=credential_type,
url=url,
@@ -492,7 +313,13 @@ class Skyvern(AsyncSkyvern):
totp_identifier=totp_identifier,
totp_url=totp_url,
browser_session_id=browser_session_id,
browser_address=browser_address,
extra_http_headers=extra_http_headers,
max_screenshot_scrolling_times=max_screenshot_scrolling_times,
azure_vault_name=azure_vault_name,
azure_vault_username_key=azure_vault_username_key,
azure_vault_password_key=azure_vault_password_key,
azure_vault_totp_secret_key=azure_vault_totp_secret_key,
)
if wait_for_completion:
async with asyncio.timeout(timeout):
@@ -503,6 +330,113 @@ class Skyvern(AsyncSkyvern):
await asyncio.sleep(DEFAULT_AGENT_HEARTBEAT_INTERVAL)
return WorkflowRunResponse.model_validate(workflow_run.model_dump())
async def launch_local_browser(self, *, headless: bool = False, port: int = DEFAULT_CDP_PORT) -> SkyvernBrowser:
"""Launch a new local Chromium browser with Chrome DevTools Protocol (CDP) enabled.
def from_run_to_task_run_response(run_obj: GetRunResponse) -> TaskRunResponse:
return TaskRunResponse.model_validate(run_obj.model_dump())
This method launches a browser on your local machine with remote debugging enabled,
allowing Skyvern to control it via CDP. Useful for development and debugging.
Args:
headless: Whether to run the browser in headless mode. Defaults to False.
port: The port number for the CDP endpoint. Defaults to DEFAULT_CDP_PORT.
Returns:
SkyvernBrowser: A browser instance with Skyvern capabilities.
"""
playwright = await self._get_playwright()
browser = await playwright.chromium.launch(
headless=headless,
args=[f"--remote-debugging-port={port}"],
)
browser_address = f"http://localhost:{port}"
browser_context = browser.contexts[0] if browser.contexts else await browser.new_context()
return SkyvernBrowser(self, browser_context, browser_address=browser_address)
async def connect_to_browser_over_cdp(self, cdp_url: str) -> SkyvernBrowser:
"""Connect to an existing browser instance via Chrome DevTools Protocol (CDP).
Use this to connect to a browser that's already running with CDP enabled,
whether local or remote.
Args:
cdp_url: The CDP WebSocket URL (e.g., "http://localhost:9222").
Returns:
SkyvernBrowser: A browser instance connected to the existing browser.
"""
playwright = await self._get_playwright()
browser = await playwright.chromium.connect_over_cdp(cdp_url)
browser_context = browser.contexts[0] if browser.contexts else await browser.new_context()
return SkyvernBrowser(self, browser_context, browser_address=cdp_url)
async def connect_to_cloud_browser_session(self, browser_session_id: str) -> SkyvernBrowser:
"""Connect to an existing cloud-hosted browser session by ID.
Args:
browser_session_id: The ID of the cloud browser session to connect to.
Returns:
SkyvernBrowser: A browser instance connected to the cloud session.
"""
self._ensure_cloud_environment()
browser_session = await self.get_browser_session(browser_session_id)
return await self._connect_to_cloud_browser_session(browser_session)
async def launch_cloud_browser(self) -> SkyvernBrowser:
"""Launch a new cloud-hosted browser session.
This creates a new browser session in Skyvern's cloud infrastructure and connects to it.
Returns:
SkyvernBrowser: A browser instance connected to the new cloud session.
"""
self._ensure_cloud_environment()
browser_session = await self.create_browser_session()
return await self._connect_to_cloud_browser_session(browser_session)
async def use_cloud_browser(self) -> SkyvernBrowser:
"""Get or create a cloud browser session.
This method attempts to reuse the most recent available cloud browser session.
If no session exists, it creates a new one. This is useful for cost efficiency
and session persistence.
Returns:
SkyvernBrowser: A browser instance connected to an existing or new cloud session.
"""
self._ensure_cloud_environment()
browser_sessions = await self.get_browser_sessions()
browser_session = max(
(s for s in browser_sessions if s.runnable_id is None), key=lambda s: s.started_at, default=None
)
if browser_session is None:
browser_session = await self.create_browser_session()
return await self._connect_to_cloud_browser_session(browser_session)
def _ensure_cloud_environment(self) -> None:
if self._environment not in (SkyvernEnvironment.CLOUD, SkyvernEnvironment.STAGING):
raise ValueError("Cloud browser sessions are supported only in the cloud environment")
async def _connect_to_cloud_browser_session(self, browser_session: BrowserSessionResponse) -> SkyvernBrowser:
if browser_session.browser_address is None:
raise ValueError(f"Browser address is missing for session {browser_session.browser_session_id}")
playwright = await self._get_playwright()
browser = await playwright.chromium.connect_over_cdp(
browser_session.browser_address, headers={"x-api-key": self._api_key}
)
browser_context = browser.contexts[0] if browser.contexts else await browser.new_context()
return SkyvernBrowser(self, browser_context, browser_session_id=browser_session.browser_session_id)
async def _get_playwright(self) -> Playwright:
if self._playwright is None:
self._playwright = await async_playwright().start()
return self._playwright
async def aclose(self) -> None:
"""Close Playwright and release resources."""
if self._playwright is not None:
try:
await self._playwright.stop()
finally:
self._playwright = None

View File

@@ -2,11 +2,10 @@ from typing import TYPE_CHECKING, Any
from playwright.async_api import BrowserContext, Page
from skyvern.client import AsyncSkyvern
from skyvern.library.skyvern_browser_page import SkyvernBrowserPage
if TYPE_CHECKING:
from skyvern.library.skyvern_sdk import SkyvernSdk
from skyvern.library.skyvern import Skyvern
class SkyvernBrowser(BrowserContext):
@@ -19,8 +18,8 @@ class SkyvernBrowser(BrowserContext):
Example:
```python
sdk = SkyvernSdk()
browser = await sdk.launch_local_browser()
skyvern = Skyvern()
browser = await skyvern.launch_local_browser()
# Get or create the working page
page = await browser.get_working_page()
@@ -38,14 +37,14 @@ class SkyvernBrowser(BrowserContext):
def __init__(
self,
sdk: "SkyvernSdk",
skyvern: "Skyvern",
browser_context: BrowserContext,
*,
browser_session_id: str | None = None,
browser_address: str | None = None,
):
super().__init__(browser_context)
self._sdk = sdk
self._skyvern = skyvern
self._browser_context = browser_context
self._browser_session_id = browser_session_id
self._browser_address = browser_address
@@ -73,12 +72,8 @@ class SkyvernBrowser(BrowserContext):
return self._browser_address
@property
def client(self) -> AsyncSkyvern:
return self._sdk.api
@property
def sdk(self) -> "SkyvernSdk":
return self._sdk
def skyvern(self) -> "Skyvern":
return self._skyvern
async def get_working_page(self) -> SkyvernBrowserPage:
"""Get the most recent page or create a new one if none exists.

View File

@@ -70,7 +70,7 @@ class SkyvernPageRun:
LOG.info("AI run task", prompt=prompt)
task_run = await self._browser.client.run_task(
task_run = await self._browser.skyvern.run_task(
prompt=prompt,
engine=engine,
model=model,
@@ -130,7 +130,7 @@ class SkyvernPageRun:
LOG.info("AI login", prompt=prompt)
workflow_run = await self._browser.client.login(
workflow_run = await self._browser.skyvern.login(
credential_type=credential_type,
url=url or self._get_page_url(),
credential_id=credential_id,
@@ -179,7 +179,7 @@ class SkyvernPageRun:
LOG.info("AI run workflow", workflow_id=workflow_id)
workflow_run = await self._browser.client.run_workflow(
workflow_run = await self._browser.skyvern.run_workflow(
workflow_id=workflow_id,
parameters=parameters,
template=template,
@@ -197,7 +197,7 @@ class SkyvernPageRun:
async def _wait_for_run_completion(self, run_id: str, timeout: float) -> GetRunResponse:
async with asyncio.timeout(timeout):
while True:
task_run = await self._browser.client.get_run(run_id)
task_run = await self._browser.skyvern.get_run(run_id)
if RunStatus(task_run.status).is_final():
break
await asyncio.sleep(DEFAULT_AGENT_HEARTBEAT_INTERVAL)

View File

@@ -42,7 +42,7 @@ class SdkSkyvernPageAi(SkyvernPageAi):
LOG.info("AI click", intention=intention, workflow_run_id=self._browser.workflow_run_id)
response = await self._browser.client.run_sdk_action(
response = await self._browser.skyvern.run_sdk_action(
url=self._page.url,
browser_session_id=self._browser.browser_session_id,
browser_address=self._browser.browser_address,
@@ -71,7 +71,7 @@ class SdkSkyvernPageAi(SkyvernPageAi):
LOG.info("AI input text", intention=intention, workflow_run_id=self._browser.workflow_run_id)
response = await self._browser.client.run_sdk_action(
response = await self._browser.skyvern.run_sdk_action(
url=self._page.url,
action=RunSdkActionRequestAction_AiInputText(
selector=selector,
@@ -101,7 +101,7 @@ class SdkSkyvernPageAi(SkyvernPageAi):
LOG.info("AI select option", intention=intention, workflow_run_id=self._browser.workflow_run_id)
response = await self._browser.client.run_sdk_action(
response = await self._browser.skyvern.run_sdk_action(
url=self._page.url,
action=RunSdkActionRequestAction_AiSelectOption(
selector=selector,
@@ -130,7 +130,7 @@ class SdkSkyvernPageAi(SkyvernPageAi):
LOG.info("AI upload file", intention=intention, workflow_run_id=self._browser.workflow_run_id)
response = await self._browser.client.run_sdk_action(
response = await self._browser.skyvern.run_sdk_action(
url=self._page.url,
action=RunSdkActionRequestAction_AiUploadFile(
selector=selector,
@@ -158,7 +158,7 @@ class SdkSkyvernPageAi(SkyvernPageAi):
LOG.info("AI extract", prompt=prompt, workflow_run_id=self._browser.workflow_run_id)
response = await self._browser.client.run_sdk_action(
response = await self._browser.skyvern.run_sdk_action(
url=self._page.url,
action=RunSdkActionRequestAction_Extract(
prompt=prompt,
@@ -182,7 +182,7 @@ class SdkSkyvernPageAi(SkyvernPageAi):
LOG.info("AI act", prompt=prompt, workflow_run_id=self._browser.workflow_run_id)
response = await self._browser.client.run_sdk_action(
response = await self._browser.skyvern.run_sdk_action(
url=self._page.url,
action=RunSdkActionRequestAction_AiAct(
intention=prompt,

View File

@@ -1,290 +0,0 @@
import os
from typing import Callable
import httpx
from dotenv import load_dotenv
from playwright.async_api import Playwright, async_playwright
from typing_extensions import overload
from skyvern.client import AsyncSkyvern, BrowserSessionResponse, SkyvernEnvironment
from skyvern.library.constants import DEFAULT_CDP_PORT
from skyvern.library.skyvern_browser import SkyvernBrowser
class SkyvernSdk:
"""Main entry point for the Skyvern SDK.
This class provides methods to launch and connect to browsers (both local and cloud-hosted),
and access the Skyvern API client for task and workflow management. It combines browser
automation capabilities with AI-powered task execution.
Example:
```python
# Initialize with remote environment and API key
skyvern = SkyvernSdk(environment=SkyvernEnvironment.CLOUD, api_key="your-api-key")
# Or in embedded mode (run `skyvern quickstart` first):
skyvern = SkyvernSdk()
# Launch a local browser
browser = await skyvern.launch_local_browser(headless=False)
page = await browser.get_working_page()
# Or use a cloud browser (works only in cloud environment)
browser = await skyvern.use_cloud_browser()
page = await browser.get_working_page()
# Execute AI-powered tasks
await page.run.run_task("Fill out the form and submit it")
```
You can also mix AI-powered tasks with direct browser control in the same session:
```python
# Create credentials via API
credential = await skyvern.api.create_credential(
name="my_user",
credential_type="password",
credential=NonEmptyPasswordCredential(username="user@example.com", password="my_password"),
)
# Get a browser page
browser = await skyvern.launch_cloud_browser()
page = await browser.get_working_page()
# Navigate manually
await page.goto("https://example.com")
# Use AI to handle login
await page.run.login(
credential_type=CredentialType.skyvern,
credential_id=credential.credential_id,
)
# Continue with manual browser control
await page.click("#invoices-button")
await page.fill("#search", "my invoice")
await page.screenshot(path="screenshot.png", full_page=True)
```
"""
@overload
def __init__(
self,
*,
environment: SkyvernEnvironment,
api_key: str,
base_url: str | None = None,
timeout: float | None = None,
follow_redirects: bool | None = True,
httpx_client: httpx.AsyncClient | None = None,
) -> None:
"""Remote mode: Connect to Skyvern Cloud or self-hosted instance.
Args:
environment: The Skyvern environment to connect to. Use SkyvernEnvironment.CLOUD
for Skyvern Cloud or SkyvernEnvironment.PRODUCTION/STAGING for self-hosted
instances.
api_key: API key for authenticating with Skyvern.
Can be found on the settings page: https://app.skyvern.com/settings
base_url: Override the base URL for the Skyvern API. If not provided, uses the default URL for
the specified environment.
timeout: Timeout in seconds for API requests. If not provided, uses the default timeout.
follow_redirects: Whether to automatically follow HTTP redirects. Defaults to True.
httpx_client: Custom httpx AsyncClient for making API requests.
If not provided, a default client will be created.
"""
...
@overload
def __init__(
self,
*,
open_api_key: str | None = None,
) -> None:
"""Embedded mode: Run Skyvern locally in-process.
To use this mode, run `skyvern quickstart` first.
Args:
open_api_key: Optional OpenAI API key override for LLM operations.
If not provided, the one from the .env file will be used.
"""
...
def __init__(
self,
*,
environment: SkyvernEnvironment | None = None,
open_api_key: str | None = None,
base_url: str | None = None,
api_key: str | None = None,
timeout: float | None = None,
follow_redirects: bool | None = True,
httpx_client: httpx.AsyncClient | None = None,
):
if environment is None:
if httpx_client is not None:
raise ValueError("httpx_client is not supported in embedded mode")
if not os.path.exists(".env"):
raise ValueError("Please run `skyvern quickstart` to set up your local Skyvern environment")
load_dotenv(".env")
api_key = os.getenv("SKYVERN_API_KEY")
if not api_key:
raise ValueError("SKYVERN_API_KEY is not set. Provide api_key or set SKYVERN_API_KEY in .env file.")
def create_embedded_api() -> AsyncSkyvern:
from skyvern.library.embedded_server_factory import create_embedded_server # noqa: PLC0415
return create_embedded_server(
api_key=api_key,
open_api_key=open_api_key,
)
api_factory = create_embedded_api
else:
if not api_key:
raise ValueError(f"Missing api_key for {environment.name}")
def create_remote_api() -> AsyncSkyvern:
return AsyncSkyvern(
environment=environment,
base_url=base_url,
api_key=api_key,
timeout=timeout,
follow_redirects=follow_redirects,
httpx_client=httpx_client,
)
api_factory = create_remote_api
self._api_factory: Callable[[], AsyncSkyvern] = api_factory
self._environment = environment
self._api_key = api_key
self._api: AsyncSkyvern | None = None
self._playwright: Playwright | None = None
@property
def api(self) -> AsyncSkyvern:
"""Get the AsyncSkyvern API client for direct API access."""
if not self._api:
self._api = self._api_factory()
return self._api
async def launch_local_browser(self, *, headless: bool = False, port: int = DEFAULT_CDP_PORT) -> SkyvernBrowser:
"""Launch a new local Chromium browser with Chrome DevTools Protocol (CDP) enabled.
This method launches a browser on your local machine with remote debugging enabled,
allowing Skyvern to control it via CDP. Useful for development and debugging.
Args:
headless: Whether to run the browser in headless mode. Defaults to False.
port: The port number for the CDP endpoint. Defaults to DEFAULT_CDP_PORT.
Returns:
SkyvernBrowser: A browser instance with Skyvern capabilities.
"""
playwright = await self._get_playwright()
browser = await playwright.chromium.launch(
headless=headless,
args=[f"--remote-debugging-port={port}"],
)
browser_address = f"http://localhost:{port}"
browser_context = browser.contexts[0] if browser.contexts else await browser.new_context()
return SkyvernBrowser(self, browser_context, browser_address=browser_address)
async def connect_to_browser_over_cdp(self, cdp_url: str) -> SkyvernBrowser:
"""Connect to an existing browser instance via Chrome DevTools Protocol (CDP).
Use this to connect to a browser that's already running with CDP enabled,
whether local or remote.
Args:
cdp_url: The CDP WebSocket URL (e.g., "http://localhost:9222").
Returns:
SkyvernBrowser: A browser instance connected to the existing browser.
"""
playwright = await self._get_playwright()
browser = await playwright.chromium.connect_over_cdp(cdp_url)
browser_context = browser.contexts[0] if browser.contexts else await browser.new_context()
return SkyvernBrowser(self, browser_context, browser_address=cdp_url)
async def connect_to_cloud_browser_session(self, browser_session_id: str) -> SkyvernBrowser:
"""Connect to an existing cloud-hosted browser session by ID.
Args:
browser_session_id: The ID of the cloud browser session to connect to.
Returns:
SkyvernBrowser: A browser instance connected to the cloud session.
"""
if self._environment != SkyvernEnvironment.CLOUD and self._environment != SkyvernEnvironment.STAGING:
raise Exception("Cloud browser sessions are supported only in the cloud environment")
browser_session = await self.api.get_browser_session(browser_session_id)
return await self._connect_to_cloud_browser_session(browser_session)
async def launch_cloud_browser(self) -> SkyvernBrowser:
"""Launch a new cloud-hosted browser session.
This creates a new browser session in Skyvern's cloud infrastructure and connects to it.
Returns:
SkyvernBrowser: A browser instance connected to the new cloud session.
"""
if self._environment != SkyvernEnvironment.CLOUD and self._environment != SkyvernEnvironment.STAGING:
raise Exception("Cloud browser sessions are supported only in the cloud environment")
browser_session = await self.api.create_browser_session()
return await self._connect_to_cloud_browser_session(browser_session)
async def use_cloud_browser(self) -> SkyvernBrowser:
"""Get or create a cloud browser session.
This method attempts to reuse the most recent available cloud browser session.
If no session exists, it creates a new one. This is useful for cost efficiency
and session persistence.
Returns:
SkyvernBrowser: A browser instance connected to an existing or new cloud session.
"""
if self._environment != SkyvernEnvironment.CLOUD and self._environment != SkyvernEnvironment.STAGING:
raise Exception("Cloud browser sessions are supported only in the cloud environment")
browser_sessions = await self.api.get_browser_sessions()
browser_session = max(
(s for s in browser_sessions if s.runnable_id is None), key=lambda s: s.started_at, default=None
)
if browser_session is None:
browser_session = await self.api.create_browser_session()
return await self._connect_to_cloud_browser_session(browser_session)
async def _connect_to_cloud_browser_session(self, browser_session: BrowserSessionResponse) -> SkyvernBrowser:
if browser_session.browser_address is None:
raise Exception(f"Browser address is missing for session {browser_session.browser_session_id}")
playwright = await self._get_playwright()
browser = await playwright.chromium.connect_over_cdp(
browser_session.browser_address, headers={"x-api-key": self._api_key}
)
browser_context = browser.contexts[0] if browser.contexts else await browser.new_context()
return SkyvernBrowser(self, browser_context, browser_session_id=browser_session.browser_session_id)
async def _get_playwright(self) -> Playwright:
if self._playwright is None:
self._playwright = await async_playwright().start()
return self._playwright
async def aclose(self) -> None:
"""Close Playwright and release resources."""
if self._playwright is not None:
try:
await self._playwright.stop()
finally:
self._playwright = None