diff --git a/skyvern/exceptions.py b/skyvern/exceptions.py index acd9a034..6d8118c0 100644 --- a/skyvern/exceptions.py +++ b/skyvern/exceptions.py @@ -718,9 +718,9 @@ class BrowserSessionAlreadyOccupiedError(SkyvernHTTPException): super().__init__(f"Browser session {browser_session_id} is already occupied") -class MissingBrowserSessionError(SkyvernHTTPException): - def __init__(self, browser_session_id: str) -> None: - super().__init__(f"Browser session {browser_session_id} does not exist.") +class BrowserSessionNotRenewable(SkyvernException): + def __init__(self, reason: str, browser_session_id: str) -> None: + super().__init__(f"Browser session {browser_session_id} is not renewable: {reason}") class MissingBrowserAddressError(SkyvernException): diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 681d5359..cbf00de0 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -3481,25 +3481,3 @@ class AgentDB: await session.refresh(debug_session) return DebugSession.model_validate(debug_session) - - async def update_debug_session( - self, - *, - debug_session_id: str, - browser_session_id: str | None = None, - ) -> DebugSession: - async with self.Session() as session: - debug_session = ( - await session.scalars(select(DebugSessionModel).filter_by(debug_session_id=debug_session_id)) - ).first() - - if not debug_session: - raise NotFoundError(f"Debug session {debug_session_id} not found") - - if browser_session_id: - debug_session.browser_session_id = browser_session_id - - await session.commit() - await session.refresh(debug_session) - - return DebugSession.model_validate(debug_session) diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 6b2be75b..25b72c8e 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -1,8 +1,6 @@ import asyncio -from datetime import datetime, timedelta, timezone from enum import Enum from functools import partial -from math import floor from typing import Annotated, Any import structlog @@ -13,7 +11,7 @@ from fastapi.responses import ORJSONResponse from skyvern import analytics from skyvern._version import __version__ from skyvern.config import settings -from skyvern.exceptions import MissingBrowserAddressError +from skyvern.exceptions import BrowserSessionNotRenewable, MissingBrowserAddressError from skyvern.forge import app from skyvern.forge.prompts import prompt_engine from skyvern.forge.sdk.api.llm.exceptions import LLMProviderError @@ -23,6 +21,7 @@ from skyvern.forge.sdk.core.curl_converter import curl_to_http_request_block_par from skyvern.forge.sdk.core.permissions.permission_checker_factory import PermissionCheckerFactory from skyvern.forge.sdk.core.security import generate_skyvern_signature from skyvern.forge.sdk.db.enums import OrganizationAuthTokenType +from skyvern.forge.sdk.db.exceptions import NotFoundError from skyvern.forge.sdk.executor.factory import AsyncExecutorFactory from skyvern.forge.sdk.models import Step from skyvern.forge.sdk.routes.code_samples import ( @@ -2027,8 +2026,10 @@ async def get_or_create_debug_session_by_user_and_workflow_permanent_id( If the debug session does not exist, a new one will be created. - If the debug session exists, the timeout will be extended to 4 hours from - the time of the request. + In addition, the timeout for the debug session's browser session will be + extended to 4 hours from the time of the request. If the browser session + cannot be renewed, a new one will be created and assigned to the debug + session. The browser_session that could not be renewed will be closed. """ debug_session = await app.DATABASE.get_debug_session( @@ -2038,144 +2039,67 @@ async def get_or_create_debug_session_by_user_and_workflow_permanent_id( ) if not debug_session: - new_browser_session = await app.PERSISTENT_SESSIONS_MANAGER.create_session( - organization_id=current_org.organization_id, - timeout_minutes=settings.DEBUG_SESSION_TIMEOUT_MINUTES, - ) - - debug_session = await app.DATABASE.create_debug_session( - browser_session_id=new_browser_session.persistent_browser_session_id, + LOG.info( + "Existing debug session not found, created a new one, along with a new browser session", organization_id=current_org.organization_id, user_id=current_user_id, workflow_permanent_id=workflow_permanent_id, ) - LOG.info( - "Existing debug session not found, created a new one", - debug_session_id=debug_session.debug_session_id, - browser_session_id=new_browser_session.persistent_browser_session_id, - organization_id=current_org.organization_id, - user_id=current_user_id, # not sure how we feel about logging this - workflow_permanent_id=workflow_permanent_id, + return await new_debug_session( + workflow_permanent_id, + current_org, + current_user_id, ) - return debug_session - LOG.info( "Existing debug session found", debug_session_id=debug_session.debug_session_id, browser_session_id=debug_session.browser_session_id, organization_id=current_org.organization_id, - user_id=current_user_id, # not sure how we feel about logging this - workflow_permanent_id=workflow_permanent_id, - ) - - browser_session = await app.DATABASE.get_persistent_browser_session( - debug_session.browser_session_id, current_org.organization_id - ) - - if browser_session and browser_session.completed_at is None: - if browser_session.started_at is None or browser_session.timeout_minutes is None: - LOG.warning( - "Persistent browser session started_at or timeout_minutes is None; assumption == this is normal, and they will become non-None shortly", - debug_session_id=debug_session.debug_session_id, - organization_id=current_org.organization_id, - workflow_permanent_id=workflow_permanent_id, - user_id=current_user_id, - ) - return debug_session - right_now = datetime.now(timezone.utc) - current_timeout_minutes = browser_session.timeout_minutes - started_at_utc = ( - browser_session.started_at.replace(tzinfo=timezone.utc) - if browser_session.started_at.tzinfo is None - else browser_session.started_at - ) - current_timeout_datetime = started_at_utc + timedelta(minutes=float(current_timeout_minutes)) - minutes_left = (current_timeout_datetime - right_now).total_seconds() / 60 - - if minutes_left >= settings.DEBUG_SESSION_TIMEOUT_THRESHOLD_MINUTES: - new_timeout_datetime = right_now + timedelta(minutes=settings.DEBUG_SESSION_TIMEOUT_MINUTES) - minutes_diff = floor((new_timeout_datetime - current_timeout_datetime).total_seconds() / 60) - new_timeout_minutes = current_timeout_minutes + minutes_diff - - LOG.info( - f"Extended persistent browser session (for debugging) by {minutes_diff} minute(s)", - minutes_diff=minutes_diff, - debug_session_id=debug_session.debug_session_id, - organization_id=current_org.organization_id, - workflow_permanent_id=workflow_permanent_id, - user_id=current_user_id, - ) - - await app.DATABASE.update_persistent_browser_session( - browser_session_id=debug_session.browser_session_id, - organization_id=current_org.organization_id, - timeout_minutes=new_timeout_minutes, - ) - - return debug_session - else: - LOG.info( - "Browser session for debug session has expired", - debug_session_id=debug_session.debug_session_id, - organization_id=current_org.organization_id, - workflow_permanent_id=workflow_permanent_id, - user_id=current_user_id, - ) - else: - if browser_session: - LOG.info( - "Browser session for debug session has expired, creating a new one", - debug_session_id=debug_session.debug_session_id, - organization_id=current_org.organization_id, - workflow_permanent_id=workflow_permanent_id, - user_id=current_user_id, - ) - else: - LOG.info( - "Browser session did not exist for debug session, creating a new one", - debug_session_id=debug_session.debug_session_id, - organization_id=current_org.organization_id, - workflow_permanent_id=workflow_permanent_id, - user_id=current_user_id, - ) - - browser_session = await app.PERSISTENT_SESSIONS_MANAGER.create_session( - organization_id=current_org.organization_id, - timeout_minutes=settings.DEBUG_SESSION_TIMEOUT_MINUTES, - ) - - await app.DATABASE.update_debug_session( - debug_session_id=debug_session.debug_session_id, - browser_session_id=browser_session.persistent_browser_session_id, - ) - - LOG.info( - "Updated debug session with new browser session", - debug_session_id=debug_session.debug_session_id, - browser_session_id=browser_session.persistent_browser_session_id, - organization_id=current_org.organization_id, - workflow_permanent_id=workflow_permanent_id, user_id=current_user_id, + workflow_permanent_id=workflow_permanent_id, ) - return debug_session + try: + await app.PERSISTENT_SESSIONS_MANAGER.renew_or_close_session( + debug_session.browser_session_id, + current_org.organization_id, + ) + return debug_session + except BrowserSessionNotRenewable as ex: + LOG.info( + "Browser session was non-renewable; creating a new debug session", + ex=str(ex), + debug_session_id=debug_session.debug_session_id, + browser_session_id=debug_session.browser_session_id, + organization_id=current_org.organization_id, + workflow_permanent_id=workflow_permanent_id, + user_id=current_user_id, + ) + + return await new_debug_session( + workflow_permanent_id, + current_org, + current_user_id, + ) @base_router.post( "/debug-session/{workflow_permanent_id}/new", include_in_schema=False, ) -async def new_browser_session_for_debug_session( +async def new_debug_session( workflow_permanent_id: str, current_org: Organization = Depends(org_auth_service.get_current_org), current_user_id: str = Depends(org_auth_service.get_current_user_id), ) -> DebugSession: """ Create a new debug session, along with a new browser session. If any - existing debug session are found, close the browser sessions associated - with them. + existing debug sessions are found, "complete" them. Then close the browser + sessions associated with those completed debug sessions. + + Return the new debug session. """ completed_debug_sessions = await app.DATABASE.complete_debug_sessions( @@ -2196,10 +2120,13 @@ async def new_browser_session_for_debug_session( closeable_browser_sessions: list[PersistentBrowserSession] = [] for debug_session in completed_debug_sessions: - browser_session = await app.DATABASE.get_persistent_browser_session( - debug_session.browser_session_id, - current_org.organization_id, - ) + try: + browser_session = await app.DATABASE.get_persistent_browser_session( + debug_session.browser_session_id, + current_org.organization_id, + ) + except NotFoundError: + browser_session = None if browser_session and browser_session.completed_at is None: closeable_browser_sessions.append(browser_session) diff --git a/skyvern/webeye/persistent_sessions_manager.py b/skyvern/webeye/persistent_sessions_manager.py index 752fd832..d024cc93 100644 --- a/skyvern/webeye/persistent_sessions_manager.py +++ b/skyvern/webeye/persistent_sessions_manager.py @@ -1,11 +1,14 @@ from __future__ import annotations from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from math import floor import structlog from playwright._impl._errors import TargetClosedError -from skyvern.exceptions import MissingBrowserAddressError +from skyvern.config import settings +from skyvern.exceptions import BrowserSessionNotRenewable, MissingBrowserAddressError from skyvern.forge.sdk.db.client import AgentDB from skyvern.forge.sdk.db.polls import wait_on_persistent_browser_address from skyvern.forge.sdk.schemas.persistent_browser_sessions import PersistentBrowserSession @@ -19,6 +22,99 @@ class BrowserSession: browser_state: BrowserState +async def validate_session_for_renewal( + database: AgentDB, + session_id: str, + organization_id: str, +) -> tuple[PersistentBrowserSession, datetime, int]: + """ + Validate a specific browser session for renewal. Otherwise raise. + """ + + browser_session = await database.get_persistent_browser_session( + session_id=session_id, + organization_id=organization_id, + ) + + if not browser_session: + LOG.warning( + "Attempted to renew non-existent browser session", + browser_session_id=session_id, + organization_id=organization_id, + ) + raise BrowserSessionNotRenewable("Browser session does not exist", session_id) + + if browser_session.completed_at is not None: + LOG.warning( + "Attempted to renew completed browser session", + browser_session_id=session_id, + organization_id=organization_id, + ) + raise BrowserSessionNotRenewable("Browser session has already completed", session_id) + + if browser_session.started_at is None or browser_session.timeout_minutes is None: + LOG.warning( + "Attempted to renew browser session that has not started yet", + browser_session_id=session_id, + organization_id=organization_id, + ) + raise BrowserSessionNotRenewable("Browser session has not started yet", session_id) + + if browser_session.status != "created": + LOG.warning( + "Attempted to renew browser session that is not in the 'created' state", + browser_session_id=session_id, + organization_id=organization_id, + ) + raise BrowserSessionNotRenewable("Browser session is not in the 'created' state", session_id) + + started_at_utc = ( + browser_session.started_at.replace(tzinfo=timezone.utc) + if browser_session.started_at.tzinfo is None + else browser_session.started_at + ) + + return browser_session, started_at_utc, browser_session.timeout_minutes + + +async def renew_session(database: AgentDB, session_id: str, organization_id: str) -> PersistentBrowserSession: + """ + Renew a specific browser session, if it is deemed renewable. + """ + + browser_session, started_at_utc, current_timeout_minutes = await validate_session_for_renewal( + database, + organization_id=organization_id, + session_id=session_id, + ) + + right_now = datetime.now(timezone.utc) + current_timeout_datetime = started_at_utc + timedelta(minutes=float(current_timeout_minutes)) + minutes_left = (current_timeout_datetime - right_now).total_seconds() / 60 + + if minutes_left >= settings.DEBUG_SESSION_TIMEOUT_THRESHOLD_MINUTES: + new_timeout_datetime = right_now + timedelta(minutes=settings.DEBUG_SESSION_TIMEOUT_MINUTES) + minutes_diff = floor((new_timeout_datetime - current_timeout_datetime).total_seconds() / 60) + new_timeout_minutes = current_timeout_minutes + minutes_diff + + browser_session = await database.update_persistent_browser_session( + browser_session_id=session_id, + organization_id=organization_id, + timeout_minutes=new_timeout_minutes, + ) + + LOG.info( + f"Extended browser session by {minutes_diff} minute(s)", + minutes_diff=minutes_diff, + session_id=session_id, + organization_id=organization_id, + ) + + return browser_session + + raise BrowserSessionNotRenewable("Session has expired", session_id) + + class PersistentSessionsManager: instance: PersistentSessionsManager | None = None _browser_sessions: dict[str, BrowserSession] = dict() @@ -135,6 +231,13 @@ class PersistentSessionsManager: organization_id=organization_id, ) + async def renew_or_close_session(self, session_id: str, organization_id: str) -> PersistentBrowserSession: + try: + return await renew_session(self.database, session_id, organization_id) + except BrowserSessionNotRenewable: + await self.close_session(organization_id, session_id) + raise + async def release_browser_session(self, session_id: str, organization_id: str) -> None: """Release a specific browser session.""" await self.database.release_persistent_browser_session(session_id, organization_id)