diff --git a/skyvern/core/script_generations/script_skyvern_page.py b/skyvern/core/script_generations/script_skyvern_page.py index bad2b0c8..1eb8a263 100644 --- a/skyvern/core/script_generations/script_skyvern_page.py +++ b/skyvern/core/script_generations/script_skyvern_page.py @@ -26,7 +26,7 @@ from skyvern.webeye.actions.actions import ( SolveCaptchaAction, ) from skyvern.webeye.actions.handler import ActionHandler, handle_complete_action -from skyvern.webeye.browser_factory import BrowserState +from skyvern.webeye.browser_state import BrowserState from skyvern.webeye.scraper.scraper import ScrapedPage, scrape_website LOG = structlog.get_logger() diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index a428e403..44acecdc 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -125,7 +125,7 @@ from skyvern.webeye.actions.parse_actions import ( parse_ui_tars_actions, ) from skyvern.webeye.actions.responses import ActionResult, ActionSuccess -from skyvern.webeye.browser_factory import BrowserState +from skyvern.webeye.browser_state import BrowserState from skyvern.webeye.scraper.scraper import ElementTreeFormat, ScrapedPage, scrape_website from skyvern.webeye.utils.page import SkyvernFrame diff --git a/skyvern/forge/agent_functions.py b/skyvern/forge/agent_functions.py index 64bab4f7..6b82900d 100644 --- a/skyvern/forge/agent_functions.py +++ b/skyvern/forge/agent_functions.py @@ -23,7 +23,7 @@ from skyvern.forge.sdk.workflow.models.block import BlockTypeVar from skyvern.services import workflow_script_service from skyvern.webeye.actions.action_types import POST_ACTION_EXECUTION_ACTION_TYPES from skyvern.webeye.actions.actions import Action -from skyvern.webeye.browser_factory import BrowserState +from skyvern.webeye.browser_state import BrowserState from skyvern.webeye.scraper.scraper import ELEMENT_NODE_ATTRIBUTES, CleanupElementTreeFunc, json_to_html from skyvern.webeye.utils.dom import SkyvernElement from skyvern.webeye.utils.page import 
SkyvernFrame diff --git a/skyvern/forge/forge_app.py b/skyvern/forge/forge_app.py index a38a9b14..c45a9ba5 100644 --- a/skyvern/forge/forge_app.py +++ b/skyvern/forge/forge_app.py @@ -32,6 +32,7 @@ from skyvern.forge.sdk.workflow.context_manager import WorkflowContextManager from skyvern.forge.sdk.workflow.service import WorkflowService from skyvern.webeye.browser_manager import BrowserManager from skyvern.webeye.persistent_sessions_manager import PersistentSessionsManager +from skyvern.webeye.real_browser_manager import RealBrowserManager from skyvern.webeye.scraper.scraper import ScrapeExcludeFunc @@ -92,7 +93,7 @@ def create_forge_app() -> ForgeApp: app.STORAGE = StorageFactory.get_storage() app.CACHE = CacheFactory.get_cache() app.ARTIFACT_MANAGER = ArtifactManager() - app.BROWSER_MANAGER = BrowserManager() + app.BROWSER_MANAGER = RealBrowserManager() app.EXPERIMENTATION_PROVIDER = NoOpExperimentationProvider() app.LLM_API_HANDLER = LLMAPIHandlerFactory.get_llm_api_handler(settings.LLM_KEY) diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index 5602f5da..6c220193 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -93,7 +93,7 @@ from skyvern.schemas.workflows import BlockResult, BlockStatus, BlockType, FileS from skyvern.utils.strings import generate_random_string from skyvern.utils.templating import get_missing_variables from skyvern.utils.url_validators import prepend_scheme_and_validate_url -from skyvern.webeye.browser_factory import BrowserState +from skyvern.webeye.browser_state import BrowserState from skyvern.webeye.utils.page import SkyvernFrame LOG = structlog.get_logger() diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index ed7cdcd7..27bd2ea8 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -129,7 +129,7 @@ from skyvern.schemas.workflows import ( 
WorkflowStatus, ) from skyvern.services import script_service, workflow_script_service -from skyvern.webeye.browser_factory import BrowserState +from skyvern.webeye.browser_state import BrowserState LOG = structlog.get_logger() diff --git a/skyvern/services/task_v2_service.py b/skyvern/services/task_v2_service.py index 5fcaea38..a015fdb9 100644 --- a/skyvern/services/task_v2_service.py +++ b/skyvern/services/task_v2_service.py @@ -55,7 +55,7 @@ from skyvern.schemas.workflows import ( ) from skyvern.utils.prompt_engine import load_prompt_with_elements from skyvern.utils.strings import generate_random_string -from skyvern.webeye.browser_factory import BrowserState +from skyvern.webeye.browser_state import BrowserState from skyvern.webeye.scraper.scraper import ScrapedPage, scrape_website from skyvern.webeye.utils.page import SkyvernFrame diff --git a/skyvern/webeye/browser_artifacts.py b/skyvern/webeye/browser_artifacts.py new file mode 100644 index 00000000..1b4ebb7b --- /dev/null +++ b/skyvern/webeye/browser_artifacts.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +import asyncio +import os + +import aiofiles +from pydantic import BaseModel, PrivateAttr + + +class VideoArtifact(BaseModel): + video_path: str | None = None + video_artifact_id: str | None = None + video_data: bytes = b"" + + +class BrowserArtifacts(BaseModel): + video_artifacts: list[VideoArtifact] = [] + har_path: str | None = None + traces_dir: str | None = None + browser_session_dir: str | None = None + browser_console_log_path: str | None = None + _browser_console_log_lock: asyncio.Lock = PrivateAttr(default_factory=asyncio.Lock) + + async def append_browser_console_log(self, msg: str) -> int: + if self.browser_console_log_path is None: + return 0 + + async with self._browser_console_log_lock: + async with aiofiles.open(self.browser_console_log_path, "a") as f: + return await f.write(msg) + + async def read_browser_console_log(self) -> bytes: + if self.browser_console_log_path is None: 
+ return b"" + + async with self._browser_console_log_lock: + if not os.path.exists(self.browser_console_log_path): + return b"" + + async with aiofiles.open(self.browser_console_log_path, "rb") as f: + return await f.read() diff --git a/skyvern/webeye/browser_factory.py b/skyvern/webeye/browser_factory.py index 8dacfcd3..bc7757f7 100644 --- a/skyvern/webeye/browser_factory.py +++ b/skyvern/webeye/browser_factory.py @@ -16,33 +16,20 @@ from pathlib import Path from typing import Any, Awaitable, Callable, Protocol from urllib.parse import parse_qsl, urlparse -import aiofiles import psutil import structlog from playwright.async_api import BrowserContext, ConsoleMessage, Download, Page, Playwright -from pydantic import BaseModel, PrivateAttr from skyvern.config import settings from skyvern.constants import ( - BROWSER_CLOSE_TIMEOUT, BROWSER_DOWNLOAD_TIMEOUT, - BROWSER_PAGE_CLOSE_TIMEOUT, - NAVIGATION_MAX_RETRY_TIME, SKYVERN_DIR, ) -from skyvern.exceptions import ( - EmptyBrowserContext, - FailedToNavigateToUrl, - FailedToReloadPage, - FailedToStopLoadingPage, - MissingBrowserStatePage, - UnknownBrowserType, - UnknownErrorWhileCreatingBrowserContext, -) +from skyvern.exceptions import UnknownBrowserType, UnknownErrorWhileCreatingBrowserContext from skyvern.forge.sdk.api.files import get_download_dir, make_temp_directory from skyvern.forge.sdk.core.skyvern_context import current, ensure_context -from skyvern.schemas.runs import ProxyLocation, ProxyLocationInput, get_tzinfo_from_proxy -from skyvern.webeye.utils.page import ScreenshotMode, SkyvernFrame +from skyvern.schemas.runs import ProxyLocation, get_tzinfo_from_proxy +from skyvern.webeye.browser_artifacts import BrowserArtifacts, VideoArtifact LOG = structlog.get_logger() @@ -311,40 +298,6 @@ class BrowserContextFactory: raise UnknownErrorWhileCreatingBrowserContext(browser_type, e) from e -class VideoArtifact(BaseModel): - video_path: str | None = None - video_artifact_id: str | None = None - video_data: bytes = b"" 
- - -class BrowserArtifacts(BaseModel): - video_artifacts: list[VideoArtifact] = [] - har_path: str | None = None - traces_dir: str | None = None - browser_session_dir: str | None = None - browser_console_log_path: str | None = None - _browser_console_log_lock: asyncio.Lock = PrivateAttr(default_factory=asyncio.Lock) - - async def append_browser_console_log(self, msg: str) -> int: - if self.browser_console_log_path is None: - return 0 - - async with self._browser_console_log_lock: - async with aiofiles.open(self.browser_console_log_path, "a") as f: - return await f.write(msg) - - async def read_browser_console_log(self) -> bytes: - if self.browser_console_log_path is None: - return b"" - - async with self._browser_console_log_lock: - if not os.path.exists(self.browser_console_log_path): - return b"" - - async with aiofiles.open(self.browser_console_log_path, "rb") as f: - return await f.read() - - def setup_proxy() -> dict | None: if not settings.HOSTED_PROXY_POOL or settings.HOSTED_PROXY_POOL.strip() == "": LOG.warning("No proxy server value found. 
Continuing without using proxy...") @@ -635,415 +588,3 @@ async def _connect_to_cdp_browser( BrowserContextFactory.register_type("chromium-headless", _create_headless_chromium) BrowserContextFactory.register_type("chromium-headful", _create_headful_chromium) BrowserContextFactory.register_type("cdp-connect", _create_cdp_connection_browser) - - -class BrowserState: - instance = None - - def __init__( - self, - pw: Playwright, - browser_context: BrowserContext | None = None, - page: Page | None = None, - browser_artifacts: BrowserArtifacts = BrowserArtifacts(), - browser_cleanup: BrowserCleanupFunc = None, - ): - self.__page = page - self.pw = pw - self.browser_context = browser_context - self.browser_artifacts = browser_artifacts - self.browser_cleanup = browser_cleanup - - async def __assert_page(self) -> Page: - page = await self.get_working_page() - if page is not None: - return page - pages = (self.browser_context.pages or []) if self.browser_context else [] - LOG.error("BrowserState has no page", urls=[p.url for p in pages]) - raise MissingBrowserStatePage() - - async def _close_all_other_pages(self) -> None: - cur_page = await self.get_working_page() - if not self.browser_context or not cur_page: - return - pages = self.browser_context.pages - for page in pages: - if page != cur_page: - try: - async with asyncio.timeout(2): - await page.close() - except asyncio.TimeoutError: - LOG.warning("Timeout to close the page. 
Skip closing the page", url=page.url) - except Exception: - LOG.exception("Error while closing the page", url=page.url) - - async def check_and_fix_state( - self, - url: str | None = None, - proxy_location: ProxyLocationInput = None, - task_id: str | None = None, - workflow_run_id: str | None = None, - script_id: str | None = None, - organization_id: str | None = None, - extra_http_headers: dict[str, str] | None = None, - browser_address: str | None = None, - browser_profile_id: str | None = None, - ) -> None: - if self.browser_context is None: - LOG.info("creating browser context") - ( - browser_context, - browser_artifacts, - browser_cleanup, - ) = await BrowserContextFactory.create_browser_context( - self.pw, - url=url, - proxy_location=proxy_location, - task_id=task_id, - workflow_run_id=workflow_run_id, - script_id=script_id, - organization_id=organization_id, - extra_http_headers=extra_http_headers, - browser_address=browser_address, - browser_profile_id=browser_profile_id, - ) - self.browser_context = browser_context - self.browser_artifacts = browser_artifacts - self.browser_cleanup = browser_cleanup - LOG.info("browser context is created") - - if await self.get_working_page() is None: - page: Page | None = None - use_existing_page = False - if browser_address and len(self.browser_context.pages) > 0: - pages = await self.list_valid_pages() - if len(pages) > 0: - page = pages[-1] - use_existing_page = True - if page is None: - page = await self.browser_context.new_page() - - await self.set_working_page(page, 0) - if not use_existing_page: - await self._close_all_other_pages() - - if url and page.url.rstrip("/") != url.rstrip("/"): - await self.navigate_to_url(page=page, url=url) - - async def navigate_to_url(self, page: Page, url: str, retry_times: int = NAVIGATION_MAX_RETRY_TIME) -> None: - try: - for retry_time in range(retry_times): - LOG.info(f"Trying to navigate to {url} and waiting for 1 second.", url=url, retry_time=retry_time) - try: - start_time = 
time.time() - await page.goto(url, timeout=settings.BROWSER_LOADING_TIMEOUT_MS) - end_time = time.time() - LOG.info( - "Page loading time", - loading_time=end_time - start_time, - url=url, - ) - # Do we need this? - await asyncio.sleep(5) - LOG.info(f"Successfully went to {url}", url=url, retry_time=retry_time) - return - - except Exception as e: - if retry_time >= retry_times - 1: - raise FailedToNavigateToUrl(url=url, error_message=str(e)) - - LOG.warning( - f"Error while navigating to url: {str(e)}", - exc_info=True, - url=url, - retry_time=retry_time, - ) - # Wait for 1 seconds before retrying - await asyncio.sleep(1) - - except Exception as e: - LOG.exception( - f"Failed to navigate to {url} after {retry_times} retries: {str(e)}", - url=url, - ) - raise e - - async def get_working_page(self) -> Page | None: - # HACK: currently, assuming the last page is always the working page. - # Need to refactor this logic when we want to manipulate multi pages together - # TODO: do not use index of pages, it should be more robust if we want to fully support multi pages manipulation - if self.__page is None or self.browser_context is None: - return None - - # pick the last and http/https page as the working page - pages = await self.list_valid_pages() - if len(pages) == 0: - LOG.info("No http, https or blank page found in the browser context, return None") - return None - - last_page = pages[-1] - if self.__page == last_page: - return self.__page - await self.set_working_page(last_page, len(pages) - 1) - return last_page - - async def list_valid_pages(self, max_pages: int = settings.BROWSER_MAX_PAGES_NUMBER) -> list[Page]: - # List all valid pages(blank page, and http/https page) in the browser context, up to max_pages - # MSEdge CDP bug(?) 
- # when using CDP connect to a MSEdge, the download hub will be included in the context.pages - if self.browser_context is None: - return [] - - pages = [ - http_page - for http_page in self.browser_context.pages - if ( - http_page.url == "about:blank" - or http_page.url == "chrome-error://chromewebdata/" - or urlparse(http_page.url).scheme in ["http", "https"] - ) - ] - - if max_pages <= 0 or len(pages) <= max_pages: - return pages - - reserved_pages = pages[-max_pages:] - - closing_pages = pages[: len(pages) - max_pages] - LOG.warning( - "The page number exceeds the limit, closing the oldest pages. It might cause the video missing", - closing_pages=closing_pages, - ) - for page in closing_pages: - try: - async with asyncio.timeout(BROWSER_PAGE_CLOSE_TIMEOUT): - await page.close() - except Exception: - LOG.warning("Error while closing the page", exc_info=True) - - return reserved_pages - - async def validate_browser_context(self, page: Page) -> bool: - # validate the content - try: - skyvern_frame = await SkyvernFrame.create_instance(frame=page) - html = await skyvern_frame.get_content() - except Exception: - LOG.error( - "Error happened while getting the first page content", - exc_info=True, - ) - return False - - if "Bad gateway error" in html: - LOG.warning("Bad gateway error on the page, recreate a new browser context with another proxy node") - return False - - if "client_connect_forbidden_host" in html: - LOG.warning( - "capture the client_connect_forbidden_host error on the page, recreate a new browser context with another proxy node" - ) - return False - - return True - - async def must_get_working_page(self) -> Page: - page = await self.get_working_page() - if page is None: - raise MissingBrowserStatePage() - return page - - async def set_working_page(self, page: Page | None, index: int = 0) -> None: - self.__page = page - if page is None: - return - if len(self.browser_artifacts.video_artifacts) > index: - if 
self.browser_artifacts.video_artifacts[index].video_path is None: - try: - async with asyncio.timeout(settings.BROWSER_ACTION_TIMEOUT_MS / 1000): - if page.video: - self.browser_artifacts.video_artifacts[index].video_path = await page.video.path() - except asyncio.TimeoutError: - LOG.info("Timeout to get the page video, skip the exception") - except Exception: - LOG.exception("Error while getting the page video", exc_info=True) - return - - target_lenght = index + 1 - self.browser_artifacts.video_artifacts.extend( - [VideoArtifact()] * (target_lenght - len(self.browser_artifacts.video_artifacts)) - ) - try: - async with asyncio.timeout(settings.BROWSER_ACTION_TIMEOUT_MS / 1000): - if page.video: - self.browser_artifacts.video_artifacts[index].video_path = await page.video.path() - except asyncio.TimeoutError: - LOG.info("Timeout to get the page video, skip the exception") - except Exception: - LOG.exception("Error while getting the page video", exc_info=True) - return - - async def get_or_create_page( - self, - url: str | None = None, - proxy_location: ProxyLocationInput = None, - task_id: str | None = None, - workflow_run_id: str | None = None, - script_id: str | None = None, - organization_id: str | None = None, - extra_http_headers: dict[str, str] | None = None, - browser_address: str | None = None, - browser_profile_id: str | None = None, - ) -> Page: - page = await self.get_working_page() - if page is not None: - return page - - try: - await self.check_and_fix_state( - url=url, - proxy_location=proxy_location, - task_id=task_id, - workflow_run_id=workflow_run_id, - script_id=script_id, - organization_id=organization_id, - extra_http_headers=extra_http_headers, - browser_address=browser_address, - browser_profile_id=browser_profile_id, - ) - except Exception as e: - error_message = str(e) - if "net::ERR" not in error_message: - raise e - if not await self.close_current_open_page(): - LOG.warning("Failed to close the current open page") - raise e - await 
self.check_and_fix_state( - url=url, - proxy_location=proxy_location, - task_id=task_id, - workflow_run_id=workflow_run_id, - script_id=script_id, - organization_id=organization_id, - extra_http_headers=extra_http_headers, - browser_address=browser_address, - browser_profile_id=browser_profile_id, - ) - page = await self.__assert_page() - - if not await self.validate_browser_context(await self.get_working_page()): - if not await self.close_current_open_page(): - LOG.warning("Failed to close the current open page, going to skip the browser context validation") - return page - await self.check_and_fix_state( - url=url, - proxy_location=proxy_location, - task_id=task_id, - workflow_run_id=workflow_run_id, - script_id=script_id, - organization_id=organization_id, - extra_http_headers=extra_http_headers, - browser_address=browser_address, - browser_profile_id=browser_profile_id, - ) - page = await self.__assert_page() - return page - - async def close_current_open_page(self) -> bool: - try: - async with asyncio.timeout(BROWSER_CLOSE_TIMEOUT): - await self._close_all_other_pages() - if self.browser_context is not None: - await self.browser_context.close() - self.browser_context = None - await self.set_working_page(None) - return True - except Exception: - LOG.warning("Error while closing the current open page", exc_info=True) - return False - - async def stop_page_loading(self) -> None: - page = await self.__assert_page() - try: - await SkyvernFrame.evaluate(frame=page, expression="window.stop()") - except Exception as e: - LOG.exception(f"Error while stop loading the page: {repr(e)}") - raise FailedToStopLoadingPage(url=page.url, error_message=repr(e)) - - async def new_page(self) -> Page: - if self.browser_context is None: - raise EmptyBrowserContext() - return await self.browser_context.new_page() - - async def reload_page(self) -> None: - page = await self.__assert_page() - - LOG.info(f"Reload page {page.url} and waiting for 5 seconds") - try: - start_time = 
time.time() - await page.reload(timeout=settings.BROWSER_LOADING_TIMEOUT_MS) - end_time = time.time() - LOG.info( - "Page loading time", - loading_time=end_time - start_time, - ) - await asyncio.sleep(5) - except Exception as e: - LOG.exception(f"Error while reload url: {repr(e)}") - raise FailedToReloadPage(url=page.url, error_message=repr(e)) - - async def close(self, close_browser_on_completion: bool = True) -> None: - LOG.info("Closing browser state") - try: - async with asyncio.timeout(BROWSER_CLOSE_TIMEOUT): - if self.browser_context and close_browser_on_completion: - LOG.info("Closing browser context and its pages") - try: - await self.browser_context.close() - except Exception: - LOG.warning("Failed to close browser context", exc_info=True) - LOG.info("Main browser context and all its pages are closed") - if self.browser_cleanup is not None: - try: - self.browser_cleanup() - LOG.info("Main browser cleanup is excuted") - except Exception: - LOG.warning("Failed to execute browser cleanup", exc_info=True) - except asyncio.TimeoutError: - LOG.error("Timeout to close browser context, going to stop playwright directly") - - try: - async with asyncio.timeout(BROWSER_CLOSE_TIMEOUT): - if self.pw and close_browser_on_completion: - try: - LOG.info("Stopping playwright") - await self.pw.stop() - LOG.info("Playwright is stopped") - except Exception: - LOG.warning("Failed to stop playwright", exc_info=True) - except asyncio.TimeoutError: - LOG.error("Timeout to close playwright, might leave the broswer opening forever") - - async def take_fullpage_screenshot( - self, - file_path: str | None = None, - ) -> bytes: - page = await self.__assert_page() - return await SkyvernFrame.take_scrolling_screenshot( - page=page, - file_path=file_path, - mode=ScreenshotMode.LITE, - ) - - async def take_post_action_screenshot( - self, - scrolling_number: int, - file_path: str | None = None, - ) -> bytes: - page = await self.__assert_page() - return await 
SkyvernFrame.take_scrolling_screenshot( - page=page, - file_path=file_path, - mode=ScreenshotMode.LITE, - scrolling_number=scrolling_number, - ) diff --git a/skyvern/webeye/browser_manager.py b/skyvern/webeye/browser_manager.py index af8b2dcf..2fca1f85 100644 --- a/skyvern/webeye/browser_manager.py +++ b/skyvern/webeye/browser_manager.py @@ -1,146 +1,21 @@ from __future__ import annotations -import os +from typing import Protocol import structlog -from playwright.async_api import async_playwright -from skyvern.exceptions import MissingBrowserState -from skyvern.forge import app from skyvern.forge.sdk.schemas.tasks import Task from skyvern.forge.sdk.workflow.models.workflow import WorkflowRun -from skyvern.schemas.runs import ProxyLocation, ProxyLocationInput -from skyvern.webeye.browser_factory import BrowserContextFactory, BrowserState, VideoArtifact +from skyvern.webeye.browser_artifacts import VideoArtifact +from skyvern.webeye.browser_state import BrowserState LOG = structlog.get_logger() -class BrowserManager: - instance = None - pages: dict[str, BrowserState] = dict() +class BrowserManager(Protocol): + pages: dict[str, BrowserState] - def __new__(cls) -> BrowserManager: - if cls.instance is None: - cls.instance = super().__new__(cls) - return cls.instance - - @staticmethod - async def _create_browser_state( - proxy_location: ProxyLocationInput = None, - url: str | None = None, - task_id: str | None = None, - workflow_run_id: str | None = None, - script_id: str | None = None, - organization_id: str | None = None, - extra_http_headers: dict[str, str] | None = None, - browser_address: str | None = None, - browser_profile_id: str | None = None, - ) -> BrowserState: - pw = await async_playwright().start() - ( - browser_context, - browser_artifacts, - browser_cleanup, - ) = await BrowserContextFactory.create_browser_context( - pw, - proxy_location=proxy_location, - url=url, - task_id=task_id, - workflow_run_id=workflow_run_id, - script_id=script_id, - 
organization_id=organization_id, - extra_http_headers=extra_http_headers, - browser_address=browser_address, - browser_profile_id=browser_profile_id, - ) - return BrowserState( - pw=pw, - browser_context=browser_context, - page=None, - browser_artifacts=browser_artifacts, - browser_cleanup=browser_cleanup, - ) - - def get_for_task(self, task_id: str, workflow_run_id: str | None = None) -> BrowserState | None: - if task_id in self.pages: - return self.pages[task_id] - - if workflow_run_id and workflow_run_id in self.pages: - LOG.info( - "Browser state for task not found. Using browser state for workflow run", - task_id=task_id, - workflow_run_id=workflow_run_id, - ) - self.pages[task_id] = self.pages[workflow_run_id] - return self.pages[task_id] - - return None - - async def get_or_create_for_task( - self, - task: Task, - browser_session_id: str | None = None, - ) -> BrowserState: - browser_state = self.get_for_task(task_id=task.task_id, workflow_run_id=task.workflow_run_id) - if browser_state is not None: - return browser_state - - if browser_session_id: - LOG.info( - "Getting browser state for task from persistent sessions manager", - browser_session_id=browser_session_id, - ) - browser_state = await app.PERSISTENT_SESSIONS_MANAGER.get_browser_state( - browser_session_id, organization_id=task.organization_id - ) - if browser_state is None: - LOG.warning( - "Browser state not found in persistent sessions manager", - browser_session_id=browser_session_id, - ) - else: - if task.organization_id: - LOG.info("User to occupy browser session here", browser_session_id=browser_session_id) - else: - LOG.warning("Organization ID is not set for task", task_id=task.task_id) - page = await browser_state.get_working_page() - if page: - await browser_state.navigate_to_url(page=page, url=task.url) - else: - LOG.warning("Browser state has no page", workflow_run_id=task.workflow_run_id) - - if browser_state is None: - LOG.info("Creating browser state for task", task_id=task.task_id) 
- browser_state = await self._create_browser_state( - proxy_location=task.proxy_location, - url=task.url, - task_id=task.task_id, - organization_id=task.organization_id, - extra_http_headers=task.extra_http_headers, - browser_address=task.browser_address, - ) - - if browser_session_id: - await app.PERSISTENT_SESSIONS_MANAGER.set_browser_state( - browser_session_id, - browser_state, - ) - - self.pages[task.task_id] = browser_state - if task.workflow_run_id: - self.pages[task.workflow_run_id] = browser_state - - # The URL here is only used when creating a new page, and not when using an existing page. - # This will make sure browser_state.page is not None. - await browser_state.get_or_create_page( - url=task.url, - proxy_location=task.proxy_location, - task_id=task.task_id, - organization_id=task.organization_id, - extra_http_headers=task.extra_http_headers, - browser_address=task.browser_address, - ) - return browser_state + async def get_or_create_for_task(self, task: Task, browser_session_id: str | None = None) -> BrowserState: ... async def get_or_create_for_workflow_run( self, @@ -148,171 +23,7 @@ class BrowserManager: url: str | None = None, browser_session_id: str | None = None, browser_profile_id: str | None = None, - ) -> BrowserState: - parent_workflow_run_id = workflow_run.parent_workflow_run_id - workflow_run_id = workflow_run.workflow_run_id - if browser_profile_id is None: - browser_profile_id = workflow_run.browser_profile_id - browser_state = self.get_for_workflow_run( - workflow_run_id=workflow_run_id, parent_workflow_run_id=parent_workflow_run_id - ) - if browser_state: - # always keep the browser state for the workflow run and the parent workflow run synced - self.pages[workflow_run_id] = browser_state - if parent_workflow_run_id: - self.pages[parent_workflow_run_id] = browser_state - return browser_state - - if browser_session_id: - # TODO: what if there's a parent workflow run? 
- LOG.info( - "Getting browser state for workflow run from persistent sessions manager", - browser_session_id=browser_session_id, - ) - browser_state = await app.PERSISTENT_SESSIONS_MANAGER.get_browser_state( - browser_session_id, organization_id=workflow_run.organization_id - ) - if browser_state is None: - LOG.warning( - "Browser state not found in persistent sessions manager", browser_session_id=browser_session_id - ) - else: - LOG.info("Used to occupy browser session here", browser_session_id=browser_session_id) - page = await browser_state.get_working_page() - if page: - if url: - await browser_state.navigate_to_url(page=page, url=url) - else: - LOG.warning("Browser state has no page", workflow_run_id=workflow_run.workflow_run_id) - - if browser_state is None: - LOG.info( - "Creating browser state for workflow run", - workflow_run_id=workflow_run.workflow_run_id, - ) - browser_state = await self._create_browser_state( - proxy_location=workflow_run.proxy_location, - url=url, - workflow_run_id=workflow_run.workflow_run_id, - organization_id=workflow_run.organization_id, - extra_http_headers=workflow_run.extra_http_headers, - browser_address=workflow_run.browser_address, - browser_profile_id=browser_profile_id, - ) - - if browser_session_id: - await app.PERSISTENT_SESSIONS_MANAGER.set_browser_state( - browser_session_id, - browser_state, - ) - - self.pages[workflow_run_id] = browser_state - if parent_workflow_run_id: - self.pages[parent_workflow_run_id] = browser_state - - # The URL here is only used when creating a new page, and not when using an existing page. - # This will make sure browser_state.page is not None. 
- await browser_state.get_or_create_page( - url=url, - proxy_location=workflow_run.proxy_location, - workflow_run_id=workflow_run.workflow_run_id, - organization_id=workflow_run.organization_id, - extra_http_headers=workflow_run.extra_http_headers, - browser_address=workflow_run.browser_address, - browser_profile_id=browser_profile_id, - ) - return browser_state - - def get_for_workflow_run( - self, workflow_run_id: str, parent_workflow_run_id: str | None = None - ) -> BrowserState | None: - if parent_workflow_run_id and parent_workflow_run_id in self.pages: - return self.pages[parent_workflow_run_id] - - if workflow_run_id in self.pages: - return self.pages[workflow_run_id] - - return None - - def set_video_artifact_for_task(self, task: Task, artifacts: list[VideoArtifact]) -> None: - if task.workflow_run_id and task.workflow_run_id in self.pages: - self.pages[task.workflow_run_id].browser_artifacts.video_artifacts = artifacts - return - if task.task_id in self.pages: - self.pages[task.task_id].browser_artifacts.video_artifacts = artifacts - return - - raise MissingBrowserState(task_id=task.task_id) - - async def get_video_artifacts( - self, - browser_state: BrowserState, - task_id: str = "", - workflow_id: str = "", - workflow_run_id: str = "", - ) -> list[VideoArtifact]: - if len(browser_state.browser_artifacts.video_artifacts) == 0: - LOG.warning( - "Video data not found for task", - task_id=task_id, - workflow_id=workflow_id, - workflow_run_id=workflow_run_id, - ) - return [] - - for i, video_artifact in enumerate(browser_state.browser_artifacts.video_artifacts): - path = video_artifact.video_path - if path and os.path.exists(path=path): - with open(path, "rb") as f: - browser_state.browser_artifacts.video_artifacts[i].video_data = f.read() - - return browser_state.browser_artifacts.video_artifacts - - async def get_har_data( - self, - browser_state: BrowserState, - task_id: str = "", - workflow_id: str = "", - workflow_run_id: str = "", - ) -> bytes: - if 
browser_state: - path = browser_state.browser_artifacts.har_path - if path and os.path.exists(path=path): - with open(path, "rb") as f: - return f.read() - LOG.warning( - "HAR data not found for task", - task_id=task_id, - workflow_id=workflow_id, - workflow_run_id=workflow_run_id, - ) - return b"" - - async def get_browser_console_log( - self, - browser_state: BrowserState, - task_id: str = "", - workflow_id: str = "", - workflow_run_id: str = "", - ) -> bytes: - if browser_state.browser_artifacts.browser_console_log_path is None: - LOG.warning( - "browser console log not found for task", - task_id=task_id, - workflow_id=workflow_id, - workflow_run_id=workflow_run_id, - ) - return b"" - - return await browser_state.browser_artifacts.read_browser_console_log() - - @classmethod - async def close(cls) -> None: - LOG.info("Closing BrowserManager") - for browser_state in cls.pages.values(): - await browser_state.close() - cls.pages = dict() - LOG.info("BrowserManger is closed") + ) -> BrowserState: ... async def cleanup_for_task( self, @@ -320,32 +31,7 @@ class BrowserManager: close_browser_on_completion: bool = True, browser_session_id: str | None = None, organization_id: str | None = None, - ) -> BrowserState | None: - """ - Developer notes: handle errors here. Do not raise error from this function. - If error occurs, log it and address the cleanup error. 
- """ - LOG.info("Cleaning up for task") - browser_state_to_close = self.pages.pop(task_id, None) - if browser_state_to_close: - # Stop tracing before closing the browser if tracing is enabled - if browser_state_to_close.browser_context and browser_state_to_close.browser_artifacts.traces_dir: - trace_path = f"{browser_state_to_close.browser_artifacts.traces_dir}/{task_id}.zip" - await browser_state_to_close.browser_context.tracing.stop(path=trace_path) - LOG.info("Stopped tracing", trace_path=trace_path) - await browser_state_to_close.close(close_browser_on_completion=close_browser_on_completion) - LOG.info("Task is cleaned up") - - if browser_session_id: - if organization_id: - await app.PERSISTENT_SESSIONS_MANAGER.release_browser_session( - browser_session_id, organization_id=organization_id - ) - LOG.info("Released browser session", browser_session_id=browser_session_id) - else: - LOG.warning("Organization ID not specified, cannot release browser session", task_id=task_id) - - return browser_state_to_close + ) -> BrowserState | None: ... 
async def cleanup_for_workflow_run( self, @@ -354,88 +40,46 @@ class BrowserManager: close_browser_on_completion: bool = True, browser_session_id: str | None = None, organization_id: str | None = None, - ) -> BrowserState | None: - LOG.info("Cleaning up for workflow run") - browser_state_to_close = self.pages.pop(workflow_run_id, None) - if browser_state_to_close: - # Stop tracing before closing the browser if tracing is enabled - if browser_state_to_close.browser_context and browser_state_to_close.browser_artifacts.traces_dir: - trace_path = f"{browser_state_to_close.browser_artifacts.traces_dir}/{workflow_run_id}.zip" - await browser_state_to_close.browser_context.tracing.stop(path=trace_path) - LOG.info("Stopped tracing", trace_path=trace_path) - - await browser_state_to_close.close(close_browser_on_completion=close_browser_on_completion) - for task_id in task_ids: - task_browser_state = self.pages.pop(task_id, None) - if task_browser_state is None: - continue - try: - await task_browser_state.close() - except Exception: - LOG.info( - "Failed to close the browser state from the task block, might because it's already closed.", - exc_info=True, - task_id=task_id, - workflow_run_id=workflow_run_id, - ) - LOG.info("Workflow run is cleaned up") - - if browser_session_id: - if organization_id: - await app.PERSISTENT_SESSIONS_MANAGER.release_browser_session( - browser_session_id, organization_id=organization_id - ) - LOG.info("Released browser session", browser_session_id=browser_session_id) - else: - LOG.warning( - "Organization ID not specified, cannot release browser session", workflow_run_id=workflow_run_id - ) - - return browser_state_to_close + ) -> BrowserState | None: ... async def get_or_create_for_script( self, script_id: str | None = None, browser_session_id: str | None = None, - ) -> BrowserState: - browser_state = self.get_for_script(script_id=script_id) - if browser_state: - return browser_state + ) -> BrowserState: ... 
- if browser_session_id: - LOG.info( - "Getting browser state for script", - browser_session_id=browser_session_id, - ) - browser_state = await app.PERSISTENT_SESSIONS_MANAGER.get_browser_state( - browser_session_id, organization_id=script_id - ) - if browser_state is None: - LOG.warning( - "Browser state not found in persistent sessions manager", - browser_session_id=browser_session_id, - ) - else: - page = await browser_state.get_working_page() - if not page: - LOG.warning("Browser state has no page to run the script", script_id=script_id) - proxy_location = ProxyLocation.RESIDENTIAL - if not browser_state: - browser_state = await self._create_browser_state( - proxy_location=proxy_location, - script_id=script_id, - ) + def get_for_task(self, task_id: str, workflow_run_id: str | None = None) -> BrowserState | None: ... - if script_id: - self.pages[script_id] = browser_state - await browser_state.get_or_create_page( - proxy_location=proxy_location, - script_id=script_id, - ) + def get_for_workflow_run( + self, + workflow_run_id: str, + parent_workflow_run_id: str | None = None, + ) -> BrowserState | None: ... - return browser_state + def get_for_script(self, script_id: str | None = None) -> BrowserState | None: ... - def get_for_script(self, script_id: str | None = None) -> BrowserState | None: - if script_id and script_id in self.pages: - return self.pages[script_id] - return None + def set_video_artifact_for_task(self, task: Task, artifacts: list[VideoArtifact]) -> None: ... + + async def get_video_artifacts( + self, + browser_state: BrowserState, + task_id: str = "", + workflow_id: str = "", + workflow_run_id: str = "", + ) -> list[VideoArtifact]: ... + + async def get_har_data( + self, + browser_state: BrowserState, + task_id: str = "", + workflow_id: str = "", + workflow_run_id: str = "", + ) -> bytes: ... 
+ + async def get_browser_console_log( + self, + browser_state: BrowserState, + task_id: str = "", + workflow_id: str = "", + workflow_run_id: str = "", + ) -> bytes: ... diff --git a/skyvern/webeye/browser_state.py b/skyvern/webeye/browser_state.py new file mode 100644 index 00000000..550c1637 --- /dev/null +++ b/skyvern/webeye/browser_state.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +from typing import Protocol + +from playwright.async_api import BrowserContext, Page, Playwright + +from skyvern.config import settings +from skyvern.constants import NAVIGATION_MAX_RETRY_TIME +from skyvern.schemas.runs import ProxyLocationInput +from skyvern.webeye.browser_artifacts import BrowserArtifacts +from skyvern.webeye.browser_factory import BrowserCleanupFunc + + +class BrowserState(Protocol): + browser_context: BrowserContext | None + browser_artifacts: BrowserArtifacts + browser_cleanup: BrowserCleanupFunc + pw: Playwright + + async def check_and_fix_state( + self, + url: str | None = None, + proxy_location: ProxyLocationInput = None, + task_id: str | None = None, + workflow_run_id: str | None = None, + script_id: str | None = None, + organization_id: str | None = None, + extra_http_headers: dict[str, str] | None = None, + browser_address: str | None = None, + browser_profile_id: str | None = None, + ) -> None: ... + + async def get_working_page(self) -> Page | None: ... + + async def must_get_working_page(self) -> Page: ... + + async def set_working_page(self, page: Page | None, index: int = 0) -> None: ... + + async def navigate_to_url(self, page: Page, url: str, retry_times: int = NAVIGATION_MAX_RETRY_TIME) -> None: ... 
+ + async def get_or_create_page( + self, + url: str | None = None, + proxy_location: ProxyLocationInput = None, + task_id: str | None = None, + workflow_run_id: str | None = None, + script_id: str | None = None, + organization_id: str | None = None, + extra_http_headers: dict[str, str] | None = None, + browser_address: str | None = None, + browser_profile_id: str | None = None, + ) -> Page: ... + + async def list_valid_pages(self, max_pages: int = settings.BROWSER_MAX_PAGES_NUMBER) -> list[Page]: ... + + async def validate_browser_context(self, page: Page) -> bool: ... + + async def close_current_open_page(self) -> bool: ... + + async def stop_page_loading(self) -> None: ... + + async def new_page(self) -> Page: ... + + async def reload_page(self) -> None: ... + + async def close(self, close_browser_on_completion: bool = True) -> None: ... + + async def take_fullpage_screenshot(self, file_path: str | None = None) -> bytes: ... + + async def take_post_action_screenshot(self, scrolling_number: int, file_path: str | None = None) -> bytes: ... 
diff --git a/skyvern/webeye/persistent_sessions_manager.py b/skyvern/webeye/persistent_sessions_manager.py index 2dd0038f..01da90e8 100644 --- a/skyvern/webeye/persistent_sessions_manager.py +++ b/skyvern/webeye/persistent_sessions_manager.py @@ -17,7 +17,7 @@ from skyvern.forge.sdk.schemas.persistent_browser_sessions import ( is_final_status, ) from skyvern.schemas.runs import ProxyLocation, ProxyLocationInput -from skyvern.webeye.browser_factory import BrowserState +from skyvern.webeye.browser_state import BrowserState LOG = structlog.get_logger() diff --git a/skyvern/webeye/real_browser_manager.py b/skyvern/webeye/real_browser_manager.py new file mode 100644 index 00000000..241db72b --- /dev/null +++ b/skyvern/webeye/real_browser_manager.py @@ -0,0 +1,438 @@ +from __future__ import annotations + +import os + +import structlog +from playwright.async_api import async_playwright + +from skyvern.exceptions import MissingBrowserState +from skyvern.forge import app +from skyvern.forge.sdk.schemas.tasks import Task +from skyvern.forge.sdk.workflow.models.workflow import WorkflowRun +from skyvern.schemas.runs import ProxyLocation, ProxyLocationInput +from skyvern.webeye.browser_artifacts import VideoArtifact +from skyvern.webeye.browser_factory import BrowserContextFactory +from skyvern.webeye.browser_state import BrowserState +from skyvern.webeye.real_browser_state import RealBrowserState + +LOG = structlog.get_logger() + + +class RealBrowserManager: + def __init__(self) -> None: + self.pages: dict[str, BrowserState] = {} + + @staticmethod + async def _create_browser_state( + proxy_location: ProxyLocationInput = None, + url: str | None = None, + task_id: str | None = None, + workflow_run_id: str | None = None, + script_id: str | None = None, + organization_id: str | None = None, + extra_http_headers: dict[str, str] | None = None, + browser_address: str | None = None, + browser_profile_id: str | None = None, + ) -> BrowserState: + pw = await async_playwright().start() + 
( + browser_context, + browser_artifacts, + browser_cleanup, + ) = await BrowserContextFactory.create_browser_context( + pw, + proxy_location=proxy_location, + url=url, + task_id=task_id, + workflow_run_id=workflow_run_id, + script_id=script_id, + organization_id=organization_id, + extra_http_headers=extra_http_headers, + browser_address=browser_address, + browser_profile_id=browser_profile_id, + ) + return RealBrowserState( + pw=pw, + browser_context=browser_context, + page=None, + browser_artifacts=browser_artifacts, + browser_cleanup=browser_cleanup, + ) + + def get_for_task(self, task_id: str, workflow_run_id: str | None = None) -> BrowserState | None: + if task_id in self.pages: + return self.pages[task_id] + + if workflow_run_id and workflow_run_id in self.pages: + LOG.info( + "Browser state for task not found. Using browser state for workflow run", + task_id=task_id, + workflow_run_id=workflow_run_id, + ) + self.pages[task_id] = self.pages[workflow_run_id] + return self.pages[task_id] + + return None + + async def get_or_create_for_task( + self, + task: Task, + browser_session_id: str | None = None, + ) -> BrowserState: + browser_state = self.get_for_task(task_id=task.task_id, workflow_run_id=task.workflow_run_id) + if browser_state is not None: + return browser_state + + if browser_session_id: + LOG.info( + "Getting browser state for task from persistent sessions manager", + browser_session_id=browser_session_id, + ) + browser_state = await app.PERSISTENT_SESSIONS_MANAGER.get_browser_state( + browser_session_id, organization_id=task.organization_id + ) + if browser_state is None: + LOG.warning( + "Browser state not found in persistent sessions manager", + browser_session_id=browser_session_id, + ) + else: + if task.organization_id: + LOG.info("User to occupy browser session here", browser_session_id=browser_session_id) + else: + LOG.warning("Organization ID is not set for task", task_id=task.task_id) + page = await browser_state.get_working_page() + if 
page: + await browser_state.navigate_to_url(page=page, url=task.url) + else: + LOG.warning("Browser state has no page", workflow_run_id=task.workflow_run_id) + + if browser_state is None: + LOG.info("Creating browser state for task", task_id=task.task_id) + browser_state = await self._create_browser_state( + proxy_location=task.proxy_location, + url=task.url, + task_id=task.task_id, + organization_id=task.organization_id, + extra_http_headers=task.extra_http_headers, + browser_address=task.browser_address, + ) + + if browser_session_id: + await app.PERSISTENT_SESSIONS_MANAGER.set_browser_state( + browser_session_id, + browser_state, + ) + + self.pages[task.task_id] = browser_state + if task.workflow_run_id: + self.pages[task.workflow_run_id] = browser_state + + # The URL here is only used when creating a new page, and not when using an existing page. + # This will make sure browser_state.page is not None. + await browser_state.get_or_create_page( + url=task.url, + proxy_location=task.proxy_location, + task_id=task.task_id, + organization_id=task.organization_id, + extra_http_headers=task.extra_http_headers, + browser_address=task.browser_address, + ) + return browser_state + + async def get_or_create_for_workflow_run( + self, + workflow_run: WorkflowRun, + url: str | None = None, + browser_session_id: str | None = None, + browser_profile_id: str | None = None, + ) -> BrowserState: + parent_workflow_run_id = workflow_run.parent_workflow_run_id + workflow_run_id = workflow_run.workflow_run_id + if browser_profile_id is None: + browser_profile_id = workflow_run.browser_profile_id + browser_state = self.get_for_workflow_run( + workflow_run_id=workflow_run_id, parent_workflow_run_id=parent_workflow_run_id + ) + if browser_state: + # always keep the browser state for the workflow run and the parent workflow run synced + self.pages[workflow_run_id] = browser_state + if parent_workflow_run_id: + self.pages[parent_workflow_run_id] = browser_state + return browser_state + + 
if browser_session_id: + # TODO: what if there's a parent workflow run? + LOG.info( + "Getting browser state for workflow run from persistent sessions manager", + browser_session_id=browser_session_id, + ) + browser_state = await app.PERSISTENT_SESSIONS_MANAGER.get_browser_state( + browser_session_id, organization_id=workflow_run.organization_id + ) + if browser_state is None: + LOG.warning( + "Browser state not found in persistent sessions manager", browser_session_id=browser_session_id + ) + else: + LOG.info("Used to occupy browser session here", browser_session_id=browser_session_id) + page = await browser_state.get_working_page() + if page: + if url: + await browser_state.navigate_to_url(page=page, url=url) + else: + LOG.warning("Browser state has no page", workflow_run_id=workflow_run.workflow_run_id) + + if browser_state is None: + LOG.info( + "Creating browser state for workflow run", + workflow_run_id=workflow_run.workflow_run_id, + ) + browser_state = await self._create_browser_state( + proxy_location=workflow_run.proxy_location, + url=url, + workflow_run_id=workflow_run.workflow_run_id, + organization_id=workflow_run.organization_id, + extra_http_headers=workflow_run.extra_http_headers, + browser_address=workflow_run.browser_address, + browser_profile_id=browser_profile_id, + ) + + if browser_session_id: + await app.PERSISTENT_SESSIONS_MANAGER.set_browser_state( + browser_session_id, + browser_state, + ) + + self.pages[workflow_run_id] = browser_state + if parent_workflow_run_id: + self.pages[parent_workflow_run_id] = browser_state + + # The URL here is only used when creating a new page, and not when using an existing page. + # This will make sure browser_state.page is not None. 
+ await browser_state.get_or_create_page( + url=url, + proxy_location=workflow_run.proxy_location, + workflow_run_id=workflow_run.workflow_run_id, + organization_id=workflow_run.organization_id, + extra_http_headers=workflow_run.extra_http_headers, + browser_address=workflow_run.browser_address, + browser_profile_id=browser_profile_id, + ) + return browser_state + + def get_for_workflow_run( + self, workflow_run_id: str, parent_workflow_run_id: str | None = None + ) -> BrowserState | None: + if parent_workflow_run_id and parent_workflow_run_id in self.pages: + return self.pages[parent_workflow_run_id] + + if workflow_run_id in self.pages: + return self.pages[workflow_run_id] + + return None + + def set_video_artifact_for_task(self, task: Task, artifacts: list[VideoArtifact]) -> None: + if task.workflow_run_id and task.workflow_run_id in self.pages: + self.pages[task.workflow_run_id].browser_artifacts.video_artifacts = artifacts + return + if task.task_id in self.pages: + self.pages[task.task_id].browser_artifacts.video_artifacts = artifacts + return + + raise MissingBrowserState(task_id=task.task_id) + + async def get_video_artifacts( + self, + browser_state: BrowserState, + task_id: str = "", + workflow_id: str = "", + workflow_run_id: str = "", + ) -> list[VideoArtifact]: + if len(browser_state.browser_artifacts.video_artifacts) == 0: + LOG.warning( + "Video data not found for task", + task_id=task_id, + workflow_id=workflow_id, + workflow_run_id=workflow_run_id, + ) + return [] + + for i, video_artifact in enumerate(browser_state.browser_artifacts.video_artifacts): + path = video_artifact.video_path + if path and os.path.exists(path=path): + with open(path, "rb") as f: + browser_state.browser_artifacts.video_artifacts[i].video_data = f.read() + + return browser_state.browser_artifacts.video_artifacts + + async def get_har_data( + self, + browser_state: BrowserState, + task_id: str = "", + workflow_id: str = "", + workflow_run_id: str = "", + ) -> bytes: + if 
browser_state: + path = browser_state.browser_artifacts.har_path + if path and os.path.exists(path=path): + with open(path, "rb") as f: + return f.read() + LOG.warning( + "HAR data not found for task", + task_id=task_id, + workflow_id=workflow_id, + workflow_run_id=workflow_run_id, + ) + return b"" + + async def get_browser_console_log( + self, + browser_state: BrowserState, + task_id: str = "", + workflow_id: str = "", + workflow_run_id: str = "", + ) -> bytes: + if browser_state.browser_artifacts.browser_console_log_path is None: + LOG.warning( + "browser console log not found for task", + task_id=task_id, + workflow_id=workflow_id, + workflow_run_id=workflow_run_id, + ) + return b"" + + return await browser_state.browser_artifacts.read_browser_console_log() + + async def close(self) -> None: + LOG.info("Closing BrowserManager") + for browser_state in self.pages.values(): + await browser_state.close() + self.pages = dict() + LOG.info("BrowserManger is closed") + + async def cleanup_for_task( + self, + task_id: str, + close_browser_on_completion: bool = True, + browser_session_id: str | None = None, + organization_id: str | None = None, + ) -> BrowserState | None: + """ + Developer notes: handle errors here. Do not raise error from this function. + If error occurs, log it and address the cleanup error. 
+ """ + LOG.info("Cleaning up for task") + browser_state_to_close = self.pages.pop(task_id, None) + if browser_state_to_close: + # Stop tracing before closing the browser if tracing is enabled + if browser_state_to_close.browser_context and browser_state_to_close.browser_artifacts.traces_dir: + trace_path = f"{browser_state_to_close.browser_artifacts.traces_dir}/{task_id}.zip" + await browser_state_to_close.browser_context.tracing.stop(path=trace_path) + LOG.info("Stopped tracing", trace_path=trace_path) + await browser_state_to_close.close(close_browser_on_completion=close_browser_on_completion) + LOG.info("Task is cleaned up") + + if browser_session_id: + if organization_id: + await app.PERSISTENT_SESSIONS_MANAGER.release_browser_session( + browser_session_id, organization_id=organization_id + ) + LOG.info("Released browser session", browser_session_id=browser_session_id) + else: + LOG.warning("Organization ID not specified, cannot release browser session", task_id=task_id) + + return browser_state_to_close + + async def cleanup_for_workflow_run( + self, + workflow_run_id: str, + task_ids: list[str], + close_browser_on_completion: bool = True, + browser_session_id: str | None = None, + organization_id: str | None = None, + ) -> BrowserState | None: + LOG.info("Cleaning up for workflow run") + browser_state_to_close = self.pages.pop(workflow_run_id, None) + if browser_state_to_close: + # Stop tracing before closing the browser if tracing is enabled + if browser_state_to_close.browser_context and browser_state_to_close.browser_artifacts.traces_dir: + trace_path = f"{browser_state_to_close.browser_artifacts.traces_dir}/{workflow_run_id}.zip" + await browser_state_to_close.browser_context.tracing.stop(path=trace_path) + LOG.info("Stopped tracing", trace_path=trace_path) + + await browser_state_to_close.close(close_browser_on_completion=close_browser_on_completion) + for task_id in task_ids: + task_browser_state = self.pages.pop(task_id, None) + if task_browser_state 
is None: + continue + try: + await task_browser_state.close() + except Exception: + LOG.info( + "Failed to close the browser state from the task block, might because it's already closed.", + exc_info=True, + task_id=task_id, + workflow_run_id=workflow_run_id, + ) + LOG.info("Workflow run is cleaned up") + + if browser_session_id: + if organization_id: + await app.PERSISTENT_SESSIONS_MANAGER.release_browser_session( + browser_session_id, organization_id=organization_id + ) + LOG.info("Released browser session", browser_session_id=browser_session_id) + else: + LOG.warning( + "Organization ID not specified, cannot release browser session", workflow_run_id=workflow_run_id + ) + + return browser_state_to_close + + async def get_or_create_for_script( + self, + script_id: str | None = None, + browser_session_id: str | None = None, + ) -> BrowserState: + browser_state = self.get_for_script(script_id=script_id) + if browser_state: + return browser_state + + if browser_session_id: + LOG.info( + "Getting browser state for script", + browser_session_id=browser_session_id, + ) + browser_state = await app.PERSISTENT_SESSIONS_MANAGER.get_browser_state( + browser_session_id, organization_id=script_id + ) + if browser_state is None: + LOG.warning( + "Browser state not found in persistent sessions manager", + browser_session_id=browser_session_id, + ) + else: + page = await browser_state.get_working_page() + if not page: + LOG.warning("Browser state has no page to run the script", script_id=script_id) + proxy_location = ProxyLocation.RESIDENTIAL + if not browser_state: + browser_state = await self._create_browser_state( + proxy_location=proxy_location, + script_id=script_id, + ) + + if script_id: + self.pages[script_id] = browser_state + await browser_state.get_or_create_page( + proxy_location=proxy_location, + script_id=script_id, + ) + + return browser_state + + def get_for_script(self, script_id: str | None = None) -> BrowserState | None: + if script_id and script_id in 
self.pages: + return self.pages[script_id] + return None diff --git a/skyvern/webeye/real_browser_state.py b/skyvern/webeye/real_browser_state.py new file mode 100644 index 00000000..e0878931 --- /dev/null +++ b/skyvern/webeye/real_browser_state.py @@ -0,0 +1,435 @@ +from __future__ import annotations + +import asyncio +import time +from urllib.parse import urlparse + +import structlog +from playwright.async_api import BrowserContext, Page, Playwright + +from skyvern.config import settings +from skyvern.constants import BROWSER_CLOSE_TIMEOUT, BROWSER_PAGE_CLOSE_TIMEOUT, NAVIGATION_MAX_RETRY_TIME +from skyvern.exceptions import ( + EmptyBrowserContext, + FailedToNavigateToUrl, + FailedToReloadPage, + FailedToStopLoadingPage, + MissingBrowserStatePage, +) +from skyvern.schemas.runs import ProxyLocationInput +from skyvern.webeye.browser_artifacts import BrowserArtifacts, VideoArtifact +from skyvern.webeye.browser_factory import BrowserCleanupFunc, BrowserContextFactory +from skyvern.webeye.browser_state import BrowserState +from skyvern.webeye.utils.page import ScreenshotMode, SkyvernFrame + +LOG = structlog.get_logger() + + +class RealBrowserState(BrowserState): + def __init__( + self, + pw: Playwright, + browser_context: BrowserContext | None = None, + page: Page | None = None, + browser_artifacts: BrowserArtifacts = BrowserArtifacts(), + browser_cleanup: BrowserCleanupFunc = None, + ): + self.__page = page + self.pw = pw + self.browser_context = browser_context + self.browser_artifacts = browser_artifacts + self.browser_cleanup = browser_cleanup + + async def __assert_page(self) -> Page: + page = await self.get_working_page() + if page is not None: + return page + pages = (self.browser_context.pages or []) if self.browser_context else [] + LOG.error("BrowserState has no page", urls=[p.url for p in pages]) + raise MissingBrowserStatePage() + + async def _close_all_other_pages(self) -> None: + cur_page = await self.get_working_page() + if not self.browser_context or 
not cur_page: + return + pages = self.browser_context.pages + for page in pages: + if page != cur_page: + try: + async with asyncio.timeout(2): + await page.close() + except asyncio.TimeoutError: + LOG.warning("Timeout to close the page. Skip closing the page", url=page.url) + except Exception: + LOG.exception("Error while closing the page", url=page.url) + + async def check_and_fix_state( + self, + url: str | None = None, + proxy_location: ProxyLocationInput = None, + task_id: str | None = None, + workflow_run_id: str | None = None, + script_id: str | None = None, + organization_id: str | None = None, + extra_http_headers: dict[str, str] | None = None, + browser_address: str | None = None, + browser_profile_id: str | None = None, + ) -> None: + if self.browser_context is None: + LOG.info("creating browser context") + ( + browser_context, + browser_artifacts, + browser_cleanup, + ) = await BrowserContextFactory.create_browser_context( + self.pw, + url=url, + proxy_location=proxy_location, + task_id=task_id, + workflow_run_id=workflow_run_id, + script_id=script_id, + organization_id=organization_id, + extra_http_headers=extra_http_headers, + browser_address=browser_address, + browser_profile_id=browser_profile_id, + ) + self.browser_context = browser_context + self.browser_artifacts = browser_artifacts + self.browser_cleanup = browser_cleanup + LOG.info("browser context is created") + + if await self.get_working_page() is None: + page: Page | None = None + use_existing_page = False + if browser_address and len(self.browser_context.pages) > 0: + pages = await self.list_valid_pages() + if len(pages) > 0: + page = pages[-1] + use_existing_page = True + if page is None: + page = await self.browser_context.new_page() + + await self.set_working_page(page, 0) + if not use_existing_page: + await self._close_all_other_pages() + + if url and page.url.rstrip("/") != url.rstrip("/"): + await self.navigate_to_url(page=page, url=url) + + async def navigate_to_url(self, page: 
Page, url: str, retry_times: int = NAVIGATION_MAX_RETRY_TIME) -> None: + try: + for retry_time in range(retry_times): + LOG.info(f"Trying to navigate to {url} and waiting for 1 second.", url=url, retry_time=retry_time) + try: + start_time = time.time() + await page.goto(url, timeout=settings.BROWSER_LOADING_TIMEOUT_MS) + end_time = time.time() + LOG.info( + "Page loading time", + loading_time=end_time - start_time, + url=url, + ) + # Do we need this? + await asyncio.sleep(5) + LOG.info(f"Successfully went to {url}", url=url, retry_time=retry_time) + return + + except Exception as e: + if retry_time >= retry_times - 1: + raise FailedToNavigateToUrl(url=url, error_message=str(e)) + + LOG.warning( + f"Error while navigating to url: {str(e)}", + exc_info=True, + url=url, + retry_time=retry_time, + ) + # Wait for 1 seconds before retrying + await asyncio.sleep(1) + + except Exception as e: + LOG.exception( + f"Failed to navigate to {url} after {retry_times} retries: {str(e)}", + url=url, + ) + raise e + + async def get_working_page(self) -> Page | None: + # HACK: currently, assuming the last page is always the working page. 
+ # Need to refactor this logic when we want to manipulate multi pages together + # TODO: do not use index of pages, it should be more robust if we want to fully support multi pages manipulation + if self.__page is None or self.browser_context is None: + return None + + # pick the last and http/https page as the working page + pages = await self.list_valid_pages() + if len(pages) == 0: + LOG.info("No http, https or blank page found in the browser context, return None") + return None + + last_page = pages[-1] + if self.__page == last_page: + return self.__page + await self.set_working_page(last_page, len(pages) - 1) + return last_page + + async def list_valid_pages(self, max_pages: int = settings.BROWSER_MAX_PAGES_NUMBER) -> list[Page]: + # List all valid pages(blank page, and http/https page) in the browser context, up to max_pages + # MSEdge CDP bug(?) + # when using CDP connect to a MSEdge, the download hub will be included in the context.pages + if self.browser_context is None: + return [] + + pages = [ + http_page + for http_page in self.browser_context.pages + if ( + http_page.url == "about:blank" + or http_page.url == "chrome-error://chromewebdata/" + or urlparse(http_page.url).scheme in ["http", "https"] + ) + ] + + if max_pages <= 0 or len(pages) <= max_pages: + return pages + + reserved_pages = pages[-max_pages:] + + closing_pages = pages[: len(pages) - max_pages] + LOG.warning( + "The page number exceeds the limit, closing the oldest pages. 
# NOTE(review): these are instance methods of the browser-state class whose header lies
# outside this chunk; they are shown at column 0 because the source carries no usable
# indentation. Double-underscore attributes (``self.__page``, ``self.__assert_page``) rely
# on class-private name mangling once the methods sit inside the class body.


async def validate_browser_context(self, page: Page) -> bool:
    """Check that the page content can be read and shows no known gateway/proxy error.

    Args:
        page: The page whose content is inspected.

    Returns:
        ``False`` when reading the content raises, or when the content contains the
        ``"Bad gateway error"`` or ``"client_connect_forbidden_host"`` marker;
        ``True`` otherwise.
    """
    try:
        skyvern_frame = await SkyvernFrame.create_instance(frame=page)
        html = await skyvern_frame.get_content()
    except Exception:
        # Deliberate best-effort: any failure to read the content means "not valid".
        LOG.error(
            "Error happened while getting the first page content",
            exc_info=True,
        )
        return False

    if "Bad gateway error" in html:
        LOG.warning("Bad gateway error on the page, recreate a new browser context with another proxy node")
        return False

    if "client_connect_forbidden_host" in html:
        LOG.warning(
            "capture the client_connect_forbidden_host error on the page, recreate a new browser context with another proxy node"
        )
        return False

    return True


async def must_get_working_page(self) -> Page:
    """Return the working page, raising instead of returning ``None``.

    Raises:
        MissingBrowserStatePage: If ``get_working_page()`` yields ``None``.
    """
    page = await self.get_working_page()
    if page is None:
        raise MissingBrowserStatePage()
    return page


async def set_working_page(self, page: Page | None, index: int = 0) -> None:
    """Record ``page`` as the working page and capture its video path at ``index``.

    When ``page`` is ``None`` only the stored page reference is reset. Otherwise the
    ``video_artifacts`` list is padded up to ``index`` if it is too short, and the
    video path is fetched unless the existing slot already has one.

    Args:
        page: The new working page, or ``None`` to clear it.
        index: Position in ``browser_artifacts.video_artifacts`` that belongs to the page.
    """
    self.__page = page
    if page is None:
        return

    video_artifacts = self.browser_artifacts.video_artifacts
    missing_slots = index + 1 - len(video_artifacts)
    if missing_slots > 0:
        # One fresh VideoArtifact per slot. ``[VideoArtifact()] * n`` would repeat a single
        # shared instance, so writing ``video_path`` below would leak into every padded slot.
        video_artifacts.extend(VideoArtifact() for _ in range(missing_slots))
    elif video_artifacts[index].video_path is not None:
        # The slot already knows its video path; nothing to fetch.
        return

    try:
        # BROWSER_ACTION_TIMEOUT_MS is in milliseconds; asyncio.timeout takes seconds.
        async with asyncio.timeout(settings.BROWSER_ACTION_TIMEOUT_MS / 1000):
            if page.video:
                video_artifacts[index].video_path = await page.video.path()
    except asyncio.TimeoutError:
        LOG.info("Timeout to get the page video, skip the exception")
    except Exception:
        # Best-effort: a missing video path must not fail the page switch.
        LOG.exception("Error while getting the page video", exc_info=True)


async def get_or_create_page(
    self,
    url: str | None = None,
    proxy_location: ProxyLocationInput = None,
    task_id: str | None = None,
    workflow_run_id: str | None = None,
    script_id: str | None = None,
    organization_id: str | None = None,
    extra_http_headers: dict[str, str] | None = None,
    browser_address: str | None = None,
    browser_profile_id: str | None = None,
) -> Page:
    """Return the current working page, building the browser state first if there is none.

    All keyword arguments are forwarded unchanged to ``check_and_fix_state``. If that call
    fails with an error whose text contains ``"net::ERR"``, the current page/context is
    closed and the call is retried once. Afterwards the page is validated with
    ``validate_browser_context``; on failure the state is rebuilt one more time.

    Returns:
        The working page.

    Raises:
        Exception: Whatever ``check_and_fix_state`` raised, when the error is not a
            ``net::ERR`` error or when the current page could not be closed for a retry.
    """
    page = await self.get_working_page()
    if page is not None:
        return page

    # Single source of truth for the arguments shared by every (re)initialisation attempt.
    state_kwargs = dict(
        url=url,
        proxy_location=proxy_location,
        task_id=task_id,
        workflow_run_id=workflow_run_id,
        script_id=script_id,
        organization_id=organization_id,
        extra_http_headers=extra_http_headers,
        browser_address=browser_address,
        browser_profile_id=browser_profile_id,
    )

    try:
        await self.check_and_fix_state(**state_kwargs)
    except Exception as e:
        # Only network-level navigation errors are retried; everything else propagates.
        if "net::ERR" not in str(e):
            raise e
        if not await self.close_current_open_page():
            LOG.warning("Failed to close the current open page")
            raise e
        await self.check_and_fix_state(**state_kwargs)
    page = await self.__assert_page()

    if not await self.validate_browser_context(await self.get_working_page()):
        if not await self.close_current_open_page():
            # Cannot reset the context, so hand back the page we have without re-validating.
            LOG.warning("Failed to close the current open page, going to skip the browser context validation")
            return page
        await self.check_and_fix_state(**state_kwargs)
        page = await self.__assert_page()
    return page


async def close_current_open_page(self) -> bool:
    """Close the other pages and the browser context, then clear the working page.

    The whole sequence runs under ``BROWSER_CLOSE_TIMEOUT``.

    Returns:
        ``True`` when every step completed, ``False`` when any step raised or timed out
        (the error is logged, not propagated).
    """
    try:
        async with asyncio.timeout(BROWSER_CLOSE_TIMEOUT):
            await self._close_all_other_pages()
            if self.browser_context is not None:
                await self.browser_context.close()
            self.browser_context = None
            await self.set_working_page(None)
            return True
    except Exception:
        LOG.warning("Error while closing the current open page", exc_info=True)
        return False


async def stop_page_loading(self) -> None:
    """Stop the working page from loading by evaluating ``window.stop()`` in it.

    Raises:
        FailedToStopLoadingPage: If the evaluation fails; the original error is chained.
    """
    page = await self.__assert_page()
    try:
        await SkyvernFrame.evaluate(frame=page, expression="window.stop()")
    except Exception as e:
        LOG.exception(f"Error while stop loading the page: {repr(e)}")
        # Chain the cause so the underlying failure stays visible in the traceback.
        raise FailedToStopLoadingPage(url=page.url, error_message=repr(e)) from e


async def new_page(self) -> Page:
    """Open a new page in the current browser context.

    Raises:
        EmptyBrowserContext: If no browser context has been set.
    """
    if self.browser_context is None:
        raise EmptyBrowserContext()
    return await self.browser_context.new_page()


async def reload_page(self) -> None:
    """Reload the working page, log how long the reload took, then wait 5 seconds.

    Raises:
        FailedToReloadPage: If the reload (or the wait) fails; the original error is chained.
    """
    page = await self.__assert_page()

    LOG.info(f"Reload page {page.url} and waiting for 5 seconds")
    try:
        # Monotonic clock: the elapsed time must not be skewed by wall-clock adjustments.
        start_time = time.monotonic()
        await page.reload(timeout=settings.BROWSER_LOADING_TIMEOUT_MS)
        end_time = time.monotonic()
        LOG.info(
            "Page loading time",
            loading_time=end_time - start_time,
        )
        await asyncio.sleep(5)
    except Exception as e:
        LOG.exception(f"Error while reload url: {repr(e)}")
        raise FailedToReloadPage(url=page.url, error_message=repr(e)) from e


async def close(self, close_browser_on_completion: bool = True) -> None:
    """Shut down the browser context and playwright, each under its own timeout.

    Every failure is logged and swallowed so that one failing step does not prevent
    the following ones from running.

    Args:
        close_browser_on_completion: When ``False``, neither the browser context nor
            playwright is touched.
    """
    LOG.info("Closing browser state")
    try:
        async with asyncio.timeout(BROWSER_CLOSE_TIMEOUT):
            if self.browser_context and close_browser_on_completion:
                LOG.info("Closing browser context and its pages")
                try:
                    await self.browser_context.close()
                except Exception:
                    LOG.warning("Failed to close browser context", exc_info=True)
                LOG.info("Main browser context and all its pages are closed")
                # NOTE(review): nesting reconstructed from a whitespace-collapsed source;
                # the cleanup is assumed to run only when the context is closed - confirm.
                if self.browser_cleanup is not None:
                    try:
                        self.browser_cleanup()
                        LOG.info("Main browser cleanup is executed")
                    except Exception:
                        LOG.warning("Failed to execute browser cleanup", exc_info=True)
    except asyncio.TimeoutError:
        LOG.error("Timeout to close browser context, going to stop playwright directly")

    # Playwright gets a separate timeout budget so a hung context close cannot starve it.
    try:
        async with asyncio.timeout(BROWSER_CLOSE_TIMEOUT):
            if self.pw and close_browser_on_completion:
                try:
                    LOG.info("Stopping playwright")
                    await self.pw.stop()
                    LOG.info("Playwright is stopped")
                except Exception:
                    LOG.warning("Failed to stop playwright", exc_info=True)
    except asyncio.TimeoutError:
        LOG.error("Timeout to close playwright, might leave the broswer opening forever")


async def take_fullpage_screenshot(
    self,
    file_path: str | None = None,
) -> bytes:
    """Take a scrolling screenshot of the working page in ``ScreenshotMode.LITE``.

    Args:
        file_path: Forwarded to ``SkyvernFrame.take_scrolling_screenshot``.

    Returns:
        The screenshot bytes returned by ``SkyvernFrame.take_scrolling_screenshot``.
    """
    page = await self.__assert_page()
    return await SkyvernFrame.take_scrolling_screenshot(
        page=page,
        file_path=file_path,
        mode=ScreenshotMode.LITE,
    )


async def take_post_action_screenshot(
    self,
    scrolling_number: int,
    file_path: str | None = None,
) -> bytes:
    """Take a scrolling screenshot of the working page with an explicit scrolling number.

    Args:
        scrolling_number: Forwarded to ``SkyvernFrame.take_scrolling_screenshot``.
        file_path: Forwarded to ``SkyvernFrame.take_scrolling_screenshot``.

    Returns:
        The screenshot bytes returned by ``SkyvernFrame.take_scrolling_screenshot``.
    """
    page = await self.__assert_page()
    return await SkyvernFrame.take_scrolling_screenshot(
        page=page,
        file_path=file_path,
        mode=ScreenshotMode.LITE,
        scrolling_number=scrolling_number,
    )
skyvern.webeye.browser_state import BrowserState from skyvern.webeye.utils.page import SkyvernFrame LOG = structlog.get_logger()