remove screenshot when check user goal (#1147)

This commit is contained in:
LawyZheng
2024-11-06 23:20:45 +08:00
committed by GitHub
parent c084764373
commit b62c2caae0
3 changed files with 94 additions and 45 deletions

View File

@@ -3,11 +3,11 @@ import copy
import json
from collections import defaultdict
from enum import StrEnum
from typing import Any, Awaitable, Callable
from typing import Any, Awaitable, Callable, Self
import structlog
from playwright.async_api import Frame, Locator, Page
from pydantic import BaseModel
from pydantic import BaseModel, PrivateAttr
from skyvern.constants import BUILDING_ELEMENT_TREE_TIMEOUT_MS, SKYVERN_DIR, SKYVERN_ID_ATTR
from skyvern.exceptions import FailedToTakeScreenshot, UnknownElementTreeFormat
@@ -18,6 +18,7 @@ from skyvern.webeye.utils.page import SkyvernFrame
LOG = structlog.get_logger()
CleanupElementTreeFunc = Callable[[Page | Frame, str, list[dict]], Awaitable[list[dict]]]
ScrapeExcludeFunc = Callable[[Page, Frame], Awaitable[bool]]
RESERVED_ATTRIBUTES = {
"accept", # for input file
@@ -211,6 +212,26 @@ class ScrapedPage(BaseModel):
html: str
extracted_text: str | None = None
_browser_state: BrowserState = PrivateAttr()
_clean_up_func: CleanupElementTreeFunc = PrivateAttr()
_scrape_exclude: ScrapeExcludeFunc | None = PrivateAttr(default=None)
def __init__(self, **data: Any) -> None:
missing_attrs = [attr for attr in ["_browser_state", "_clean_up_func"] if attr not in data]
if len(missing_attrs) > 0:
raise ValueError(f"Missing required private attributes: {', '.join(missing_attrs)}")
# popup private attributes
browser_state = data.pop("_browser_state")
clean_up_func = data.pop("_clean_up_func")
scrape_exclude = data.pop("_scrape_exclude")
super().__init__(**data)
self._browser_state = browser_state
self._clean_up_func = clean_up_func
self._scrape_exclude = scrape_exclude
def build_element_tree(self, fmt: ElementTreeFormat = ElementTreeFormat.JSON) -> str:
if fmt == ElementTreeFormat.JSON:
return json.dumps(self.element_tree_trimmed)
@@ -220,13 +241,23 @@ class ScrapedPage(BaseModel):
raise UnknownElementTreeFormat(fmt=fmt)
async def refresh(self, with_screenshot: bool = True) -> Self:
return await scrape_website(
browser_state=self._browser_state,
url=self.url,
cleanup_element_tree=self._clean_up_func,
scrape_exclude=self._scrape_exclude,
with_screenshot=with_screenshot,
)
async def scrape_website(
browser_state: BrowserState,
url: str,
cleanup_element_tree: CleanupElementTreeFunc,
num_retry: int = 0,
scrape_exclude: Callable[[Page, Frame], Awaitable[bool]] | None = None,
scrape_exclude: ScrapeExcludeFunc | None = None,
with_screenshot: bool = True,
) -> ScrapedPage:
"""
************************************************************************************************
@@ -251,7 +282,13 @@ async def scrape_website(
"""
try:
num_retry += 1
return await scrape_web_unsafe(browser_state, url, cleanup_element_tree, scrape_exclude)
return await scrape_web_unsafe(
browser_state=browser_state,
url=url,
cleanup_element_tree=cleanup_element_tree,
scrape_exclude=scrape_exclude,
with_screenshot=with_screenshot,
)
except Exception as e:
# NOTE: MAX_SCRAPING_RETRIES is set to 0 in both staging and production
if num_retry > SettingsManager.get_settings().MAX_SCRAPING_RETRIES:
@@ -272,6 +309,7 @@ async def scrape_website(
cleanup_element_tree,
num_retry=num_retry,
scrape_exclude=scrape_exclude,
with_screenshot=with_screenshot,
)
@@ -318,7 +356,8 @@ async def scrape_web_unsafe(
browser_state: BrowserState,
url: str,
cleanup_element_tree: CleanupElementTreeFunc,
scrape_exclude: Callable[[Page, Frame], Awaitable[bool]] | None = None,
scrape_exclude: ScrapeExcludeFunc | None = None,
with_screenshot: bool = True,
) -> ScrapedPage:
"""
Asynchronous function that performs web scraping without any built-in error handling. This function is intended
@@ -331,10 +370,8 @@ async def scrape_web_unsafe(
:return: Tuple containing Page instance, base64 encoded screenshot, and page elements.
:note: This function does not handle exceptions. Ensure proper error handling in the calling context.
"""
# We only create a new page if one does not exist. This is to allow keeping the same page since we want to
# continue working on the same page that we're taking actions on.
# *This also means URL is only used when creating a new page, and not when using an existing page.
page = await browser_state.get_or_create_page(url)
# browser state must have the page instance, otherwise we should not do scraping
page = await browser_state.must_get_working_page()
# Take screenshots of the page with the bounding boxes. We will remove the bounding boxes later.
# Scroll to the top of the page and take a screenshot.
# Scroll to the next page and take a screenshot until we reach the end of the page.
@@ -345,7 +382,11 @@ async def scrape_web_unsafe(
LOG.info("Waiting for 5 seconds before scraping the website.")
await asyncio.sleep(5)
screenshots = await SkyvernFrame.take_split_screenshots(page=page, url=url, draw_boxes=True)
screenshots: list[bytes] = []
# TODO: do we need to scroll to the button when we scrape without screenshots?
if with_screenshot:
screenshots = await SkyvernFrame.take_split_screenshots(page=page, url=url, draw_boxes=True)
elements, element_tree = await get_interactable_element_tree(page, scrape_exclude)
element_tree = await cleanup_element_tree(page, url, copy.deepcopy(element_tree))
@@ -384,6 +425,9 @@ async def scrape_web_unsafe(
url=page.url,
html=html,
extracted_text=text_content,
_browser_state=browser_state,
_clean_up_func=cleanup_element_tree,
_scrape_exclude=scrape_exclude,
)
@@ -391,7 +435,7 @@ async def get_interactable_element_tree_in_frame(
frames: list[Frame],
elements: list[dict],
element_tree: list[dict],
scrape_exclude: Callable[[Page, Frame], Awaitable[bool]] | None = None,
scrape_exclude: ScrapeExcludeFunc | None = None,
) -> tuple[list[dict], list[dict]]:
for frame in frames:
if frame.is_detached():
@@ -445,7 +489,7 @@ async def get_interactable_element_tree_in_frame(
async def get_interactable_element_tree(
page: Page,
scrape_exclude: Callable[[Page, Frame], Awaitable[bool]] | None = None,
scrape_exclude: ScrapeExcludeFunc | None = None,
) -> tuple[list[dict], list[dict]]:
"""
Get the element tree of the page, including all the elements that are interactable.