vnc: persistent sessions manager update (#2706)

This commit is contained in:
Shuchang Zheng
2025-06-13 05:55:41 -07:00
committed by GitHub
parent affb4e0462
commit 40e608f9cd
6 changed files with 100 additions and 43 deletions

View File

@@ -687,3 +687,18 @@ class SkyvernContextWindowExceededError(SkyvernException):
class LLMCallerNotFoundError(SkyvernException):
def __init__(self, uid: str) -> None:
super().__init__(f"LLM caller for {uid} is not found")
class BrowserSessionAlreadyOccupiedError(SkyvernHTTPException):
def __init__(self, browser_session_id: str) -> None:
super().__init__(f"Browser session {browser_session_id} is already occupied")
class MissingBrowserSessionError(SkyvernHTTPException):
def __init__(self, browser_session_id: str) -> None:
super().__init__(f"Browser session {browser_session_id} does not exist.")
class MissingBrowserAddressError(SkyvernException):
def __init__(self, browser_session_id: str) -> None:
super().__init__(f"Browser session {browser_session_id} does not have an address.")

View File

@@ -9,6 +9,7 @@ from fastapi.responses import ORJSONResponse
from skyvern import analytics
from skyvern._version import __version__
from skyvern.config import settings
from skyvern.exceptions import MissingBrowserAddressError
from skyvern.forge import app
from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.api.llm.exceptions import LLMProviderError
@@ -221,6 +222,8 @@ async def run_task(
create_task_run=True,
model=run_request.model,
)
except MissingBrowserAddressError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
except LLMProviderError:
LOG.error("LLM failure to initialize task v2", exc_info=True)
raise HTTPException(
@@ -316,18 +319,22 @@ async def run_workflow(
totp_url=workflow_run_request.totp_url,
browser_session_id=workflow_run_request.browser_session_id,
)
workflow_run = await workflow_service.run_workflow(
workflow_id=workflow_id,
organization=current_org,
workflow_request=legacy_workflow_request,
template=template,
version=None,
max_steps=x_max_steps_override,
api_key=x_api_key,
request_id=request_id,
request=request,
background_tasks=background_tasks,
)
try:
workflow_run = await workflow_service.run_workflow(
workflow_id=workflow_id,
organization=current_org,
workflow_request=legacy_workflow_request,
template=template,
version=None,
max_steps=x_max_steps_override,
api_key=x_api_key,
request_id=request_id,
request=request,
background_tasks=background_tasks,
)
except MissingBrowserAddressError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
return WorkflowRunResponse(
run_id=workflow_run.workflow_run_id,
@@ -1253,18 +1260,21 @@ async def run_workflow_legacy(
browser_session_id=workflow_request.browser_session_id,
)
workflow_run = await workflow_service.run_workflow(
workflow_id=workflow_id,
organization=current_org,
workflow_request=workflow_request,
template=template,
version=version,
max_steps=x_max_steps_override,
api_key=x_api_key,
request_id=request_id,
request=request,
background_tasks=background_tasks,
)
try:
workflow_run = await workflow_service.run_workflow(
workflow_id=workflow_id,
organization=current_org,
workflow_request=workflow_request,
template=template,
version=version,
max_steps=x_max_steps_override,
api_key=x_api_key,
request_id=request_id,
request=request,
background_tasks=background_tasks,
)
except MissingBrowserAddressError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
return RunWorkflowResponse(
workflow_id=workflow_id,
@@ -1759,6 +1769,8 @@ async def run_task_v2(
extracted_information_schema=data.extracted_information_schema,
error_code_mapping=data.error_code_mapping,
)
except MissingBrowserAddressError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
except LLMProviderError:
LOG.error("LLM failure to initialize task v2", exc_info=True)
raise HTTPException(

View File

@@ -1228,12 +1228,7 @@ async def wrapper():
)
browser_state = await app.PERSISTENT_SESSIONS_MANAGER.get_browser_state(browser_session_id)
if browser_state:
await app.PERSISTENT_SESSIONS_MANAGER.occupy_browser_session(
browser_session_id,
runnable_type="workflow_run",
runnable_id=workflow_run_id,
organization_id=organization_id,
)
LOG.info("Was occupying session here, but no longer.", browser_session_id=browser_session_id)
else:
browser_state = app.BROWSER_MANAGER.get_for_workflow_run(workflow_run_id)

View File

@@ -219,6 +219,14 @@ class WorkflowService:
)
raise e
if workflow_request.browser_session_id:
await app.PERSISTENT_SESSIONS_MANAGER.begin_session(
browser_session_id=workflow_request.browser_session_id,
runnable_type="workflow_run",
runnable_id=workflow_run.workflow_run_id,
organization_id=organization.organization_id,
)
return workflow_run
async def execute_workflow(

View File

@@ -93,12 +93,7 @@ class BrowserManager:
raise MissingBrowserState(task_id=task.task_id)
else:
if task.organization_id:
await app.PERSISTENT_SESSIONS_MANAGER.occupy_browser_session(
browser_session_id,
organization_id=task.organization_id,
runnable_type="task",
runnable_id=task.task_id,
)
LOG.info("User to occupy browser session here", browser_session_id=browser_session_id)
else:
LOG.warning("Organization ID is not set for task", task_id=task.task_id)
page = await browser_state.get_working_page()
@@ -160,12 +155,7 @@ class BrowserManager:
)
raise MissingBrowserState(workflow_run_id=workflow_run.workflow_run_id)
else:
await app.PERSISTENT_SESSIONS_MANAGER.occupy_browser_session(
browser_session_id,
runnable_type="workflow_run",
runnable_id=workflow_run.workflow_run_id,
organization_id=workflow_run.organization_id,
)
LOG.info("Used to occupy browser session here", browser_session_id=browser_session_id)
page = await browser_state.get_working_page()
if page:
if url:

View File

@@ -27,10 +27,47 @@ class PersistentSessionsManager:
def __new__(cls, database: AgentDB) -> PersistentSessionsManager:
if cls.instance is None:
cls.instance = super().__new__(cls)
new_instance = super().__new__(cls)
cls.instance = new_instance
cls.instance.database = database
return new_instance
cls.instance.database = database
return cls.instance
async def begin_session(
self,
*,
browser_session_id: str,
runnable_type: str,
runnable_id: str,
organization_id: str,
) -> None:
"""
Attempt to begin a session.
TODO: cloud-side, temporal and ECS fargate are used to effect the session. These tools are not presently
available OSS-side.
"""
LOG.info("Begin browser session", browser_session_id=browser_session_id)
persistent_browser_session = await self.database.get_persistent_browser_session(
browser_session_id, organization_id
)
if persistent_browser_session is None:
raise Exception(f"Persistent browser session not found for {browser_session_id}")
await self.occupy_browser_session(
session_id=browser_session_id,
runnable_type=runnable_type,
runnable_id=runnable_id,
organization_id=organization_id,
)
LOG.info("Browser session begin", browser_session_id=browser_session_id)
async def get_browser_address(self, session_id: str, organization_id: str) -> tuple[str, str, str]:
address = await wait_on_persistent_browser_address(self.database, session_id, organization_id)