From 4ee3a52522af83985baaf17920833b10d7bf8eb5 Mon Sep 17 00:00:00 2001 From: Stanislav Novosad Date: Mon, 10 Nov 2025 16:08:57 -0700 Subject: [PATCH] SDK: embedded server mode (#3949) --- skyvern/library/embedded_server_factory.py | 21 +++ skyvern/library/local_server_runner.py | 131 ----------------- skyvern/library/skyvern_browser_page.py | 6 - skyvern/library/skyvern_browser_page_ai.py | 12 -- skyvern/library/skyvern_sdk.py | 156 ++++++++++++++------- 5 files changed, 127 insertions(+), 199 deletions(-) create mode 100644 skyvern/library/embedded_server_factory.py delete mode 100644 skyvern/library/local_server_runner.py diff --git a/skyvern/library/embedded_server_factory.py b/skyvern/library/embedded_server_factory.py new file mode 100644 index 00000000..f12e5f57 --- /dev/null +++ b/skyvern/library/embedded_server_factory.py @@ -0,0 +1,21 @@ +import httpx +from httpx import ASGITransport + +from skyvern.client import AsyncSkyvern, SkyvernEnvironment +from skyvern.config import settings +from skyvern.forge.api_app import app + + +def create_embedded_server( + api_key: str, + open_api_key: str | None, +) -> AsyncSkyvern: + if open_api_key: + settings.OPENAI_API_KEY = open_api_key + + transport = ASGITransport(app=app) + return AsyncSkyvern( + environment=SkyvernEnvironment.LOCAL, + api_key=api_key, + httpx_client=httpx.AsyncClient(transport=transport, base_url="http://skyvern-embedded"), + ) diff --git a/skyvern/library/local_server_runner.py b/skyvern/library/local_server_runner.py deleted file mode 100644 index 12afdce4..00000000 --- a/skyvern/library/local_server_runner.py +++ /dev/null @@ -1,131 +0,0 @@ -import asyncio -import atexit -import threading - -import httpx -import structlog -import uvicorn - -from skyvern.config import settings - -LOG = structlog.get_logger() - -# Global server tracker for cleanup -_server: uvicorn.Server | None = None -_server_thread: threading.Thread | None = None - - -async def _is_server_running(port: int) -> bool: - """Check if the server is running by making an HTTP request.""" - try: - async with httpx.AsyncClient(timeout=1.0) as client: - await client.get(f"http://localhost:{port}") - return True - except (httpx.ConnectError, httpx.TimeoutException): - return False - - -def _cleanup_on_exit() -> None: - """Synchronous cleanup handler for atexit.""" - global _server, _server_thread - - if _server is None: - return - - LOG.info("Shutting down local Skyvern server (atexit)...") - - # Signal server to exit - _server.should_exit = True - - # Wait for server thread to finish - if _server_thread is not None and _server_thread.is_alive(): - _server_thread.join(timeout=5.0) - - _server = None - _server_thread = None - - -async def _wait_for_server(port: int, timeout: float = 10.0, interval: float = 0.5) -> bool: - """Wait for the server to become available on the specified port.""" - start_time = asyncio.get_event_loop().time() - while asyncio.get_event_loop().time() - start_time < timeout: - if await _is_server_running(port): - return True - await asyncio.sleep(interval) - return False - - -async def ensure_local_server_running() -> None: - """Ensure a local Skyvern server is running. - - If the server is not running, starts it in a separate thread with its own event loop. - The server will automatically stop when the process exits. - """ - global _server, _server_thread - - port = settings.PORT - - # Check if server is already running - if await _is_server_running(port): - LOG.info(f"Local Skyvern server already running on port {port}") - return - - # Check if we already have a server instance - if _server is not None: - LOG.info("Local Skyvern server already started by this process") - return - - # Server not running, start it in a separate thread - LOG.info(f"Starting local Skyvern server on port {port}...") - - # Import here to avoid circular imports - from skyvern.forge.api_app import app # noqa: PLC0415 - - # Create uvicorn server configuration (disable reload in programmatic mode) - uvicorn_config = uvicorn.Config( - app=app, - host="127.0.0.1", - port=port, - log_level="error", - reload=False, - access_log=False, - ) - - _server = uvicorn.Server(uvicorn_config) - - # Run server in a separate thread with its own event loop - def _run_server_in_thread() -> None: - """Run the server in a separate thread with its own event loop.""" - asyncio.run(_server.serve()) - - _server_thread = threading.Thread(target=_run_server_in_thread, daemon=True, name="skyvern-server") - _server_thread.start() - - # Register atexit handler to ensure cleanup - atexit.register(_cleanup_on_exit) - - # Wait for server to start - if await _wait_for_server(port, timeout=10.0): - LOG.info("Local Skyvern server started successfully") - else: - LOG.error("Failed to start local Skyvern server (timeout)") - await _stop_local_server() - raise RuntimeError(f"Local Skyvern server failed to start on port {port}") - - -async def _stop_local_server() -> None: - """Stop the local server if it was started by this process.""" - global _server, _server_thread - - if _server is not None: - LOG.info("Shutting down local Skyvern server...") - _server.should_exit = True - - # Wait for server thread to finish (in a thread pool to avoid blocking event loop) - if _server_thread is not None and _server_thread.is_alive(): - loop = asyncio.get_event_loop() - await loop.run_in_executor(None, _server_thread.join, 5.0) - - _server_thread = None - _server = None - LOG.info("Local Skyvern server stopped") diff --git a/skyvern/library/skyvern_browser_page.py b/skyvern/library/skyvern_browser_page.py index 77b011cd..4c8b3d42 100644 --- a/skyvern/library/skyvern_browser_page.py +++ b/skyvern/library/skyvern_browser_page.py @@ -68,8 +68,6 @@ class SkyvernPageRun: TaskRunResponse containing the task execution results. """ - await self._browser.sdk.ensure_has_server() - LOG.info("AI run task", prompt=prompt) task_run = await self._browser.client.run_task( @@ -130,8 +128,6 @@ class SkyvernPageRun: WorkflowRunResponse containing the login workflow execution results. """ - await self._browser.sdk.ensure_has_server() - LOG.info("AI login", prompt=prompt) workflow_run = await self._browser.client.login( @@ -181,8 +177,6 @@ class SkyvernPageRun: WorkflowRunResponse containing the workflow execution results. """ - await self._browser.sdk.ensure_has_server() - LOG.info("AI run workflow", workflow_id=workflow_id) workflow_run = await self._browser.client.run_workflow( diff --git a/skyvern/library/skyvern_browser_page_ai.py b/skyvern/library/skyvern_browser_page_ai.py index 83d95911..a52957f3 100644 --- a/skyvern/library/skyvern_browser_page_ai.py +++ b/skyvern/library/skyvern_browser_page_ai.py @@ -40,8 +40,6 @@ class SdkSkyvernPageAi(SkyvernPageAi): ) -> str | None: """Click an element using AI via API call.""" - await self._browser.sdk.ensure_has_server() - LOG.info("AI click", intention=intention, workflow_run_id=self._browser.workflow_run_id) response = await self._browser.client.run_sdk_action( @@ -71,8 +69,6 @@ class SdkSkyvernPageAi(SkyvernPageAi): ) -> str: """Input text into an element using AI via API call.""" - await self._browser.sdk.ensure_has_server() - LOG.info("AI input text", intention=intention, workflow_run_id=self._browser.workflow_run_id) response = await self._browser.client.run_sdk_action( @@ -103,8 +99,6 @@ class SdkSkyvernPageAi(SkyvernPageAi): ) -> str: """Select an option from a dropdown using AI via API call.""" - await self._browser.sdk.ensure_has_server() - LOG.info("AI select option", intention=intention, workflow_run_id=self._browser.workflow_run_id) response = await self._browser.client.run_sdk_action( @@ -134,8 +128,6 @@ class SdkSkyvernPageAi(SkyvernPageAi): ) -> str: """Upload a file using AI via API call.""" - await self._browser.sdk.ensure_has_server() - LOG.info("AI upload file", intention=intention, workflow_run_id=self._browser.workflow_run_id) response = await self._browser.client.run_sdk_action( @@ -164,8 +156,6 @@ class SdkSkyvernPageAi(SkyvernPageAi): ) -> dict[str, Any] | list | str | None: """Extract information from the page using AI via API call.""" - await self._browser.sdk.ensure_has_server() - LOG.info("AI extract", prompt=prompt, workflow_run_id=self._browser.workflow_run_id) response = await self._browser.client.run_sdk_action( @@ -190,8 +180,6 @@ class SdkSkyvernPageAi(SkyvernPageAi): ) -> None: """Perform an action on the page using AI via API call.""" - await self._browser.sdk.ensure_has_server() - LOG.info("AI act", prompt=prompt, workflow_run_id=self._browser.workflow_run_id) response = await self._browser.client.run_sdk_action( diff --git a/skyvern/library/skyvern_sdk.py b/skyvern/library/skyvern_sdk.py index 2928099e..46c8acaa 100644 --- a/skyvern/library/skyvern_sdk.py +++ b/skyvern/library/skyvern_sdk.py @@ -1,12 +1,13 @@ import os +from typing import Callable import httpx from dotenv import load_dotenv from playwright.async_api import Playwright, async_playwright +from typing_extensions import overload from skyvern.client import AsyncSkyvern, BrowserSessionResponse, SkyvernEnvironment from skyvern.library.constants import DEFAULT_CDP_PORT -from skyvern.library.local_server_runner import ensure_local_server_running from skyvern.library.skyvern_browser import SkyvernBrowser @@ -19,14 +20,18 @@ class SkyvernSdk: Example: ```python - # Initialize with environment and API key + + # Initialize with remote environment and API key skyvern = SkyvernSdk(environment=SkyvernEnvironment.CLOUD, api_key="your-api-key") + # Or in embedded mode (run `skyvern quickstart` first): + skyvern = SkyvernSdk() + # Launch a local browser browser = await skyvern.launch_local_browser(headless=False) page = await browser.get_working_page() - # Or use a cloud browser + # Or use a cloud browser (works only in cloud environment) browser = await skyvern.use_cloud_browser() page = await browser.get_working_page() @@ -41,7 +46,7 @@ class SkyvernSdk: credential = await skyvern.api.create_credential( name="my_user", credential_type="password", - credential=NonEmptyPasswordCredential(username="user@example.com",password="secure_password"), + credential=NonEmptyPasswordCredential(username="user@example.com", password="my_password"), ) # Get a browser page @@ -64,60 +69,111 @@ class SkyvernSdk: ``` """ + @overload def __init__( self, *, - environment: SkyvernEnvironment = SkyvernEnvironment.LOCAL, + environment: SkyvernEnvironment, + api_key: str, + base_url: str | None = None, + timeout: float | None = None, + follow_redirects: bool | None = True, + httpx_client: httpx.AsyncClient | None = None, + ) -> None: + """Remote mode: Connect to Skyvern Cloud or self-hosted instance. + + Args: + environment: The Skyvern environment to connect to. Use SkyvernEnvironment.CLOUD + for Skyvern Cloud or SkyvernEnvironment.PRODUCTION/STAGING for self-hosted + instances. + api_key: API key for authenticating with Skyvern. + Can be found on the settings page: https://app.skyvern.com/settings + base_url: Override the base URL for the Skyvern API. If not provided, uses the default URL for + the specified environment. + timeout: Timeout in seconds for API requests. If not provided, uses the default timeout. + follow_redirects: Whether to automatically follow HTTP redirects. Defaults to True. + httpx_client: Custom httpx AsyncClient for making API requests. + If not provided, a default client will be created. + """ + ... + + @overload + def __init__( + self, + *, + open_api_key: str | None = None, + ) -> None: + """Embedded mode: Run Skyvern locally in-process. + + To use this mode, run `skyvern quickstart` first. + + Args: + open_api_key: Optional OpenAI API key override for LLM operations. + If not provided, the one from the .env file will be used. + """ + ... + + def __init__( + self, + *, + environment: SkyvernEnvironment | None = None, + open_api_key: str | None = None, base_url: str | None = None, api_key: str | None = None, timeout: float | None = None, follow_redirects: bool | None = True, httpx_client: httpx.AsyncClient | None = None, ): - """Initialize the Skyvern SDK client. + if environment is None: + if httpx_client is not None: + raise ValueError("httpx_client is not supported in embedded mode") - Args: - environment: The Skyvern environment to connect to (LOCAL or CLOUD). - base_url: Custom base URL for the Skyvern API. Overrides environment setting. - api_key: Skyvern API key. If not provided, loads from SKYVERN_API_KEY environment variable. - timeout: HTTP request timeout in seconds. - follow_redirects: Whether to follow HTTP redirects. Defaults to True. - httpx_client: Custom httpx.AsyncClient instance for HTTP requests. - - Raises: - Exception: If no API key is provided and no .env file exists. - """ - - self._environment = environment - - if api_key is None: - if os.path.exists(".env"): - load_dotenv(".env") - elif environment == SkyvernEnvironment.LOCAL: + if not os.path.exists(".env"): raise ValueError("Please run `skyvern quickstart` to set up your local Skyvern environment") - env_key = os.getenv("SKYVERN_API_KEY") - if not env_key: + load_dotenv(".env") + api_key = os.getenv("SKYVERN_API_KEY") + if not api_key: raise ValueError("SKYVERN_API_KEY is not set. Provide api_key or set SKYVERN_API_KEY in .env file.") - self._api_key = env_key + + def create_embedded_api() -> AsyncSkyvern: + from skyvern.library.embedded_server_factory import create_embedded_server # noqa: PLC0415 + + return create_embedded_server( + api_key=api_key, + open_api_key=open_api_key, + ) + + api_factory = create_embedded_api else: - self._api_key = api_key + if not api_key: + raise ValueError(f"Missing api_key for {environment.name}") - self._api = AsyncSkyvern( - environment=environment, - base_url=base_url, - api_key=self._api_key, - timeout=timeout, - follow_redirects=follow_redirects, - httpx_client=httpx_client, - ) + def create_remote_api() -> AsyncSkyvern: + return AsyncSkyvern( + environment=environment, + base_url=base_url, + api_key=api_key, + timeout=timeout, + follow_redirects=follow_redirects, + httpx_client=httpx_client, + ) + api_factory = create_remote_api + + self._api_factory: Callable[[], AsyncSkyvern] = api_factory + + self._environment = environment + self._api_key = api_key + + self._api: AsyncSkyvern | None = None self._playwright: Playwright | None = None - self._verified_has_server: bool = False @property def api(self) -> AsyncSkyvern: """Get the AsyncSkyvern API client for direct API access.""" + if not self._api: + self._api = self._api_factory() return self._api async def launch_local_browser(self, *, headless: bool = False, port: int = DEFAULT_CDP_PORT) -> SkyvernBrowser: @@ -168,7 +224,10 @@ class SkyvernSdk: Returns: SkyvernBrowser: A browser instance connected to the cloud session. """ - browser_session = await self._api.get_browser_session(browser_session_id) + if self._environment != SkyvernEnvironment.CLOUD and self._environment != SkyvernEnvironment.STAGING: + raise Exception("Cloud browser sessions are supported only in the cloud environment") + + browser_session = await self.api.get_browser_session(browser_session_id) return await self._connect_to_cloud_browser_session(browser_session) async def launch_cloud_browser(self) -> SkyvernBrowser: @@ -179,7 +238,10 @@ class SkyvernSdk: Returns: SkyvernBrowser: A browser instance connected to the new cloud session. """ - browser_session = await self._api.create_browser_session() + if self._environment != SkyvernEnvironment.CLOUD and self._environment != SkyvernEnvironment.STAGING: + raise Exception("Cloud browser sessions are supported only in the cloud environment") + + browser_session = await self.api.create_browser_session() return await self._connect_to_cloud_browser_session(browser_session) async def use_cloud_browser(self) -> SkyvernBrowser: @@ -192,23 +254,17 @@ class SkyvernSdk: Returns: SkyvernBrowser: A browser instance connected to an existing or new cloud session. """ - browser_sessions = await self._api.get_browser_sessions() + if self._environment != SkyvernEnvironment.CLOUD and self._environment != SkyvernEnvironment.STAGING: + raise Exception("Cloud browser sessions are supported only in the cloud environment") + + browser_sessions = await self.api.get_browser_sessions() browser_session = max( (s for s in browser_sessions if s.runnable_id is None), key=lambda s: s.started_at, default=None ) if browser_session is None: - browser_session = await self._api.create_browser_session() + browser_session = await self.api.create_browser_session() return await self._connect_to_cloud_browser_session(browser_session) - async def ensure_has_server(self) -> None: - if self._verified_has_server: - return - - if self._environment == SkyvernEnvironment.LOCAL: - await ensure_local_server_running() - - self._verified_has_server = True - async def _connect_to_cloud_browser_session(self, browser_session: BrowserSessionResponse) -> SkyvernBrowser: if browser_session.browser_address is None: raise Exception(f"Browser address is missing for session {browser_session.browser_session_id}")