From 40e608f9cd2d937ec3f5e5b7665ef96a32e2741d Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Fri, 13 Jun 2025 05:55:41 -0700 Subject: [PATCH] vnc: persistent sessions manager update (#2706) --- skyvern/exceptions.py | 15 +++++ skyvern/forge/sdk/routes/agent_protocol.py | 60 +++++++++++-------- skyvern/forge/sdk/workflow/models/block.py | 7 +-- skyvern/forge/sdk/workflow/service.py | 8 +++ skyvern/webeye/browser_manager.py | 14 +---- skyvern/webeye/persistent_sessions_manager.py | 39 +++++++++++- 6 files changed, 100 insertions(+), 43 deletions(-) diff --git a/skyvern/exceptions.py b/skyvern/exceptions.py index d61e0988..d7777474 100644 --- a/skyvern/exceptions.py +++ b/skyvern/exceptions.py @@ -687,3 +687,18 @@ class SkyvernContextWindowExceededError(SkyvernException): class LLMCallerNotFoundError(SkyvernException): def __init__(self, uid: str) -> None: super().__init__(f"LLM caller for {uid} is not found") + + +class BrowserSessionAlreadyOccupiedError(SkyvernHTTPException): + def __init__(self, browser_session_id: str) -> None: + super().__init__(f"Browser session {browser_session_id} is already occupied") + + +class MissingBrowserSessionError(SkyvernHTTPException): + def __init__(self, browser_session_id: str) -> None: + super().__init__(f"Browser session {browser_session_id} does not exist.") + + +class MissingBrowserAddressError(SkyvernException): + def __init__(self, browser_session_id: str) -> None: + super().__init__(f"Browser session {browser_session_id} does not have an address.") diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index a070bdb6..9512574e 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -9,6 +9,7 @@ from fastapi.responses import ORJSONResponse from skyvern import analytics from skyvern._version import __version__ from skyvern.config import settings +from skyvern.exceptions import MissingBrowserAddressError from skyvern.forge import app from skyvern.forge.prompts import prompt_engine from skyvern.forge.sdk.api.llm.exceptions import LLMProviderError @@ -221,6 +222,8 @@ async def run_task( create_task_run=True, model=run_request.model, ) + except MissingBrowserAddressError as e: + raise HTTPException(status_code=400, detail=str(e)) from e except LLMProviderError: LOG.error("LLM failure to initialize task v2", exc_info=True) raise HTTPException( @@ -316,18 +319,22 @@ async def run_workflow( totp_url=workflow_run_request.totp_url, browser_session_id=workflow_run_request.browser_session_id, ) - workflow_run = await workflow_service.run_workflow( - workflow_id=workflow_id, - organization=current_org, - workflow_request=legacy_workflow_request, - template=template, - version=None, - max_steps=x_max_steps_override, - api_key=x_api_key, - request_id=request_id, - request=request, - background_tasks=background_tasks, - ) + + try: + workflow_run = await workflow_service.run_workflow( + workflow_id=workflow_id, + organization=current_org, + workflow_request=legacy_workflow_request, + template=template, + version=None, + max_steps=x_max_steps_override, + api_key=x_api_key, + request_id=request_id, + request=request, + background_tasks=background_tasks, + ) + except MissingBrowserAddressError as e: + raise HTTPException(status_code=400, detail=str(e)) from e return WorkflowRunResponse( run_id=workflow_run.workflow_run_id, @@ -1253,18 +1260,21 @@ async def run_workflow_legacy( browser_session_id=workflow_request.browser_session_id, ) - workflow_run = await workflow_service.run_workflow( - workflow_id=workflow_id, - organization=current_org, - workflow_request=workflow_request, - template=template, - version=version, - max_steps=x_max_steps_override, - api_key=x_api_key, - request_id=request_id, - request=request, - background_tasks=background_tasks, - ) + try: + workflow_run = await workflow_service.run_workflow( + workflow_id=workflow_id, + organization=current_org, + workflow_request=workflow_request, + template=template, + version=version, + max_steps=x_max_steps_override, + api_key=x_api_key, + request_id=request_id, + request=request, + background_tasks=background_tasks, + ) + except MissingBrowserAddressError as e: + raise HTTPException(status_code=400, detail=str(e)) from e return RunWorkflowResponse( workflow_id=workflow_id, @@ -1759,6 +1769,8 @@ async def run_task_v2( extracted_information_schema=data.extracted_information_schema, error_code_mapping=data.error_code_mapping, ) + except MissingBrowserAddressError as e: + raise HTTPException(status_code=400, detail=str(e)) from e except LLMProviderError: LOG.error("LLM failure to initialize task v2", exc_info=True) raise HTTPException( diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index b8c0dd15..6b173a3a 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -1228,12 +1228,7 @@ async def wrapper(): ) browser_state = await app.PERSISTENT_SESSIONS_MANAGER.get_browser_state(browser_session_id) if browser_state: - await app.PERSISTENT_SESSIONS_MANAGER.occupy_browser_session( - browser_session_id, - runnable_type="workflow_run", - runnable_id=workflow_run_id, - organization_id=organization_id, - ) + LOG.info("Was occupying session here, but no longer.", browser_session_id=browser_session_id) else: browser_state = app.BROWSER_MANAGER.get_for_workflow_run(workflow_run_id) diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index bca6cf1a..844d2a55 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -219,6 +219,14 @@ class WorkflowService: ) raise e + if workflow_request.browser_session_id: + await app.PERSISTENT_SESSIONS_MANAGER.begin_session( + browser_session_id=workflow_request.browser_session_id, + runnable_type="workflow_run", + runnable_id=workflow_run.workflow_run_id, + organization_id=organization.organization_id, + ) + return workflow_run async def execute_workflow( diff --git a/skyvern/webeye/browser_manager.py b/skyvern/webeye/browser_manager.py index 6e2e02ed..5308a32d 100644 --- a/skyvern/webeye/browser_manager.py +++ b/skyvern/webeye/browser_manager.py @@ -93,12 +93,7 @@ class BrowserManager: raise MissingBrowserState(task_id=task.task_id) else: if task.organization_id: - await app.PERSISTENT_SESSIONS_MANAGER.occupy_browser_session( - browser_session_id, - organization_id=task.organization_id, - runnable_type="task", - runnable_id=task.task_id, - ) + LOG.info("User to occupy browser session here", browser_session_id=browser_session_id) else: LOG.warning("Organization ID is not set for task", task_id=task.task_id) page = await browser_state.get_working_page() @@ -160,12 +155,7 @@ class BrowserManager: ) raise MissingBrowserState(workflow_run_id=workflow_run.workflow_run_id) else: - await app.PERSISTENT_SESSIONS_MANAGER.occupy_browser_session( - browser_session_id, - runnable_type="workflow_run", - runnable_id=workflow_run.workflow_run_id, - organization_id=workflow_run.organization_id, - ) + LOG.info("Used to occupy browser session here", browser_session_id=browser_session_id) page = await browser_state.get_working_page() if page: if url: diff --git a/skyvern/webeye/persistent_sessions_manager.py b/skyvern/webeye/persistent_sessions_manager.py index 58f968ff..f7c705b4 100644 --- a/skyvern/webeye/persistent_sessions_manager.py +++ b/skyvern/webeye/persistent_sessions_manager.py @@ -27,10 +27,47 @@ class PersistentSessionsManager: def __new__(cls, database: AgentDB) -> PersistentSessionsManager: if cls.instance is None: - cls.instance = super().__new__(cls) + new_instance = super().__new__(cls) + cls.instance = new_instance + cls.instance.database = database + return new_instance + cls.instance.database = database return cls.instance + async def begin_session( + self, + *, + browser_session_id: str, + runnable_type: str, + runnable_id: str, + organization_id: str, + ) -> None: + """ + Attempt to begin a session. + + TODO: cloud-side, temporal and ECS fargate are used to effect the session. These tools are not presently + available OSS-side. + """ + + LOG.info("Begin browser session", browser_session_id=browser_session_id) + + persistent_browser_session = await self.database.get_persistent_browser_session( + browser_session_id, organization_id + ) + + if persistent_browser_session is None: + raise Exception(f"Persistent browser session not found for {browser_session_id}") + + await self.occupy_browser_session( + session_id=browser_session_id, + runnable_type=runnable_type, + runnable_id=runnable_id, + organization_id=organization_id, + ) + + LOG.info("Browser session begin", browser_session_id=browser_session_id) + async def get_browser_address(self, session_id: str, organization_id: str) -> tuple[str, str, str]: address = await wait_on_persistent_browser_address(self.database, session_id, organization_id)