From b5ff547a3a6e75b174eac745c564e855d8a4d58b Mon Sep 17 00:00:00 2001 From: Benji Visser Date: Wed, 21 Jan 2026 22:27:16 -0500 Subject: [PATCH] browser sessions v2 - backend (#4515) Signed-off-by: Benji Visser --- ...2_add_browser_session_v2_compute_fields.py | 35 ++++++++ skyvern/exceptions.py | 8 ++ skyvern/forge/sdk/artifact/storage/local.py | 66 ++++++++++++++- skyvern/forge/sdk/db/agent_db.py | 46 ++++++++++- skyvern/forge/sdk/db/models.py | 5 ++ skyvern/forge/sdk/db/polls.py | 2 +- .../sdk/routes/streaming/channels/vnc.py | 80 +++++++++++++++---- .../forge/sdk/routes/streaming/messages.py | 8 +- skyvern/forge/sdk/routes/streaming/verify.py | 3 +- skyvern/forge/sdk/routes/streaming/vnc.py | 5 +- .../schemas/persistent_browser_sessions.py | 6 ++ skyvern/webeye/browser_factory.py | 2 +- .../default_persistent_sessions_manager.py | 35 ++++++-- skyvern/webeye/persistent_sessions_manager.py | 1 + skyvern/webeye/schemas.py | 6 ++ 15 files changed, 273 insertions(+), 35 deletions(-) create mode 100644 alembic/versions/2026_01_22_0314-ce791d022652_add_browser_session_v2_compute_fields.py diff --git a/alembic/versions/2026_01_22_0314-ce791d022652_add_browser_session_v2_compute_fields.py b/alembic/versions/2026_01_22_0314-ce791d022652_add_browser_session_v2_compute_fields.py new file mode 100644 index 00000000..e9b8c4c0 --- /dev/null +++ b/alembic/versions/2026_01_22_0314-ce791d022652_add_browser_session_v2_compute_fields.py @@ -0,0 +1,35 @@ +"""add browser session v2 compute fields + +Revision ID: ce791d022652 +Revises: a720c991f779 +Create Date: 2026-01-22 03:14:43.766916+00:00 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "ce791d022652" +down_revision: Union[str, None] = "a720c991f779" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column("persistent_browser_sessions", sa.Column("instance_type", sa.String(), nullable=True)) + op.add_column("persistent_browser_sessions", sa.Column("vcpu_millicores", sa.Integer(), nullable=True)) + op.add_column("persistent_browser_sessions", sa.Column("memory_mb", sa.Integer(), nullable=True)) + op.add_column("persistent_browser_sessions", sa.Column("duration_ms", sa.BigInteger(), nullable=True)) + op.add_column("persistent_browser_sessions", sa.Column("compute_cost", sa.Numeric(), nullable=True)) + + +def downgrade() -> None: + op.drop_column("persistent_browser_sessions", "compute_cost") + op.drop_column("persistent_browser_sessions", "duration_ms") + op.drop_column("persistent_browser_sessions", "memory_mb") + op.drop_column("persistent_browser_sessions", "vcpu_millicores") + op.drop_column("persistent_browser_sessions", "instance_type") diff --git a/skyvern/exceptions.py b/skyvern/exceptions.py index acd4e4d7..36f02598 100644 --- a/skyvern/exceptions.py +++ b/skyvern/exceptions.py @@ -829,6 +829,14 @@ class BrowserSessionNotFound(SkyvernHTTPException): ) +class BrowserSessionStartupTimeout(SkyvernHTTPException): + def __init__(self, browser_session_id: str) -> None: + super().__init__( + f"Browser session {browser_session_id} failed to start within the timeout period.", + status_code=status.HTTP_504_GATEWAY_TIMEOUT, + ) + + class BrowserProfileNotFound(SkyvernHTTPException): def __init__(self, profile_id: str, organization_id: str | None = None) -> None: message = f"Browser profile {profile_id} not found" diff --git a/skyvern/forge/sdk/artifact/storage/local.py b/skyvern/forge/sdk/artifact/storage/local.py index e9a85a9d..25275f56 100644 --- a/skyvern/forge/sdk/artifact/storage/local.py +++ b/skyvern/forge/sdk/artifact/storage/local.py @@ -289,14 +289,72 @@ class LocalStorage(BaseStorage): return [] async def list_recordings_in_browser_session(self, organization_id: str, browser_session_id: str) -> list[str]: - """List all recording files for a browser session (not implemented for local storage).""" - return [] + """List all recording files for a browser session from local storage. + + Videos are synced to the browser_sessions storage path when the session closes. + """ + videos_base = ( + Path(self.artifact_path) + / settings.ENV + / organization_id + / "browser_sessions" + / browser_session_id + / "videos" + ) + + recording_files: list[str] = [] + if videos_base.exists(): + for root, _, files in os.walk(videos_base): + for file in files: + file_path = Path(root) / file + recording_files.append(f"file://{file_path}") + + return recording_files async def get_shared_recordings_in_browser_session( self, organization_id: str, browser_session_id: str ) -> list[FileInfo]: - """Get recording files with URLs for a browser session (not implemented for local storage).""" - return [] + """Get recording files with URLs for a browser session from local storage.""" + file_uris = await self.list_recordings_in_browser_session(organization_id, browser_session_id) + if not file_uris: + return [] + + file_infos: list[FileInfo] = [] + for uri in file_uris: + uri_lower = uri.lower() + if not (uri_lower.endswith(".webm") or uri_lower.endswith(".mp4")): + LOG.warning( + "Skipping recording file with unsupported extension", + uri=uri, + organization_id=organization_id, + browser_session_id=browser_session_id, + ) + continue + + file_path = parse_uri_to_path(uri) + path_obj = Path(file_path) + + if not path_obj.exists(): + continue + + file_size = path_obj.stat().st_size + if file_size == 0: + continue + + modified_at = datetime.fromtimestamp(path_obj.stat().st_mtime) + checksum = calculate_sha256_for_file(file_path) + filename = path_obj.name + + file_info = FileInfo( + url=uri, + checksum=checksum, + filename=filename, + modified_at=modified_at, + ) + file_infos.append(file_info) + + file_infos.sort(key=lambda f: (f.modified_at is not None, f.modified_at), reverse=True) + return file_infos async def get_downloaded_files(self, organization_id: str, run_id: str | None) -> list[FileInfo]: download_dir = get_download_dir(run_id=run_id) diff --git a/skyvern/forge/sdk/db/agent_db.py b/skyvern/forge/sdk/db/agent_db.py index 02796482..59e30126 100644 --- a/skyvern/forge/sdk/db/agent_db.py +++ b/skyvern/forge/sdk/db/agent_db.py @@ -4705,6 +4705,7 @@ class AgentDB(BaseAlchemyDB): status: str | None = None, timeout_minutes: int | None = None, organization_id: str | None = None, + completed_at: datetime | None = None, ) -> PersistentBrowserSession: try: async with self.Session() as session: @@ -4723,6 +4724,8 @@ class AgentDB(BaseAlchemyDB): persistent_browser_session.status = status if timeout_minutes: persistent_browser_session.timeout_minutes = timeout_minutes + if completed_at: + persistent_browser_session.completed_at = completed_at await session.commit() await session.refresh(persistent_browser_session) @@ -4741,7 +4744,7 @@ class AgentDB(BaseAlchemyDB): self, browser_session_id: str, browser_address: str | None, - ip_address: str, + ip_address: str | None, ecs_task_arn: str | None, organization_id: str | None = None, ) -> None: @@ -4779,6 +4782,47 @@ class AgentDB(BaseAlchemyDB): LOG.error("UnexpectedError", exc_info=True) raise + async def update_persistent_browser_session_compute_cost( + self, + session_id: str, + organization_id: str, + instance_type: str, + vcpu_millicores: int, + memory_mb: int, + duration_ms: int, + compute_cost: float, + ) -> None: + """Update the compute cost fields for a persistent browser session""" + try: + async with self.Session() as session: + persistent_browser_session = ( + await session.scalars( + select(PersistentBrowserSessionModel) + .filter_by(persistent_browser_session_id=session_id) + .filter_by(organization_id=organization_id) + .filter_by(deleted_at=None) + ) + ).first() + if persistent_browser_session: + persistent_browser_session.instance_type = instance_type + persistent_browser_session.vcpu_millicores = vcpu_millicores + persistent_browser_session.memory_mb = memory_mb + persistent_browser_session.duration_ms = duration_ms + persistent_browser_session.compute_cost = compute_cost + await session.commit() + await session.refresh(persistent_browser_session) + else: + raise NotFoundError(f"PersistentBrowserSession {session_id} not found") + except NotFoundError: + LOG.error("NotFoundError", exc_info=True) + raise + except SQLAlchemyError: + LOG.error("SQLAlchemyError", exc_info=True) + raise + except Exception: + LOG.error("UnexpectedError", exc_info=True) + raise + async def mark_persistent_browser_session_deleted(self, session_id: str, organization_id: str) -> None: """Mark a persistent browser session as deleted.""" try: diff --git a/skyvern/forge/sdk/db/models.py b/skyvern/forge/sdk/db/models.py index f7a063a0..6ab74a57 100644 --- a/skyvern/forge/sdk/db/models.py +++ b/skyvern/forge/sdk/db/models.py @@ -856,6 +856,11 @@ class PersistentBrowserSessionModel(Base): proxy_location = Column(String, nullable=True) extensions = Column(JSON, nullable=True) browser_type = Column(String, nullable=True) + instance_type = Column(String, nullable=True) + vcpu_millicores = Column(Integer, nullable=True) + memory_mb = Column(Integer, nullable=True) + duration_ms = Column(BigInteger, nullable=True) + compute_cost = Column(Numeric, nullable=True) started_at = Column(DateTime, nullable=True) completed_at = Column(DateTime, nullable=True) created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False, index=True) diff --git a/skyvern/forge/sdk/db/polls.py b/skyvern/forge/sdk/db/polls.py index 1293aaea..4c2be575 100644 --- a/skyvern/forge/sdk/db/polls.py +++ b/skyvern/forge/sdk/db/polls.py @@ -33,7 +33,7 @@ async def await_browser_session( if persistent_browser_session is None: raise Exception(f"Persistent browser session not found for {session_id}") - LOG.info( + LOG.debug( "Checking browser address", session_id=session_id, address=persistent_browser_session.browser_address, diff --git a/skyvern/forge/sdk/routes/streaming/channels/vnc.py b/skyvern/forge/sdk/routes/streaming/channels/vnc.py index 36f3d2dd..0b5fc3a1 100644 --- a/skyvern/forge/sdk/routes/streaming/channels/vnc.py +++ b/skyvern/forge/sdk/routes/streaming/channels/vnc.py @@ -281,6 +281,43 @@ async def ask_for_clipboard(vnc_channel: VncChannel) -> None: LOG.exception(f"{class_name} Failed to ask for clipboard via CDP", **vnc_channel.identity) +# TODO(benji): I hate this function. It's messy and gross. Once we remove v1, +# we should clean this up. +def _build_vnc_url_from_browser_address(browser_address: str) -> str | None: + """ + Build a routed VNC URL from a V2 K8s routed browser_address. + + V2 K8s routed browser_address format: + wss://{domain}/{session_id}/{token}/devtools/browser/{browser_id} + + Returns VNC URL in format: + wss://{domain}/vnc/{session_id}/{token} + + Returns None if browser_address is not a V2 routed URL. + """ + if not browser_address: + return None + + parsed = urlparse(browser_address) + + # Check if this looks like a V2 routed URL (wss:// with token in path) + if parsed.scheme not in ("wss", "ws"): + return None + + # Parse path: /{session_id}/{token}/devtools/browser/{browser_id} + path_parts = parsed.path.strip("/").split("/") + if len(path_parts) < 4 or path_parts[2] != "devtools": + return None + + session_id = path_parts[0] + token = path_parts[1] + domain = parsed.netloc + + # Build VNC URL with same domain and token + scheme = "wss" if parsed.scheme == "wss" else "ws" + return f"{scheme}://{domain}/vnc/{session_id}/{token}" + + async def loop_stream_vnc(vnc_channel: VncChannel) -> None: """ Actually stream the VNC data between a frontend and a browser. @@ -292,24 +329,28 @@ async def loop_stream_vnc(vnc_channel: VncChannel) -> None: browser_session = vnc_channel.browser_session class_name = vnc_channel.class_name - if browser_session: - if browser_session.ip_address: - if ":" in browser_session.ip_address: - ip, _ = browser_session.ip_address.split(":") - vnc_url = f"ws://{ip}:{vnc_channel.vnc_port}" - else: - vnc_url = f"ws://{browser_session.ip_address}:{vnc_channel.vnc_port}" - else: - browser_address = browser_session.browser_address - - parsed_browser_address = urlparse(browser_address) - host = parsed_browser_address.hostname - vnc_url = f"ws://{host}:{vnc_channel.vnc_port}" - else: + if not browser_session: raise Exception(f"{class_name} No browser session associated with vnc channel.") - # NOTE(jdo:streaming-local-dev) - # vnc_url = "ws://localhost:6080" + # First, check if this is a V2 K8s routed session by examining browser_address + # V2 sessions have browser_address like: wss://{domain}/{session_id}/{token}/devtools/... + # For these, we need to route VNC through the same nginx proxy + routed_vnc_url = _build_vnc_url_from_browser_address(browser_session.browser_address) + if routed_vnc_url: + vnc_url = routed_vnc_url + elif browser_session.ip_address: + # V1 ECS sessions: Direct IP connection (ip_address is a public/reachable IP) + if ":" in browser_session.ip_address: + ip, _ = browser_session.ip_address.split(":") + vnc_url = f"ws://{ip}:{vnc_channel.vnc_port}" + else: + vnc_url = f"ws://{browser_session.ip_address}:{vnc_channel.vnc_port}" + else: + # Last resort: parse browser_address hostname + browser_address = browser_session.browser_address + parsed_browser_address = urlparse(browser_address) + host = parsed_browser_address.hostname + vnc_url = f"ws://{host}:{vnc_channel.vnc_port}" LOG.info( f"{class_name} Connecting to vnc url.", @@ -317,7 +358,12 @@ async def loop_stream_vnc(vnc_channel: VncChannel) -> None: **vnc_channel.identity, ) - async with websockets.connect(vnc_url) as novnc_ws: + # For routed VNC URLs (wss://), we need to pass the x-api-key header for authentication + extra_headers: dict[str, str] = {} + if vnc_url.startswith("wss://") and vnc_channel.x_api_key: + extra_headers["x-api-key"] = vnc_channel.x_api_key + + async with websockets.connect(vnc_url, additional_headers=extra_headers) as novnc_ws: async def frontend_to_browser() -> None: nonlocal class_name diff --git a/skyvern/forge/sdk/routes/streaming/messages.py b/skyvern/forge/sdk/routes/streaming/messages.py index 03ffc7f4..3a890974 100644 --- a/skyvern/forge/sdk/routes/streaming/messages.py +++ b/skyvern/forge/sdk/routes/streaming/messages.py @@ -5,8 +5,10 @@ Provides WS endpoints for streaming messages to/from our frontend application. import structlog from fastapi import WebSocket +from skyvern.config import settings from skyvern.forge.sdk.routes.routers import base_router, legacy_base_router -from skyvern.forge.sdk.routes.streaming.auth import auth +from skyvern.forge.sdk.routes.streaming.auth import _auth as local_auth +from skyvern.forge.sdk.routes.streaming.auth import auth as real_auth from skyvern.forge.sdk.routes.streaming.channels.message import ( Loops, MessageChannel, @@ -60,6 +62,7 @@ async def messages( client_id: str | None = None, token: str | None = None, ) -> None: + auth = local_auth if settings.ENV == "local" else real_auth organization_id = await auth(apikey=apikey, token=token, websocket=websocket) if not organization_id: @@ -98,7 +101,7 @@ async def messages( ) else: LOG.error( - "Message channel: no browser_session_id or workflow_run_id provided.", + "[WS] messages: no browser_session_id or workflow_run_id provided", client_id=client_id, organization_id=organization_id, ) @@ -139,5 +142,4 @@ async def messages( workflow_run_id=workflow_run_id, organization_id=organization_id, ) - await message_channel.close(reason="message-stream-closed") diff --git a/skyvern/forge/sdk/routes/streaming/verify.py b/skyvern/forge/sdk/routes/streaming/verify.py index 54bbf27b..25ecff89 100644 --- a/skyvern/forge/sdk/routes/streaming/verify.py +++ b/skyvern/forge/sdk/routes/streaming/verify.py @@ -45,12 +45,12 @@ async def verify_browser_session( """ Verify the browser session exists, and is usable. """ - if settings.ENV == "local": dummy_browser_session = AddressablePersistentBrowserSession( persistent_browser_session_id=browser_session_id, organization_id=organization_id, browser_address="0.0.0.0:9223", + ip_address="localhost", created_at=datetime.now(), modified_at=datetime.now(), ) @@ -199,6 +199,7 @@ async def verify_workflow_run( persistent_browser_session_id=workflow_run_id, organization_id=organization_id, browser_address="0.0.0.0:9223", + ip_address="localhost", created_at=datetime.now(), modified_at=datetime.now(), ) diff --git a/skyvern/forge/sdk/routes/streaming/vnc.py b/skyvern/forge/sdk/routes/streaming/vnc.py index 9e81efff..12ef247f 100644 --- a/skyvern/forge/sdk/routes/streaming/vnc.py +++ b/skyvern/forge/sdk/routes/streaming/vnc.py @@ -12,8 +12,10 @@ NOTE(jdo:streaming-local-dev) import structlog from fastapi import WebSocket +from skyvern.config import settings from skyvern.forge.sdk.routes.routers import base_router, legacy_base_router -from skyvern.forge.sdk.routes.streaming.auth import auth +from skyvern.forge.sdk.routes.streaming.auth import _auth as local_auth +from skyvern.forge.sdk.routes.streaming.auth import auth as real_auth from skyvern.forge.sdk.routes.streaming.channels.vnc import ( Loops, VncChannel, @@ -86,6 +88,7 @@ async def stream( workflow_run_id=workflow_run_id, ) + auth = local_auth if settings.ENV == "local" else real_auth organization_id = await auth(apikey=apikey, token=token, websocket=websocket) if not organization_id: diff --git a/skyvern/forge/sdk/schemas/persistent_browser_sessions.py b/skyvern/forge/sdk/schemas/persistent_browser_sessions.py index c25870ec..230314f4 100644 --- a/skyvern/forge/sdk/schemas/persistent_browser_sessions.py +++ b/skyvern/forge/sdk/schemas/persistent_browser_sessions.py @@ -1,4 +1,5 @@ from datetime import datetime +from decimal import Decimal from enum import StrEnum from pydantic import BaseModel, ConfigDict @@ -48,6 +49,11 @@ class PersistentBrowserSession(BaseModel): status: str | None = None timeout_minutes: int | None = None proxy_location: ProxyLocation | None = None + instance_type: str | None = None + vcpu_millicores: int | None = None + memory_mb: int | None = None + duration_ms: int | None = None + compute_cost: Decimal | None = None started_at: datetime | None = None completed_at: datetime | None = None created_at: datetime diff --git a/skyvern/webeye/browser_factory.py b/skyvern/webeye/browser_factory.py index d10afac5..56920b43 100644 --- a/skyvern/webeye/browser_factory.py +++ b/skyvern/webeye/browser_factory.py @@ -613,8 +613,8 @@ async def _create_cdp_connection_browser( "--remote-debugging-port=9222", "--no-first-run", "--no-default-browser-check", - "--remote-debugging-address=0.0.0.0", "--user-data-dir=./tmp/user_data_dir", + "--remote-debugging-address=0.0.0.0", ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, diff --git a/skyvern/webeye/default_persistent_sessions_manager.py b/skyvern/webeye/default_persistent_sessions_manager.py index bac0f1a4..81be4a58 100644 --- a/skyvern/webeye/default_persistent_sessions_manager.py +++ b/skyvern/webeye/default_persistent_sessions_manager.py @@ -3,6 +3,7 @@ from __future__ import annotations from dataclasses import dataclass from datetime import datetime, timedelta, timezone from math import floor +from pathlib import Path import structlog from playwright._impl._errors import TargetClosedError @@ -161,10 +162,13 @@ async def update_status( organization_id=organization_id, browser_status=status, ) + + completed_at = datetime.now(timezone.utc) if is_final_status(status) else None persistent_browser_session = await db.update_persistent_browser_session( session_id, status=status, organization_id=organization_id, + completed_at=completed_at, ) return persistent_browser_session @@ -264,15 +268,14 @@ class DefaultPersistentSessionsManager(PersistentSessionsManager): timeout_minutes: int | None = None, extensions: list[Extensions] | None = None, browser_type: PersistentBrowserType | None = None, + is_high_priority: bool = False, ) -> PersistentBrowserSession: """Create a new browser session for an organization and return its ID with the browser state.""" - LOG.info( "Creating new browser session", organization_id=organization_id, ) - - browser_session_db = await self.database.create_persistent_browser_session( + return await self.database.create_persistent_browser_session( organization_id=organization_id, runnable_type=runnable_type, runnable_id=runnable_id, @@ -282,8 +285,6 @@ class DefaultPersistentSessionsManager(PersistentSessionsManager): browser_type=browser_type, ) - return browser_session_db - async def occupy_browser_session( self, session_id: str, @@ -324,7 +325,6 @@ class DefaultPersistentSessionsManager(PersistentSessionsManager): organization_id=organization_id, session_id=browser_session_id, ) - # Export session profile before closing (so it can be used to create browser profiles) browser_artifacts = browser_session.browser_state.browser_artifacts if browser_artifacts and browser_artifacts.browser_session_dir: @@ -346,6 +346,29 @@ class DefaultPersistentSessionsManager(PersistentSessionsManager): organization_id=organization_id, ) + if browser_artifacts and browser_artifacts.video_artifacts: + for video_artifact in browser_artifacts.video_artifacts: + if video_artifact.video_path: + try: + video_path = Path(video_artifact.video_path) + if video_path.exists(): + date = video_path.parent.name + await app.STORAGE.sync_browser_session_file( + organization_id=organization_id, + browser_session_id=browser_session_id, + artifact_type="videos", + local_file_path=str(video_path), + remote_path=video_path.name, + date=date, + ) + except Exception: + LOG.exception( + "Failed to sync video recording", + browser_session_id=browser_session_id, + organization_id=organization_id, + video_path=video_artifact.video_path, + ) + self._browser_sessions.pop(browser_session_id, None) try: diff --git a/skyvern/webeye/persistent_sessions_manager.py b/skyvern/webeye/persistent_sessions_manager.py index f053026f..8a406b9b 100644 --- a/skyvern/webeye/persistent_sessions_manager.py +++ b/skyvern/webeye/persistent_sessions_manager.py @@ -67,6 +67,7 @@ class PersistentSessionsManager(Protocol): timeout_minutes: int | None = None, extensions: list[Extensions] | None = None, browser_type: PersistentBrowserType | None = None, + is_high_priority: bool = False, ) -> PersistentBrowserSession: """Create a new browser session.""" ... diff --git a/skyvern/webeye/schemas.py b/skyvern/webeye/schemas.py index 17104546..f1bd9670 100644 --- a/skyvern/webeye/schemas.py +++ b/skyvern/webeye/schemas.py @@ -27,6 +27,11 @@ class BrowserSessionResponse(BaseModel): examples=["pbs_123456"], ) organization_id: str = Field(description="ID of the organization that owns this session") + status: str | None = Field( + None, + description="Current status of the browser session", + examples=["created", "running", "completed", "failed", "timeout"], + ) runnable_type: str | None = Field( None, description="Type of the current runnable associated with this session (workflow, task etc)", @@ -124,6 +129,7 @@ class BrowserSessionResponse(BaseModel): return cls( browser_session_id=browser_session.persistent_browser_session_id, organization_id=browser_session.organization_id, + status=browser_session.status, runnable_type=browser_session.runnable_type, runnable_id=browser_session.runnable_id, timeout=browser_session.timeout_minutes,