browser session streaming fix and clean up wait_on_persistent_browser_address (#2983)
This commit is contained in:
@@ -1,4 +1,3 @@
|
|||||||
import asyncio
|
|
||||||
import json
|
import json
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from typing import Any, List, Sequence
|
from typing import Any, List, Sequence
|
||||||
@@ -3354,21 +3353,3 @@ class AgentDB:
|
|||||||
query = query.filter_by(organization_id=organization_id)
|
query = query.filter_by(organization_id=organization_id)
|
||||||
task_run = (await session.scalars(query)).first()
|
task_run = (await session.scalars(query)).first()
|
||||||
return Run.model_validate(task_run) if task_run else None
|
return Run.model_validate(task_run) if task_run else None
|
||||||
|
|
||||||
async def wait_on_persistent_browser_address(self, session_id: str, organization_id: str) -> str:
|
|
||||||
async with asyncio.timeout(10 * 60):
|
|
||||||
while True:
|
|
||||||
persistent_browser_session = await self.get_persistent_browser_session(session_id, organization_id)
|
|
||||||
if persistent_browser_session is None:
|
|
||||||
raise Exception(f"Persistent browser session not found for {session_id}")
|
|
||||||
|
|
||||||
LOG.info(
|
|
||||||
"Checking browser address",
|
|
||||||
session_id=session_id,
|
|
||||||
address=persistent_browser_session.browser_address,
|
|
||||||
)
|
|
||||||
|
|
||||||
if persistent_browser_session.browser_address:
|
|
||||||
return persistent_browser_session.browser_address
|
|
||||||
|
|
||||||
await asyncio.sleep(2)
|
|
||||||
|
|||||||
@@ -3,13 +3,31 @@ import asyncio
|
|||||||
from structlog import get_logger
|
from structlog import get_logger
|
||||||
|
|
||||||
from skyvern.forge.sdk.db.client import AgentDB
|
from skyvern.forge.sdk.db.client import AgentDB
|
||||||
|
from skyvern.forge.sdk.schemas.persistent_browser_sessions import PersistentBrowserSession
|
||||||
|
|
||||||
LOG = get_logger(__name__)
|
LOG = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
async def wait_on_persistent_browser_address(db: AgentDB, session_id: str, organization_id: str) -> str | None:
|
async def wait_on_persistent_browser_address(
|
||||||
|
db: AgentDB,
|
||||||
|
session_id: str,
|
||||||
|
organization_id: str,
|
||||||
|
timeout: int = 600,
|
||||||
|
poll_interval: int = 2,
|
||||||
|
) -> str | None:
|
||||||
|
persistent_browser_session = await await_browser_session(db, session_id, organization_id, timeout, poll_interval)
|
||||||
|
return persistent_browser_session.browser_address if persistent_browser_session else None
|
||||||
|
|
||||||
|
|
||||||
|
async def await_browser_session(
|
||||||
|
db: AgentDB,
|
||||||
|
session_id: str,
|
||||||
|
organization_id: str,
|
||||||
|
timeout: int = 600,
|
||||||
|
poll_interval: int = 2,
|
||||||
|
) -> PersistentBrowserSession | None:
|
||||||
try:
|
try:
|
||||||
async with asyncio.timeout(10 * 60):
|
async with asyncio.timeout(timeout):
|
||||||
while True:
|
while True:
|
||||||
persistent_browser_session = await db.get_persistent_browser_session(session_id, organization_id)
|
persistent_browser_session = await db.get_persistent_browser_session(session_id, organization_id)
|
||||||
if persistent_browser_session is None:
|
if persistent_browser_session is None:
|
||||||
@@ -22,9 +40,9 @@ async def wait_on_persistent_browser_address(db: AgentDB, session_id: str, organ
|
|||||||
)
|
)
|
||||||
|
|
||||||
if persistent_browser_session.browser_address:
|
if persistent_browser_session.browser_address:
|
||||||
return persistent_browser_session.browser_address
|
return persistent_browser_session
|
||||||
|
|
||||||
await asyncio.sleep(2)
|
await asyncio.sleep(poll_interval)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
LOG.warning(f"Browser address not found for persistent browser session {session_id}")
|
LOG.warning(f"Browser address not found for persistent browser session {session_id}")
|
||||||
|
|
||||||
|
|||||||
@@ -52,11 +52,10 @@ async def verify_browser_session(
|
|||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
_, host, cdp_port = await app.PERSISTENT_SESSIONS_MANAGER.get_browser_address(
|
browser_address = await app.PERSISTENT_SESSIONS_MANAGER.get_browser_address(
|
||||||
session_id=browser_session_id,
|
session_id=browser_session_id,
|
||||||
organization_id=organization_id,
|
organization_id=organization_id,
|
||||||
)
|
)
|
||||||
browser_address = f"{host}:{cdp_port}"
|
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
LOG.info(
|
LOG.info(
|
||||||
"Browser session address not found for browser session.",
|
"Browser session address not found for browser session.",
|
||||||
@@ -211,11 +210,10 @@ async def verify_workflow_run(
|
|||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
_, host, cdp_port = await app.PERSISTENT_SESSIONS_MANAGER.get_browser_address(
|
browser_address = await app.PERSISTENT_SESSIONS_MANAGER.get_browser_address(
|
||||||
session_id=browser_session.persistent_browser_session_id,
|
session_id=browser_session.persistent_browser_session_id,
|
||||||
organization_id=organization_id,
|
organization_id=organization_id,
|
||||||
)
|
)
|
||||||
browser_address = f"{host}:{cdp_port}"
|
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
LOG.info(
|
LOG.info(
|
||||||
"Browser session address not found for workflow run.",
|
"Browser session address not found for workflow run.",
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ from dataclasses import dataclass
|
|||||||
import structlog
|
import structlog
|
||||||
from playwright._impl._errors import TargetClosedError
|
from playwright._impl._errors import TargetClosedError
|
||||||
|
|
||||||
|
from skyvern.exceptions import MissingBrowserAddressError
|
||||||
from skyvern.forge.sdk.db.client import AgentDB
|
from skyvern.forge.sdk.db.client import AgentDB
|
||||||
from skyvern.forge.sdk.db.polls import wait_on_persistent_browser_address
|
from skyvern.forge.sdk.db.polls import wait_on_persistent_browser_address
|
||||||
from skyvern.forge.sdk.schemas.persistent_browser_sessions import PersistentBrowserSession
|
from skyvern.forge.sdk.schemas.persistent_browser_sessions import PersistentBrowserSession
|
||||||
@@ -69,16 +70,13 @@ class PersistentSessionsManager:
|
|||||||
|
|
||||||
LOG.info("Browser session begin", browser_session_id=browser_session_id)
|
LOG.info("Browser session begin", browser_session_id=browser_session_id)
|
||||||
|
|
||||||
async def get_browser_address(self, session_id: str, organization_id: str) -> tuple[str, str, str]:
|
async def get_browser_address(self, session_id: str, organization_id: str) -> str:
|
||||||
address = await wait_on_persistent_browser_address(self.database, session_id, organization_id)
|
address = await wait_on_persistent_browser_address(self.database, session_id, organization_id)
|
||||||
|
|
||||||
if address is None:
|
if address is None:
|
||||||
raise Exception(f"Browser address not found for persistent browser session {session_id}")
|
raise MissingBrowserAddressError(session_id)
|
||||||
|
|
||||||
protocol = "http"
|
return address
|
||||||
host, cdp_port = address.split(":")
|
|
||||||
|
|
||||||
return protocol, host, cdp_port
|
|
||||||
|
|
||||||
async def get_session_by_runnable_id(
|
async def get_session_by_runnable_id(
|
||||||
self, runnable_id: str, organization_id: str
|
self, runnable_id: str, organization_id: str
|
||||||
|
|||||||
Reference in New Issue
Block a user