Skyvern SDK Prototype (#3624)

This commit is contained in:
Stanislav Novosad
2025-10-17 13:15:24 -06:00
committed by GitHub
parent 770ddadc2f
commit fb24641212
9 changed files with 648 additions and 3 deletions

View File

@@ -1959,6 +1959,7 @@ class AsyncSkyvern:
totp_identifier: typing.Optional[str] = OMIT,
totp_url: typing.Optional[str] = OMIT,
browser_session_id: typing.Optional[str] = OMIT,
browser_address: typing.Optional[str] = OMIT,
model: typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] = OMIT,
extra_http_headers: typing.Optional[typing.Dict[str, typing.Optional[str]]] = OMIT,
publish_workflow: typing.Optional[bool] = OMIT,
@@ -2041,6 +2042,9 @@ class AsyncSkyvern:
Run the task or workflow in the specific Skyvern browser session. Having a browser session can persist the real-time state of the browser, so that the next run can continue from where the previous run left off.
browser_address : typing.Optional[str]
The CDP address for the task
model : typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]]
Optional model configuration.
@@ -2103,6 +2107,7 @@ class AsyncSkyvern:
"totp_identifier": totp_identifier,
"totp_url": totp_url,
"browser_session_id": browser_session_id,
"browser_address": browser_address,
"model": model,
"extra_http_headers": extra_http_headers,
"publish_workflow": publish_workflow,
@@ -2163,6 +2168,7 @@ class AsyncSkyvern:
totp_url: typing.Optional[str] = OMIT,
totp_identifier: typing.Optional[str] = OMIT,
browser_session_id: typing.Optional[str] = OMIT,
browser_address: typing.Optional[str] = OMIT,
max_screenshot_scrolls: typing.Optional[int] = OMIT,
extra_http_headers: typing.Optional[typing.Dict[str, typing.Optional[str]]] = OMIT,
request_options: typing.Optional[RequestOptions] = None,
@@ -2226,6 +2232,9 @@ class AsyncSkyvern:
browser_session_id : typing.Optional[str]
ID of a Skyvern browser session to reuse, having it continue from the current screen state
browser_address : typing.Optional[str]
The CDP address for the workflow
max_screenshot_scrolls : typing.Optional[int]
The maximum number of scrolls for the post action screenshot. When it's None or 0, it takes the current viewpoint screenshot.
@@ -2275,6 +2284,7 @@ class AsyncSkyvern:
"totp_url": totp_url,
"totp_identifier": totp_identifier,
"browser_session_id": browser_session_id,
"browser_address": browser_address,
"max_screenshot_scrolls": max_screenshot_scrolls,
"extra_http_headers": extra_http_headers,
},
@@ -3757,6 +3767,7 @@ class AsyncSkyvern:
totp_identifier: typing.Optional[str] = OMIT,
totp_url: typing.Optional[str] = OMIT,
browser_session_id: typing.Optional[str] = OMIT,
browser_address: typing.Optional[str] = OMIT,
extra_http_headers: typing.Optional[typing.Dict[str, typing.Optional[str]]] = OMIT,
max_screenshot_scrolling_times: typing.Optional[int] = OMIT,
credential_id: typing.Optional[str] = OMIT,
@@ -3795,6 +3806,9 @@ class AsyncSkyvern:
browser_session_id : typing.Optional[str]
ID of the browser session to use, which is prefixed by `pbs_` e.g. `pbs_123456`
browser_address : typing.Optional[str]
The CDP address for the task
extra_http_headers : typing.Optional[typing.Dict[str, typing.Optional[str]]]
Additional HTTP headers to include in requests
@@ -3856,6 +3870,7 @@ class AsyncSkyvern:
"totp_identifier": totp_identifier,
"totp_url": totp_url,
"browser_session_id": browser_session_id,
"browser_address": browser_address,
"extra_http_headers": extra_http_headers,
"max_screenshot_scrolling_times": max_screenshot_scrolling_times,
"credential_id": credential_id,

View File

@@ -6,4 +6,4 @@ import enum
class SkyvernEnvironment(enum.Enum):
PRODUCTION = "https://api.skyvern.com"
STAGING = "https://api-staging.skyvern.com"
DEVELOPMENT = "http://localhost:8000"
LOCAL = "http://localhost:8000"

View File

@@ -194,6 +194,7 @@ async def login(
totp_identifier=login_request.totp_identifier,
totp_verification_url=login_request.totp_url,
browser_session_id=login_request.browser_session_id,
browser_address=login_request.browser_address,
max_screenshot_scrolls=login_request.max_screenshot_scrolling_times,
extra_http_headers=login_request.extra_http_headers,
)

View File

@@ -1,2 +1,3 @@
DEFAULT_AGENT_TIMEOUT = 1800 # 30 minutes
DEFAULT_AGENT_HEARTBEAT_INTERVAL = 10 # 10 seconds
DEFAULT_CDP_PORT = 9222

View File

@@ -0,0 +1,77 @@
from playwright.async_api import BrowserContext, Page
from skyvern.client import AsyncSkyvern
from skyvern.library.skyvern_browser_page import SkyvernBrowserPage, SkyvernPageRun
class SkyvernBrowser:
"""A browser context wrapper that creates Skyvern-enabled pages.
This class wraps a Playwright BrowserContext and provides methods to create
SkyvernBrowserPage instances that combine traditional browser automation with
AI-powered task execution capabilities. It manages browser session state and
enables persistent browser sessions across multiple pages.
Example:
```python
sdk = SkyvernSdk()
browser = await sdk.launch_local_browser()
# Get or create the working page
page = await browser.get_working_page()
# Create a new page
new_page = await browser.new_page()
```
Attributes:
_browser_context: The underlying Playwright BrowserContext.
_browser_session_id: Optional session ID for persistent browser sessions.
_browser_address: Optional address for remote browser connections.
_client: The AsyncSkyvern client for API communication.
"""
def __init__(
self,
browser_context: BrowserContext,
client: AsyncSkyvern,
*,
browser_session_id: str | None = None,
browser_address: str | None = None,
):
self._browser_context = browser_context
self._browser_session_id = browser_session_id
self._browser_address = browser_address
self._client = client
async def get_working_page(self) -> SkyvernBrowserPage:
"""Get the most recent page or create a new one if none exists.
This method returns the last page in the browser context, or creates a new page
if the context has no pages. This is useful for continuing work on an existing
page without creating unnecessary new tabs.
Returns:
SkyvernBrowserPage: The most recent page wrapped with Skyvern capabilities.
"""
if self._browser_context.pages:
page = self._browser_context.pages[-1]
else:
page = await self._browser_context.new_page()
return await self._create_skyvern_page(page)
async def new_page(self) -> SkyvernBrowserPage:
"""Create a new page (tab) in the browser context.
This method always creates a new page, similar to opening a new tab in a browser.
The new page will have both Playwright's standard API and Skyvern's AI capabilities.
Returns:
SkyvernBrowserPage: A new page wrapped with Skyvern capabilities.
"""
page = await self._browser_context.new_page()
return await self._create_skyvern_page(page)
async def _create_skyvern_page(self, page: Page) -> SkyvernBrowserPage:
page_ai = SkyvernPageRun(page, self._browser_session_id, self._browser_address, self._client)
return SkyvernBrowserPage(page, page_ai)

View File

@@ -0,0 +1,296 @@
import asyncio
from typing import Any
from playwright.async_api import Page
from skyvern.client import AsyncSkyvern, GetRunResponse
from skyvern.client.types.workflow_run_response import WorkflowRunResponse
from skyvern.library.constants import DEFAULT_AGENT_HEARTBEAT_INTERVAL, DEFAULT_AGENT_TIMEOUT
from skyvern.schemas.run_blocks import CredentialType
from skyvern.schemas.runs import RunEngine, RunStatus, TaskRunResponse
class SkyvernPageRun:
"""Provides methods to run Skyvern tasks and workflows in the context of a browser page.
This class enables executing AI-powered browser automation tasks while sharing the
context of an existing browser page. It supports running custom tasks, login workflows,
and pre-defined workflows with automatic waiting for completion.
"""
def __init__(
self, page: Page, browser_session_id: str | None, browser_address: str | None, client: AsyncSkyvern
) -> None:
self._page = page
self._browser_session_id = browser_session_id
self._browser_address = browser_address
self._client = client
async def run_task(
self,
prompt: str,
engine: RunEngine = RunEngine.skyvern_v2,
model: dict[str, Any] | None = None,
url: str | None = None,
webhook_url: str | None = None,
totp_identifier: str | None = None,
totp_url: str | None = None,
title: str | None = None,
error_code_mapping: dict[str, str] | None = None,
data_extraction_schema: dict[str, Any] | str | None = None,
max_steps: int | None = None,
timeout: float = DEFAULT_AGENT_TIMEOUT,
user_agent: str | None = None,
) -> TaskRunResponse:
"""Run a task in the context of this page and wait for it to finish.
Args:
prompt: Natural language description of the task to perform.
engine: The execution engine to use. Defaults to skyvern_v2.
model: LLM model configuration options.
url: URL to navigate to. If not provided, uses the current page URL.
webhook_url: URL to receive webhook notifications about task progress.
totp_identifier: Identifier for TOTP (Time-based One-Time Password) authentication.
totp_url: URL to fetch TOTP codes from.
title: Human-readable title for this task run.
error_code_mapping: Mapping of error codes to custom error messages.
data_extraction_schema: Schema defining what data to extract from the page.
max_steps: Maximum number of steps the agent can take.
timeout: Maximum time in seconds to wait for task completion.
user_agent: Custom user agent string to use.
Returns:
TaskRunResponse containing the task execution results.
"""
task_run = await self._client.run_task(
prompt=prompt,
engine=engine,
model=model,
url=url or self._get_page_url(),
webhook_url=webhook_url,
totp_identifier=totp_identifier,
totp_url=totp_url,
title=title,
error_code_mapping=error_code_mapping,
data_extraction_schema=data_extraction_schema,
max_steps=max_steps,
browser_session_id=self._browser_session_id,
browser_address=self._browser_address,
user_agent=user_agent,
)
task_run = await self._wait_for_run_completion(task_run.run_id, timeout)
return TaskRunResponse.model_validate(task_run.model_dump())
async def login(
self,
credential_type: CredentialType,
*,
url: str | None = None,
credential_id: str | None = None,
bitwarden_collection_id: str | None = None,
bitwarden_item_id: str | None = None,
onepassword_vault_id: str | None = None,
onepassword_item_id: str | None = None,
prompt: str | None = None,
webhook_url: str | None = None,
totp_identifier: str | None = None,
totp_url: str | None = None,
extra_http_headers: dict[str, str] | None = None,
timeout: float = DEFAULT_AGENT_TIMEOUT,
) -> WorkflowRunResponse:
"""Run a login task in the context of this page and wait for it to finish.
Args:
credential_type: Type of credential store to use (e.g., bitwarden, onepassword).
url: URL to navigate to for login. If not provided, uses the current page URL.
credential_id: ID of the credential to use.
bitwarden_collection_id: Bitwarden collection ID containing the credentials.
bitwarden_item_id: Bitwarden item ID for the credentials.
onepassword_vault_id: 1Password vault ID containing the credentials.
onepassword_item_id: 1Password item ID for the credentials.
prompt: Additional instructions for the login process.
webhook_url: URL to receive webhook notifications about login progress.
totp_identifier: Identifier for TOTP authentication.
totp_url: URL to fetch TOTP codes from.
extra_http_headers: Additional HTTP headers to include in requests.
timeout: Maximum time in seconds to wait for login completion.
Returns:
WorkflowRunResponse containing the login workflow execution results.
"""
workflow_run = await self._client.login(
credential_type=credential_type,
url=url or self._get_page_url(),
credential_id=credential_id,
bitwarden_collection_id=bitwarden_collection_id,
bitwarden_item_id=bitwarden_item_id,
onepassword_vault_id=onepassword_vault_id,
onepassword_item_id=onepassword_item_id,
prompt=prompt,
webhook_url=webhook_url,
totp_identifier=totp_identifier,
totp_url=totp_url,
browser_session_id=self._browser_session_id,
browser_address=self._browser_address,
extra_http_headers=extra_http_headers,
)
workflow_run = await self._wait_for_run_completion(workflow_run.run_id, timeout)
return WorkflowRunResponse.model_validate(workflow_run.model_dump())
async def run_workflow(
self,
workflow_id: str,
parameters: dict[str, Any] | None = None,
template: bool | None = None,
title: str | None = None,
webhook_url: str | None = None,
totp_url: str | None = None,
totp_identifier: str | None = None,
timeout: float = DEFAULT_AGENT_TIMEOUT,
) -> WorkflowRunResponse:
"""Run a workflow in the context of this page and wait for it to finish.
Args:
workflow_id: ID of the workflow to execute.
parameters: Dictionary of parameters to pass to the workflow.
template: Whether this is a workflow template.
title: Human-readable title for this workflow run.
webhook_url: URL to receive webhook notifications about workflow progress.
totp_url: URL to fetch TOTP codes from.
totp_identifier: Identifier for TOTP authentication.
timeout: Maximum time in seconds to wait for workflow completion.
Returns:
WorkflowRunResponse containing the workflow execution results.
"""
workflow_run = await self._client.run_workflow(
workflow_id=workflow_id,
parameters=parameters,
template=template,
title=title,
webhook_url=webhook_url,
totp_url=totp_url,
totp_identifier=totp_identifier,
browser_session_id=self._browser_session_id,
browser_address=self._browser_address,
)
workflow_run = await self._wait_for_run_completion(workflow_run.run_id, timeout)
return WorkflowRunResponse.model_validate(workflow_run.model_dump())
async def _wait_for_run_completion(self, run_id: str, timeout: float) -> GetRunResponse:
async with asyncio.timeout(timeout):
while True:
task_run = await self._client.get_run(run_id)
if RunStatus(task_run.status).is_final():
break
await asyncio.sleep(DEFAULT_AGENT_HEARTBEAT_INTERVAL)
return task_run
def _get_page_url(self) -> str | None:
url = self._page.url
if url == "about:blank":
return None
return url
class SkyvernBrowserPage:
"""A browser page wrapper that combines Playwright's page API with Skyvern's AI capabilities.
This class provides a unified interface for both traditional browser automation (via Playwright)
and AI-powered task execution (via Skyvern). It exposes standard page methods like click, fill,
goto, etc., while also providing access to Skyvern's task and workflow execution through the
`run` attribute.
Example:
```python
# Use standard Playwright methods
await page.goto("https://example.com")
await page.fill("#username", "user@example.com")
await page.click("#login-button")
# Or use Skyvern's AI capabilities
await page.run.run_task("Fill out the contact form and submit it")
```
Attributes:
run: SkyvernPageRun instance for executing AI-powered tasks and workflows.
"""
def __init__(self, page: Page, run: SkyvernPageRun):
self.run = run
self._playwright_page = page
async def click(self, selector: str, **kwargs: Any) -> None:
"""Click an element matching the selector.
Args:
selector: A selector to search for an element to click.
**kwargs: Additional options like timeout, force, position, etc.
"""
await self._playwright_page.click(selector, **kwargs)
async def fill(self, selector: str, value: str, **kwargs: Any) -> None:
"""Fill an input field with the given value.
Args:
selector: A selector to search for an element to fill.
value: Value to fill for the input field.
**kwargs: Additional options like timeout, force, no_wait_after, etc.
"""
await self._playwright_page.fill(selector, value, **kwargs)
async def goto(self, url: str, **kwargs: Any) -> None:
"""Navigate to the given URL.
Args:
url: URL to navigate page to.
**kwargs: Additional options like timeout, wait_until, referer, etc.
"""
await self._playwright_page.goto(url, **kwargs)
async def type(self, selector: str, text: str, **kwargs: Any) -> None:
"""Type text into an element character by character.
Args:
selector: A selector to search for an element to type into.
text: Text to type into the element.
**kwargs: Additional options like delay, timeout, no_wait_after, etc.
"""
await self._playwright_page.type(selector, text, **kwargs)
async def select_option(self, selector: str, value: Any = None, **kwargs: Any) -> list[str]:
"""Select option(s) in a <select> element.
Args:
selector: A selector to search for a select element.
value: Option value(s) to select. Can be a string, list of strings, or dict with value/label/index.
**kwargs: Additional options like timeout, force, no_wait_after, etc.
Returns:
List of option values that have been successfully selected.
"""
return await self._playwright_page.select_option(selector, value, **kwargs)
async def reload(self, **kwargs: Any) -> None:
"""Reload the current page.
Args:
**kwargs: Additional options like timeout, wait_until, etc.
"""
await self._playwright_page.reload(**kwargs)
async def screenshot(self, **kwargs: Any) -> bytes:
"""Take a screenshot of the page.
Args:
**kwargs: Additional options like path, full_page, clip, type, quality, etc.
Returns:
bytes: The screenshot as bytes (unless path is specified, then saves to file).
"""
return await self._playwright_page.screenshot(**kwargs)

View File

@@ -0,0 +1,222 @@
import os
import httpx
from dotenv import load_dotenv
from playwright.async_api import Playwright, async_playwright
from skyvern.client import AsyncSkyvern, BrowserSessionResponse, SkyvernEnvironment
from skyvern.library.constants import DEFAULT_CDP_PORT
from skyvern.library.skyvern_browser import SkyvernBrowser
class SkyvernSdk:
"""Main entry point for the Skyvern SDK.
This class provides methods to launch and connect to browsers (both local and cloud-hosted),
and access the Skyvern API client for task and workflow management. It combines browser
automation capabilities with AI-powered task execution.
Example:
```python
# Initialize with environment and API key
skyvern = SkyvernSdk(environment=SkyvernEnvironment.PRODUCTION, api_key="your-api-key")
# Launch a local browser
browser = await skyvern.launch_local_browser(headless=False)
page = await browser.get_working_page()
# Or use a cloud browser
browser = await skyvern.use_cloud_browser()
page = await browser.get_working_page()
# Execute AI-powered tasks
await page.run.run_task("Fill out the form and submit it")
```
You can also mix AI-powered tasks with direct browser control in the same session:
```python
# Create credentials via API
credential = await skyvern.api.create_credential(
name="my_user",
credential_type="password",
credential=NonEmptyPasswordCredential(username="user@example.com",password="secure_password"),
)
# Get a browser page
browser = await skyvern.launch_cloud_browser()
page = await browser.get_working_page()
# Navigate manually
await page.goto("https://example.com")
# Use AI to handle login
await page.run.login(
credential_type=CredentialType.skyvern,
credential_id=credential.credential_id,
)
# Continue with manual browser control
await page.click("#invoices-button")
await page.fill("#search", "my invoice")
await page.screenshot(path="screenshot.png", full_page=True)
```
"""
def __init__(
self,
*,
environment: SkyvernEnvironment = SkyvernEnvironment.LOCAL,
base_url: str | None = None,
api_key: str | None = None,
timeout: float | None = None,
follow_redirects: bool | None = True,
httpx_client: httpx.AsyncClient | None = None,
):
"""Initialize the Skyvern SDK client.
Args:
environment: The Skyvern environment to connect to (LOCAL or PRODUCTION).
base_url: Custom base URL for the Skyvern API. Overrides environment setting.
api_key: Skyvern API key. If not provided, loads from SKYVERN_API_KEY environment variable.
timeout: HTTP request timeout in seconds.
follow_redirects: Whether to follow HTTP redirects. Defaults to True.
httpx_client: Custom httpx.AsyncClient instance for HTTP requests.
Raises:
Exception: If no API key is provided and no .env file exists.
"""
if api_key is None:
if os.path.exists(".env"):
load_dotenv(".env")
env_key = os.getenv("SKYVERN_API_KEY")
if not env_key:
raise ValueError(
"SKYVERN_API_KEY is not set. Provide api_key or set SKYVERN_API_KEY in environment/.env."
)
self._api_key = env_key
else:
self._api_key = api_key
self._api = AsyncSkyvern(
environment=environment,
base_url=base_url,
api_key=self._api_key,
x_api_key=self._api_key,
timeout=timeout,
follow_redirects=follow_redirects,
httpx_client=httpx_client,
)
self._playwright: Playwright | None = None
@property
def api(self) -> AsyncSkyvern:
"""Get the AsyncSkyvern API client for direct API access."""
return self._api
async def launch_local_browser(self, *, headless: bool = False, port: int = DEFAULT_CDP_PORT) -> SkyvernBrowser:
"""Launch a new local Chromium browser with Chrome DevTools Protocol (CDP) enabled.
This method launches a browser on your local machine with remote debugging enabled,
allowing Skyvern to control it via CDP. Useful for development and debugging.
Args:
headless: Whether to run the browser in headless mode. Defaults to False.
port: The port number for the CDP endpoint. Defaults to DEFAULT_CDP_PORT.
Returns:
SkyvernBrowser: A browser instance with Skyvern capabilities.
"""
playwright = await self._get_playwright()
browser = await playwright.chromium.launch(
headless=headless,
args=[f"--remote-debugging-port={port}"],
)
browser_address = f"http://localhost:{port}"
browser_context = browser.contexts[0] if browser.contexts else await browser.new_context()
return SkyvernBrowser(browser_context, self._api, browser_address=browser_address)
async def connect_to_browser_over_cdp(self, cdp_url: str) -> SkyvernBrowser:
"""Connect to an existing browser instance via Chrome DevTools Protocol (CDP).
Use this to connect to a browser that's already running with CDP enabled,
whether local or remote.
Args:
cdp_url: The CDP WebSocket URL (e.g., "http://localhost:9222").
Returns:
SkyvernBrowser: A browser instance connected to the existing browser.
"""
playwright = await self._get_playwright()
browser = await playwright.chromium.connect_over_cdp(cdp_url)
browser_context = browser.contexts[0] if browser.contexts else await browser.new_context()
return SkyvernBrowser(browser_context, self._api, browser_address=cdp_url)
async def connect_to_cloud_browser_session(self, browser_session_id: str) -> SkyvernBrowser:
"""Connect to an existing cloud-hosted browser session by ID.
Args:
browser_session_id: The ID of the cloud browser session to connect to.
Returns:
SkyvernBrowser: A browser instance connected to the cloud session.
"""
browser_session = await self._api.get_browser_session(browser_session_id)
return await self._connect_to_cloud_browser_session(browser_session)
async def launch_cloud_browser(self) -> SkyvernBrowser:
"""Launch a new cloud-hosted browser session.
This creates a new browser session in Skyvern's cloud infrastructure and connects to it.
Returns:
SkyvernBrowser: A browser instance connected to the new cloud session.
"""
browser_session = await self._api.create_browser_session()
return await self._connect_to_cloud_browser_session(browser_session)
async def use_cloud_browser(self) -> SkyvernBrowser:
"""Get or create a cloud browser session.
This method attempts to reuse the most recent available cloud browser session.
If no session exists, it creates a new one. This is useful for cost efficiency
and session persistence.
Returns:
SkyvernBrowser: A browser instance connected to an existing or new cloud session.
"""
browser_sessions = await self._api.get_browser_sessions()
browser_session = max(
(s for s in browser_sessions if s.runnable_id is None), key=lambda s: s.started_at, default=None
)
if browser_session is None:
browser_session = await self._api.create_browser_session()
return await self._connect_to_cloud_browser_session(browser_session)
async def _connect_to_cloud_browser_session(self, browser_session: BrowserSessionResponse) -> SkyvernBrowser:
if browser_session.browser_address is None:
raise Exception(f"Browser address is missing for session {browser_session.browser_session_id}")
playwright = await self._get_playwright()
browser = await playwright.chromium.connect_over_cdp(
browser_session.browser_address, headers={"x-api-key": self._api_key}
)
browser_context = browser.contexts[0] if browser.contexts else await browser.new_context()
return SkyvernBrowser(browser_context, self._api, browser_session_id=browser_session.browser_session_id)
async def _get_playwright(self) -> Playwright:
if self._playwright is None:
self._playwright = await async_playwright().start()
return self._playwright
async def aclose(self) -> None:
"""Close Playwright and release resources."""
if self._playwright is not None:
try:
await self._playwright.stop()
finally:
self._playwright = None

View File

@@ -30,6 +30,11 @@ class LoginRequest(BaseModel):
description="ID of the browser session to use, which is prefixed by `pbs_` e.g. `pbs_123456`",
examples=["pbs_123456"],
)
browser_address: str | None = Field(
default=None,
description="The CDP address for the task.",
examples=["http://127.0.0.1:9222", "ws://127.0.0.1:9222/devtools/browser/1234567890"],
)
extra_http_headers: dict[str, str] | None = Field(
default=None, description="Additional HTTP headers to include in requests"
)

View File

@@ -419,6 +419,13 @@ async def _create_headless_chromium(
extra_http_headers: dict[str, str] | None = None,
**kwargs: dict,
) -> tuple[BrowserContext, BrowserArtifacts, BrowserCleanupFunc]:
if browser_address := kwargs.get("browser_address"):
return await _connect_to_cdp_browser(
playwright,
remote_browser_url=str(browser_address),
extra_http_headers=extra_http_headers,
)
user_data_dir = make_temp_directory(prefix="skyvern_browser_")
download_dir = initialize_download_dir()
BrowserContextFactory.update_chromium_browser_preferences(
@@ -447,6 +454,13 @@ async def _create_headful_chromium(
extra_http_headers: dict[str, str] | None = None,
**kwargs: dict,
) -> tuple[BrowserContext, BrowserArtifacts, BrowserCleanupFunc]:
if browser_address := kwargs.get("browser_address"):
return await _connect_to_cdp_browser(
playwright,
remote_browser_url=str(browser_address),
extra_http_headers=extra_http_headers,
)
user_data_dir = make_temp_directory(prefix="skyvern_browser_")
download_dir = initialize_download_dir()
BrowserContextFactory.update_chromium_browser_preferences(
@@ -503,6 +517,13 @@ async def _create_cdp_connection_browser(
extra_http_headers: dict[str, str] | None = None,
**kwargs: dict,
) -> tuple[BrowserContext, BrowserArtifacts, BrowserCleanupFunc]:
if browser_address := kwargs.get("browser_address"):
return await _connect_to_cdp_browser(
playwright,
remote_browser_url=str(browser_address),
extra_http_headers=extra_http_headers,
)
browser_type = settings.BROWSER_TYPE
browser_path = settings.CHROME_EXECUTABLE_PATH
@@ -550,13 +571,20 @@ async def _create_cdp_connection_browser(
else:
LOG.info("Port 9222 is in use, using existing browser")
return await _connect_to_cdp_browser(playwright, settings.BROWSER_REMOTE_DEBUGGING_URL, extra_http_headers)
async def _connect_to_cdp_browser(
playwright: Playwright,
remote_browser_url: str,
extra_http_headers: dict[str, str] | None = None,
) -> tuple[BrowserContext, BrowserArtifacts, BrowserCleanupFunc]:
browser_args = BrowserContextFactory.build_browser_args(extra_http_headers=extra_http_headers)
browser_artifacts = BrowserContextFactory.build_browser_artifacts(
har_path=browser_args["record_har_path"],
)
remote_browser_url = settings.BROWSER_REMOTE_DEBUGGING_URL
LOG.info("Connecting browser CDP connection", remote_browser_url=remote_browser_url)
browser = await playwright.chromium.connect_over_cdp(remote_browser_url)
@@ -676,7 +704,7 @@ class BrowserState:
if not use_existing_page:
await self._close_all_other_pages()
if url:
if url and page.url.rstrip("/") != url.rstrip("/"):
await self.navigate_to_url(page=page, url=url)
async def navigate_to_url(self, page: Page, url: str, retry_times: int = NAVIGATION_MAX_RETRY_TIME) -> None: