diff --git a/skyvern/core/script_generations/real_skyvern_page_ai.py b/skyvern/core/script_generations/real_skyvern_page_ai.py
index 5c002b35..a128898c 100644
--- a/skyvern/core/script_generations/real_skyvern_page_ai.py
+++ b/skyvern/core/script_generations/real_skyvern_page_ai.py
@@ -34,7 +34,7 @@ from skyvern.webeye.actions.handler import (
handle_upload_file_action,
)
from skyvern.webeye.actions.parse_actions import parse_actions
-from skyvern.webeye.scraper.scraper import ScrapedPage
+from skyvern.webeye.scraper.scraped_page import ScrapedPage
jinja_sandbox_env = SandboxedEnvironment()
diff --git a/skyvern/core/script_generations/script_skyvern_page.py b/skyvern/core/script_generations/script_skyvern_page.py
index 1eb8a263..ae75f4e4 100644
--- a/skyvern/core/script_generations/script_skyvern_page.py
+++ b/skyvern/core/script_generations/script_skyvern_page.py
@@ -27,7 +27,7 @@ from skyvern.webeye.actions.actions import (
)
from skyvern.webeye.actions.handler import ActionHandler, handle_complete_action
from skyvern.webeye.browser_state import BrowserState
-from skyvern.webeye.scraper.scraper import ScrapedPage, scrape_website
+from skyvern.webeye.scraper.scraped_page import ScrapedPage
LOG = structlog.get_logger()
@@ -106,8 +106,7 @@ class ScriptSkyvernPage(SkyvernPage):
# initialize browser state
# TODO: add workflow_run_id or eventually script_id/script_run_id
browser_state = await cls._get_or_create_browser_state(browser_session_id=browser_session_id)
- return await scrape_website(
- browser_state=browser_state,
+ return await browser_state.scrape_website(
url="",
cleanup_element_tree=app.AGENT_FUNCTION.cleanup_element_tree_factory(),
scrape_exclude=app.scrape_exclude,
diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py
index 0a0cf887..4d28cd02 100644
--- a/skyvern/forge/agent.py
+++ b/skyvern/forge/agent.py
@@ -126,7 +126,7 @@ from skyvern.webeye.actions.parse_actions import (
)
from skyvern.webeye.actions.responses import ActionResult, ActionSuccess
from skyvern.webeye.browser_state import BrowserState
-from skyvern.webeye.scraper.scraper import ElementTreeFormat, ScrapedPage, scrape_website
+from skyvern.webeye.scraper.scraped_page import ElementTreeFormat, ScrapedPage
from skyvern.webeye.utils.page import SkyvernFrame
LOG = structlog.get_logger()
@@ -2274,10 +2274,9 @@ class ForgeAgent:
draw_boxes = False
scroll = False
- return await scrape_website(
- browser_state,
- task.url,
- app.AGENT_FUNCTION.cleanup_element_tree_factory(task=task, step=step),
+ return await browser_state.scrape_website(
+ url=task.url,
+ cleanup_element_tree=app.AGENT_FUNCTION.cleanup_element_tree_factory(task=task, step=step),
scrape_exclude=app.scrape_exclude,
max_screenshot_number=max_screenshot_number,
draw_boxes=draw_boxes,
diff --git a/skyvern/forge/agent_functions.py b/skyvern/forge/agent_functions.py
index 6b82900d..bd2019e2 100644
--- a/skyvern/forge/agent_functions.py
+++ b/skyvern/forge/agent_functions.py
@@ -24,7 +24,7 @@ from skyvern.services import workflow_script_service
from skyvern.webeye.actions.action_types import POST_ACTION_EXECUTION_ACTION_TYPES
from skyvern.webeye.actions.actions import Action
from skyvern.webeye.browser_state import BrowserState
-from skyvern.webeye.scraper.scraper import ELEMENT_NODE_ATTRIBUTES, CleanupElementTreeFunc, json_to_html
+from skyvern.webeye.scraper.scraped_page import ELEMENT_NODE_ATTRIBUTES, CleanupElementTreeFunc, json_to_html
from skyvern.webeye.utils.dom import SkyvernElement
from skyvern.webeye.utils.page import SkyvernFrame
diff --git a/skyvern/services/script_service.py b/skyvern/services/script_service.py
index 93e4621e..3859ed52 100644
--- a/skyvern/services/script_service.py
+++ b/skyvern/services/script_service.py
@@ -61,7 +61,7 @@ from skyvern.schemas.scripts import (
ScriptStatus,
)
from skyvern.schemas.workflows import BlockStatus, BlockType, FileStorageType, FileType
-from skyvern.webeye.scraper.scraper import ElementTreeFormat
+from skyvern.webeye.scraper.scraped_page import ElementTreeFormat
LOG = structlog.get_logger()
jinja_sandbox_env = SandboxedEnvironment()
diff --git a/skyvern/services/task_v2_service.py b/skyvern/services/task_v2_service.py
index a015fdb9..a5ae965e 100644
--- a/skyvern/services/task_v2_service.py
+++ b/skyvern/services/task_v2_service.py
@@ -56,7 +56,7 @@ from skyvern.schemas.workflows import (
from skyvern.utils.prompt_engine import load_prompt_with_elements
from skyvern.utils.strings import generate_random_string
from skyvern.webeye.browser_state import BrowserState
-from skyvern.webeye.scraper.scraper import ScrapedPage, scrape_website
+from skyvern.webeye.scraper.scraped_page import ScrapedPage
from skyvern.webeye.utils.page import SkyvernFrame
LOG = structlog.get_logger()
@@ -682,10 +682,9 @@ async def run_task_v2_helper(
)
else:
try:
- scraped_page = await scrape_website(
- browser_state,
- url,
- app.AGENT_FUNCTION.cleanup_element_tree_factory(),
+ scraped_page = await browser_state.scrape_website(
+ url=url,
+ cleanup_element_tree=app.AGENT_FUNCTION.cleanup_element_tree_factory(),
scrape_exclude=app.scrape_exclude,
)
if page is None:
@@ -908,10 +907,9 @@ async def run_task_v2_helper(
browser_session_id=browser_session_id,
browser_profile_id=workflow_run.browser_profile_id,
)
- scraped_page = await scrape_website(
- browser_state,
- url,
- app.AGENT_FUNCTION.cleanup_element_tree_factory(),
+ scraped_page = await browser_state.scrape_website(
+ url=url,
+ cleanup_element_tree=app.AGENT_FUNCTION.cleanup_element_tree_factory(),
scrape_exclude=app.scrape_exclude,
)
completion_screenshots = scraped_page.screenshots
diff --git a/skyvern/utils/prompt_engine.py b/skyvern/utils/prompt_engine.py
index 46538d1f..904cda72 100644
--- a/skyvern/utils/prompt_engine.py
+++ b/skyvern/utils/prompt_engine.py
@@ -7,7 +7,7 @@ from skyvern.constants import DEFAULT_MAX_TOKENS
from skyvern.errors.errors import UserDefinedError
from skyvern.forge.sdk.prompting import PromptEngine
from skyvern.utils.token_counter import count_tokens
-from skyvern.webeye.scraper.scraper import ElementTreeBuilder
+from skyvern.webeye.scraper.scraped_page import ElementTreeBuilder
LOG = structlog.get_logger()
diff --git a/skyvern/webeye/actions/caching.py b/skyvern/webeye/actions/caching.py
index 41424537..74d5c2fa 100644
--- a/skyvern/webeye/actions/caching.py
+++ b/skyvern/webeye/actions/caching.py
@@ -7,7 +7,7 @@ from skyvern.forge.sdk.models import Step
from skyvern.forge.sdk.schemas.tasks import Task
from skyvern.webeye.actions.action_types import ActionType
from skyvern.webeye.actions.actions import Action, ActionStatus, SelectOption
-from skyvern.webeye.scraper.scraper import ScrapedPage
+from skyvern.webeye.scraper.scraped_page import ScrapedPage
LOG = structlog.get_logger()
diff --git a/skyvern/webeye/actions/handler.py b/skyvern/webeye/actions/handler.py
index 8f18edb5..db448e2c 100644
--- a/skyvern/webeye/actions/handler.py
+++ b/skyvern/webeye/actions/handler.py
@@ -101,16 +101,14 @@ from skyvern.webeye.actions.actions import (
WebAction,
)
from skyvern.webeye.actions.responses import ActionAbort, ActionFailure, ActionResult, ActionSuccess
-from skyvern.webeye.scraper.scraper import (
+from skyvern.webeye.scraper.scraped_page import (
CleanupElementTreeFunc,
ElementTreeBuilder,
ElementTreeFormat,
- IncrementalScrapePage,
ScrapedPage,
- hash_element,
json_to_html,
- trim_element_tree,
)
+from skyvern.webeye.scraper.scraper import IncrementalScrapePage, hash_element, trim_element_tree
from skyvern.webeye.utils.dom import COMMON_INPUT_TAGS, DomUtil, InteractiveElement, SkyvernElement
from skyvern.webeye.utils.page import SkyvernFrame
diff --git a/skyvern/webeye/actions/models.py b/skyvern/webeye/actions/models.py
index d9c9aa0c..b5900a11 100644
--- a/skyvern/webeye/actions/models.py
+++ b/skyvern/webeye/actions/models.py
@@ -10,7 +10,7 @@ from skyvern.errors.errors import UserDefinedError
from skyvern.schemas.steps import AgentStepOutput
from skyvern.webeye.actions.actions import Action, DecisiveAction
from skyvern.webeye.actions.responses import ActionResult
-from skyvern.webeye.scraper.scraper import ScrapedPage
+from skyvern.webeye.scraper.scraped_page import ScrapedPage
class DetailedAgentStepOutput(BaseModel):
diff --git a/skyvern/webeye/actions/parse_actions.py b/skyvern/webeye/actions/parse_actions.py
index 0d55799e..a97b4638 100644
--- a/skyvern/webeye/actions/parse_actions.py
+++ b/skyvern/webeye/actions/parse_actions.py
@@ -42,7 +42,7 @@ from skyvern.webeye.actions.actions import (
VerificationCodeAction,
WaitAction,
)
-from skyvern.webeye.scraper.scraper import ScrapedPage
+from skyvern.webeye.scraper.scraped_page import ScrapedPage
LOG = structlog.get_logger()
diff --git a/skyvern/webeye/browser_state.py b/skyvern/webeye/browser_state.py
index 550c1637..68a02e7d 100644
--- a/skyvern/webeye/browser_state.py
+++ b/skyvern/webeye/browser_state.py
@@ -9,6 +9,7 @@ from skyvern.constants import NAVIGATION_MAX_RETRY_TIME
from skyvern.schemas.runs import ProxyLocationInput
from skyvern.webeye.browser_artifacts import BrowserArtifacts
from skyvern.webeye.browser_factory import BrowserCleanupFunc
+from skyvern.webeye.scraper.scraped_page import CleanupElementTreeFunc, ScrapedPage, ScrapeExcludeFunc
class BrowserState(Protocol):
@@ -68,3 +69,18 @@ class BrowserState(Protocol):
async def take_fullpage_screenshot(self, file_path: str | None = None) -> bytes: ...
async def take_post_action_screenshot(self, scrolling_number: int, file_path: str | None = None) -> bytes: ...
+
+ async def scrape_website(
+ self,
+ url: str,
+ cleanup_element_tree: CleanupElementTreeFunc,
+ num_retry: int = 0,
+ max_retries: int = settings.MAX_SCRAPING_RETRIES,
+ scrape_exclude: ScrapeExcludeFunc | None = None,
+ take_screenshots: bool = True,
+ draw_boxes: bool = True,
+ max_screenshot_number: int = settings.MAX_NUM_SCREENSHOTS,
+ scroll: bool = True,
+ support_empty_page: bool = False,
+ wait_seconds: float = 0,
+ ) -> ScrapedPage: ...
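For orientation, a minimal sketch of the call-site migration this protocol method enables (assuming an async context with browser_state, task, and app in scope, as in the forge/agent.py hunk above): the free function imported from skyvern.webeye.scraper.scraper becomes a method on BrowserState, so call sites no longer need to import the scraper module at all.

    # before: free function, positional browser_state argument
    # scraped_page = await scrape_website(browser_state, task.url, cleanup_fn)

    # after: method on the BrowserState protocol, keyword arguments
    scraped_page = await browser_state.scrape_website(
        url=task.url,
        cleanup_element_tree=app.AGENT_FUNCTION.cleanup_element_tree_factory(task=task, step=step),
        scrape_exclude=app.scrape_exclude,
    )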
diff --git a/skyvern/webeye/real_browser_state.py b/skyvern/webeye/real_browser_state.py
index e0878931..008f9308 100644
--- a/skyvern/webeye/real_browser_state.py
+++ b/skyvern/webeye/real_browser_state.py
@@ -20,6 +20,8 @@ from skyvern.schemas.runs import ProxyLocationInput
from skyvern.webeye.browser_artifacts import BrowserArtifacts, VideoArtifact
from skyvern.webeye.browser_factory import BrowserCleanupFunc, BrowserContextFactory
from skyvern.webeye.browser_state import BrowserState
+from skyvern.webeye.scraper import scraper
+from skyvern.webeye.scraper.scraped_page import CleanupElementTreeFunc, ScrapedPage, ScrapeExcludeFunc
from skyvern.webeye.utils.page import ScreenshotMode, SkyvernFrame
LOG = structlog.get_logger()
@@ -378,6 +380,35 @@ class RealBrowserState(BrowserState):
LOG.exception(f"Error while reload url: {repr(e)}")
raise FailedToReloadPage(url=page.url, error_message=repr(e))
+ async def scrape_website(
+ self,
+ url: str,
+ cleanup_element_tree: CleanupElementTreeFunc,
+ num_retry: int = 0,
+ max_retries: int = settings.MAX_SCRAPING_RETRIES,
+ scrape_exclude: ScrapeExcludeFunc | None = None,
+ take_screenshots: bool = True,
+ draw_boxes: bool = True,
+ max_screenshot_number: int = settings.MAX_NUM_SCREENSHOTS,
+ scroll: bool = True,
+ support_empty_page: bool = False,
+ wait_seconds: float = 0,
+ ) -> ScrapedPage:
+ return await scraper.scrape_website(
+ browser_state=self,
+ url=url,
+ cleanup_element_tree=cleanup_element_tree,
+ num_retry=num_retry,
+ max_retries=max_retries,
+ scrape_exclude=scrape_exclude,
+ take_screenshots=take_screenshots,
+ draw_boxes=draw_boxes,
+ max_screenshot_number=max_screenshot_number,
+ scroll=scroll,
+ support_empty_page=support_empty_page,
+ wait_seconds=wait_seconds,
+ )
+
async def close(self, close_browser_on_completion: bool = True) -> None:
LOG.info("Closing browser state")
try:
diff --git a/skyvern/webeye/scraper/scraped_page.py b/skyvern/webeye/scraper/scraped_page.py
new file mode 100644
index 00000000..ff2dda38
--- /dev/null
+++ b/skyvern/webeye/scraper/scraped_page.py
@@ -0,0 +1,295 @@
+import copy
+import json
+import typing
+from abc import ABC, abstractmethod
+from enum import StrEnum
+from typing import Any, Awaitable, Callable, Self
+
+import structlog
+from playwright.async_api import Frame, Page
+from pydantic import BaseModel, PrivateAttr
+
+from skyvern.exceptions import UnknownElementTreeFormat
+from skyvern.forge.sdk.api.crypto import calculate_sha256
+from skyvern.forge.sdk.core import skyvern_context
+
+if typing.TYPE_CHECKING:
+ from skyvern.webeye.browser_state import BrowserState
+
+LOG = structlog.get_logger()
+
+CleanupElementTreeFunc = Callable[[Page | Frame, str, list[dict]], Awaitable[list[dict]]]
+ScrapeExcludeFunc = Callable[[Page, Frame], Awaitable[bool]]
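+# Meanings inferred from usage: CleanupElementTreeFunc receives (page_or_frame, url,
+# element_tree) and returns the cleaned element tree; ScrapeExcludeFunc receives
+# (page, frame) and returns True when that frame should be skipped during scraping.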
+
+ELEMENT_NODE_ATTRIBUTES = {
+ "id",
+}
+
+
+def build_attribute(key: str, value: Any) -> str:
+ if isinstance(value, bool) or isinstance(value, int):
+ return f'{key}="{str(value).lower()}"'
+
+ return f'{key}="{str(value)}"' if value else key
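+
+# Examples (illustrative): build_attribute("type", "submit") -> 'type="submit"';
+# build_attribute("checked", True) -> 'checked="true"'; a falsy value collapses to
+# the bare attribute, e.g. build_attribute("disabled", "") -> 'disabled'.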
+
+
+def json_to_html(element: dict, need_skyvern_attrs: bool = True) -> str:
+ """
+    If the element is flagged as dropped (and is not interactable), the HTML output is an empty string.
+ """
+ tag = element["tagName"]
+ attributes: dict[str, Any] = copy.deepcopy(element.get("attributes", {}))
+
+ interactable = element.get("interactable", False)
+ if element.get("isDropped", False):
+ if not interactable:
+ return ""
+ else:
+ LOG.debug("Element is interactable. Trimmed all attributes instead of dropping it", element=element)
+ attributes = {}
+
+ context = skyvern_context.ensure_context()
+
+    # FIXME: theoretically, any href longer than 69 chars (64+1+4: sha256 hex + "_" + "{{}}")
+    # could be hashed, but for now we only hash links longer than 150 chars to confirm
+    # the solution works well
+ if "href" in attributes and len(attributes.get("href", "")) > 150:
+ href = attributes.get("href", "")
+ # jinja style can't accept the variable name starts with number
+ # adding "_" to make sure the variable name is valid.
+ hashed_href = "_" + calculate_sha256(href)
+ context.hashed_href_map[hashed_href] = href
+ attributes["href"] = "{{" + hashed_href + "}}"
+
+ if need_skyvern_attrs:
+ # adding the node attribute to attributes
+ for attr in ELEMENT_NODE_ATTRIBUTES:
+ value = element.get(attr)
+ if value is None:
+ continue
+ attributes[attr] = value
+
+ attributes_html = " ".join(build_attribute(key, value) for key, value in attributes.items())
+
+ if element.get("isSelectable", False):
+ tag = "select"
+
+ text = element.get("text", "")
+ # build children HTML
+ children_html = "".join(
+ json_to_html(child, need_skyvern_attrs=need_skyvern_attrs) for child in element.get("children", [])
+ )
+ # build option HTML
+    option_html = "".join(
+        f'<option index="{option.get("optionIndex")}">{option.get("text")}</option>'
+        if option.get("text")
+        else f'<option index="{option.get("optionIndex")}"></option>'
+        for option in element.get("options", [])
+    )
+
+ if element.get("purgeable", False):
+ return children_html + option_html
+
+ before_pseudo_text = element.get("beforePseudoText") or ""
+ after_pseudo_text = element.get("afterPseudoText") or ""
+
+ # Check if the element is self-closing
+ if (
+ tag in ["img", "input", "br", "hr", "meta", "link"]
+ and not option_html
+ and not children_html
+ and not before_pseudo_text
+ and not after_pseudo_text
+ ):
+ return f"<{tag}{attributes_html if not attributes_html else ' ' + attributes_html}>"
+ else:
+ return f"<{tag}{attributes_html if not attributes_html else ' ' + attributes_html}>{before_pseudo_text}{text}{children_html + option_html}{after_pseudo_text}{tag}>"
+
+
+class ElementTreeFormat(StrEnum):
+ JSON = "json" # deprecate JSON format soon. please use HTML format
+ HTML = "html"
+
+
+class ElementTreeBuilder(ABC):
+ @abstractmethod
+ def support_economy_elements_tree(self) -> bool:
+ pass
+
+ @abstractmethod
+ def build_element_tree(
+ self, fmt: ElementTreeFormat = ElementTreeFormat.HTML, html_need_skyvern_attrs: bool = True
+ ) -> str:
+ pass
+
+ @abstractmethod
+ def build_economy_elements_tree(
+ self,
+ fmt: ElementTreeFormat = ElementTreeFormat.HTML,
+ html_need_skyvern_attrs: bool = True,
+ percent_to_keep: float = 1,
+ ) -> str:
+ pass
+
+
+class ScrapedPage(BaseModel, ElementTreeBuilder):
+ """
+ Scraped response from a webpage, including:
+ 1. List of elements
+ 2. ID to css map
+ 3. The element tree of the page (list of dicts). Each element has children and attributes.
+ 4. The screenshot (base64 encoded)
+ 5. The URL of the page
+ 6. The HTML of the page
+ 7. The extracted text from the page
+ """
+
+ elements: list[dict]
+ id_to_element_dict: dict[str, dict] = {}
+ id_to_frame_dict: dict[str, str] = {}
+ id_to_css_dict: dict[str, str] = {}
+ id_to_element_hash: dict[str, str] = {}
+ hash_to_element_ids: dict[str, list[str]] = {}
+ element_tree: list[dict]
+ element_tree_trimmed: list[dict]
+ economy_element_tree: list[dict] | None = None
+ last_used_element_tree: list[dict] | None = None
+ screenshots: list[bytes] = []
+ url: str = ""
+ html: str = ""
+ extracted_text: str | None = None
+ window_dimension: dict[str, int] | None = None
+ _browser_state: "BrowserState" = PrivateAttr()
+ _clean_up_func: CleanupElementTreeFunc = PrivateAttr()
+ _scrape_exclude: ScrapeExcludeFunc | None = PrivateAttr(default=None)
+
+ def __init__(self, **data: Any) -> None:
+ missing_attrs = [attr for attr in ["_browser_state", "_clean_up_func"] if attr not in data]
+ if len(missing_attrs) > 0:
+ raise ValueError(f"Missing required private attributes: {', '.join(missing_attrs)}")
+
+        # pop private attributes off of data before pydantic validation
+ browser_state = data.pop("_browser_state")
+ clean_up_func = data.pop("_clean_up_func")
+        scrape_exclude = data.pop("_scrape_exclude", None)  # optional; avoid KeyError when omitted
+
+ super().__init__(**data)
+
+ self._browser_state = browser_state
+ self._clean_up_func = clean_up_func
+ self._scrape_exclude = scrape_exclude
+
+ def support_economy_elements_tree(self) -> bool:
+ return True
+
+ def build_element_tree(
+ self, fmt: ElementTreeFormat = ElementTreeFormat.HTML, html_need_skyvern_attrs: bool = True
+ ) -> str:
+ self.last_used_element_tree = self.element_tree_trimmed
+ if fmt == ElementTreeFormat.JSON:
+ return json.dumps(self.element_tree_trimmed)
+
+ if fmt == ElementTreeFormat.HTML:
+ return "".join(
+ json_to_html(element, need_skyvern_attrs=html_need_skyvern_attrs)
+ for element in self.element_tree_trimmed
+ )
+
+ raise UnknownElementTreeFormat(fmt=fmt)
+
+ def build_economy_elements_tree(
+ self,
+ fmt: ElementTreeFormat = ElementTreeFormat.HTML,
+ html_need_skyvern_attrs: bool = True,
+ percent_to_keep: float = 1,
+ ) -> str:
+ """
+        The economy elements tree excludes secondary elements (SVG, etc.).
+ """
+ if not self.economy_element_tree:
+ economy_elements = []
+ copied_element_tree_trimmed = copy.deepcopy(self.element_tree_trimmed)
+
+ # Process each root element
+ for root_element in copied_element_tree_trimmed:
+ processed_element = self._process_element_for_economy_tree(root_element)
+ if processed_element:
+ economy_elements.append(processed_element)
+
+ self.economy_element_tree = economy_elements
+
+ self.last_used_element_tree = self.economy_element_tree
+
+ if fmt == ElementTreeFormat.JSON:
+ element_str = json.dumps(self.economy_element_tree)
+ return element_str[: int(len(element_str) * percent_to_keep)]
+
+ if fmt == ElementTreeFormat.HTML:
+ element_str = "".join(
+ json_to_html(element, need_skyvern_attrs=html_need_skyvern_attrs)
+ for element in self.economy_element_tree
+ )
+ return element_str[: int(len(element_str) * percent_to_keep)]
+
+ raise UnknownElementTreeFormat(fmt=fmt)
+
+ def _process_element_for_economy_tree(self, element: dict) -> dict | None:
+ """
+        Helper method that recursively (depth-first) processes an element for the
+        economy tree, removing SVG elements and their children.
+ """
+ # Skip SVG elements entirely
+ if element.get("tagName", "").lower() == "svg":
+ return None
+
+        # Recursively process children
+ if "children" in element:
+ new_children = []
+ for child in element["children"]:
+ processed_child = self._process_element_for_economy_tree(child)
+ if processed_child:
+ new_children.append(processed_child)
+ element["children"] = new_children
+ return element
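+
+    # Example (illustrative): {"tagName": "div", "children": [{"tagName": "svg"},
+    # {"tagName": "a", "children": []}]} keeps only the <a> child; the svg subtree
+    # is pruned from the economy tree entirely.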
+
+ async def refresh(self, draw_boxes: bool = True, scroll: bool = True, max_retries: int = 0) -> Self:
+ refreshed_page = await self._browser_state.scrape_website(
+ url=self.url,
+ cleanup_element_tree=self._clean_up_func,
+ max_retries=max_retries,
+ scrape_exclude=self._scrape_exclude,
+ draw_boxes=draw_boxes,
+ scroll=scroll,
+ )
+ self.elements = refreshed_page.elements
+ self.id_to_css_dict = refreshed_page.id_to_css_dict
+ self.id_to_element_dict = refreshed_page.id_to_element_dict
+ self.id_to_frame_dict = refreshed_page.id_to_frame_dict
+ self.id_to_element_hash = refreshed_page.id_to_element_hash
+ self.hash_to_element_ids = refreshed_page.hash_to_element_ids
+ self.element_tree = refreshed_page.element_tree
+ self.element_tree_trimmed = refreshed_page.element_tree_trimmed
+ self.screenshots = refreshed_page.screenshots or self.screenshots
+ self.html = refreshed_page.html
+ self.extracted_text = refreshed_page.extracted_text
+ self.url = refreshed_page.url
+ return self
+
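+    # Note: refresh() above re-scrapes and mutates this instance in place (returning
+    # self); generate_scraped_page() below returns a brand-new ScrapedPage instead.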
+ async def generate_scraped_page(
+ self,
+ draw_boxes: bool = True,
+ scroll: bool = True,
+ take_screenshots: bool = True,
+ max_retries: int = 0,
+ ) -> Self:
+ return await self._browser_state.scrape_website(
+ url=self.url,
+ cleanup_element_tree=self._clean_up_func,
+ max_retries=max_retries,
+ scrape_exclude=self._scrape_exclude,
+ take_screenshots=take_screenshots,
+ draw_boxes=draw_boxes,
+ scroll=scroll,
+ )
+
+ async def generate_scraped_page_without_screenshots(self, max_retries: int = 0) -> Self:
+ return await self.generate_scraped_page(take_screenshots=False, max_retries=max_retries)
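Taken together, a hedged usage sketch of the new module (names from the hunks above; cleanup_fn stands in for any CleanupElementTreeFunc). Note the typing.TYPE_CHECKING guard on the BrowserState import: scraped_page.py needs it only for annotations, which is what lets browser_state.py import this module at runtime without creating an import cycle.

    page = await browser_state.scrape_website(url=url, cleanup_element_tree=cleanup_fn)
    html = page.build_element_tree(ElementTreeFormat.HTML)          # trimmed tree as HTML
    page = await page.refresh(draw_boxes=False)                     # re-scrape in place
    fresh = await page.generate_scraped_page_without_screenshots()  # new instance, no screenshots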
diff --git a/skyvern/webeye/scraper/scraper.py b/skyvern/webeye/scraper/scraper.py
index 542834b5..7eb56f93 100644
--- a/skyvern/webeye/scraper/scraper.py
+++ b/skyvern/webeye/scraper/scraper.py
@@ -1,15 +1,11 @@
import asyncio
import copy
import json
-from abc import ABC, abstractmethod
from collections import defaultdict
-from enum import StrEnum
-from typing import Any, Awaitable, Callable, Self
import structlog
from playwright._impl._errors import TimeoutError
from playwright.async_api import ElementHandle, Frame, Locator, Page
-from pydantic import BaseModel, PrivateAttr
from skyvern.config import settings
from skyvern.constants import DEFAULT_MAX_TOKENS, SKYVERN_DIR, SKYVERN_ID_ATTR
@@ -28,12 +24,17 @@ from skyvern.forge.sdk.trace import TraceManager
from skyvern.utils.image_resizer import Resolution
from skyvern.utils.token_counter import count_tokens
from skyvern.webeye.browser_state import BrowserState
+from skyvern.webeye.scraper.scraped_page import (
+ CleanupElementTreeFunc,
+ ElementTreeBuilder,
+ ElementTreeFormat,
+ ScrapedPage,
+ ScrapeExcludeFunc,
+ json_to_html,
+)
from skyvern.webeye.utils.page import SkyvernFrame
LOG = structlog.get_logger()
-CleanupElementTreeFunc = Callable[[Page | Frame, str, list[dict]], Awaitable[list[dict]]]
-ScrapeExcludeFunc = Callable[[Page, Frame], Awaitable[bool]]
-
RESERVED_ATTRIBUTES = {
"accept", # for input file
"alt",
@@ -75,11 +76,6 @@ BASE64_INCLUDE_ATTRIBUTES = {
}
-ELEMENT_NODE_ATTRIBUTES = {
- "id",
-}
-
-
def load_js_script() -> str:
# TODO: Handle file location better. This is a hacky way to find the file location.
path = f"{SKYVERN_DIR}/webeye/scraper/domUtils.js"
@@ -96,86 +92,6 @@ def load_js_script() -> str:
JS_FUNCTION_DEFS = load_js_script()
-# function to convert JSON element to HTML
-def build_attribute(key: str, value: Any) -> str:
- if isinstance(value, bool) or isinstance(value, int):
- return f'{key}="{str(value).lower()}"'
-
- return f'{key}="{str(value)}"' if value else key
-
-
-def json_to_html(element: dict, need_skyvern_attrs: bool = True) -> str:
- """
- if element is flagged as dropped, the html format is empty
- """
- tag = element["tagName"]
- attributes: dict[str, Any] = copy.deepcopy(element.get("attributes", {}))
-
- interactable = element.get("interactable", False)
- if element.get("isDropped", False):
- if not interactable:
- return ""
- else:
- LOG.debug("Element is interactable. Trimmed all attributes instead of dropping it", element=element)
- attributes = {}
-
- context = skyvern_context.ensure_context()
-
- # FIXME: Theoretically, all href links with over 69(64+1+4) length could be hashed
- # but currently, just hash length>150 links to confirm the solution goes well
- if "href" in attributes and len(attributes.get("href", "")) > 150:
- href = attributes.get("href", "")
- # jinja style can't accept the variable name starts with number
- # adding "_" to make sure the variable name is valid.
- hashed_href = "_" + calculate_sha256(href)
- context.hashed_href_map[hashed_href] = href
- attributes["href"] = "{{" + hashed_href + "}}"
-
- if need_skyvern_attrs:
- # adding the node attribute to attributes
- for attr in ELEMENT_NODE_ATTRIBUTES:
- value = element.get(attr)
- if value is None:
- continue
- attributes[attr] = value
-
- attributes_html = " ".join(build_attribute(key, value) for key, value in attributes.items())
-
- if element.get("isSelectable", False):
- tag = "select"
-
- text = element.get("text", "")
- # build children HTML
- children_html = "".join(
- json_to_html(child, need_skyvern_attrs=need_skyvern_attrs) for child in element.get("children", [])
- )
- # build option HTML
-    option_html = "".join(
-        f'<option index="{option.get("optionIndex")}">{option.get("text")}</option>'
-        if option.get("text")
-        else f'<option index="{option.get("optionIndex")}"></option>'
-        for option in element.get("options", [])
-    )
-
- if element.get("purgeable", False):
- return children_html + option_html
-
- before_pseudo_text = element.get("beforePseudoText") or ""
- after_pseudo_text = element.get("afterPseudoText") or ""
-
- # Check if the element is self-closing
- if (
- tag in ["img", "input", "br", "hr", "meta", "link"]
- and not option_html
- and not children_html
- and not before_pseudo_text
- and not after_pseudo_text
- ):
- return f"<{tag}{attributes_html if not attributes_html else ' ' + attributes_html}>"
- else:
- return f"<{tag}{attributes_html if not attributes_html else ' ' + attributes_html}>{before_pseudo_text}{text}{children_html + option_html}{after_pseudo_text}{tag}>"
-
-
def clean_element_before_hashing(element: dict) -> dict:
def clean_nested(element: dict) -> dict:
element_cleaned = {key: value for key, value in element.items() if key not in {"id", "rect", "frame_index"}}
@@ -220,198 +136,6 @@ def build_element_dict(
return id_to_css_dict, id_to_element_dict, id_to_frame_dict, id_to_element_hash, hash_to_element_ids
-class ElementTreeFormat(StrEnum):
- JSON = "json" # deprecate JSON format soon. please use HTML format
- HTML = "html"
-
-
-class ElementTreeBuilder(ABC):
- @abstractmethod
- def support_economy_elements_tree(self) -> bool:
- pass
-
- @abstractmethod
- def build_element_tree(
- self, fmt: ElementTreeFormat = ElementTreeFormat.HTML, html_need_skyvern_attrs: bool = True
- ) -> str:
- pass
-
- @abstractmethod
- def build_economy_elements_tree(
- self,
- fmt: ElementTreeFormat = ElementTreeFormat.HTML,
- html_need_skyvern_attrs: bool = True,
- percent_to_keep: float = 1,
- ) -> str:
- pass
-
-
-class ScrapedPage(BaseModel, ElementTreeBuilder):
- """
- Scraped response from a webpage, including:
- 1. List of elements
- 2. ID to css map
- 3. The element tree of the page (list of dicts). Each element has children and attributes.
- 4. The screenshot (base64 encoded)
- 5. The URL of the page
- 6. The HTML of the page
- 7. The extracted text from the page
- """
-
- elements: list[dict]
- id_to_element_dict: dict[str, dict] = {}
- id_to_frame_dict: dict[str, str] = {}
- id_to_css_dict: dict[str, str] = {}
- id_to_element_hash: dict[str, str] = {}
- hash_to_element_ids: dict[str, list[str]] = {}
- element_tree: list[dict]
- element_tree_trimmed: list[dict]
- economy_element_tree: list[dict] | None = None
- last_used_element_tree: list[dict] | None = None
- screenshots: list[bytes] = []
- url: str = ""
- html: str = ""
- extracted_text: str | None = None
- window_dimension: dict[str, int] | None = None
- _browser_state: BrowserState = PrivateAttr()
- _clean_up_func: CleanupElementTreeFunc = PrivateAttr()
- _scrape_exclude: ScrapeExcludeFunc | None = PrivateAttr(default=None)
-
- def __init__(self, **data: Any) -> None:
- missing_attrs = [attr for attr in ["_browser_state", "_clean_up_func"] if attr not in data]
- if len(missing_attrs) > 0:
- raise ValueError(f"Missing required private attributes: {', '.join(missing_attrs)}")
-
- # popup private attributes
- browser_state = data.pop("_browser_state")
- clean_up_func = data.pop("_clean_up_func")
- scrape_exclude = data.pop("_scrape_exclude")
-
- super().__init__(**data)
-
- self._browser_state = browser_state
- self._clean_up_func = clean_up_func
- self._scrape_exclude = scrape_exclude
-
- def support_economy_elements_tree(self) -> bool:
- return True
-
- def build_element_tree(
- self, fmt: ElementTreeFormat = ElementTreeFormat.HTML, html_need_skyvern_attrs: bool = True
- ) -> str:
- self.last_used_element_tree = self.element_tree_trimmed
- if fmt == ElementTreeFormat.JSON:
- return json.dumps(self.element_tree_trimmed)
-
- if fmt == ElementTreeFormat.HTML:
- return "".join(
- json_to_html(element, need_skyvern_attrs=html_need_skyvern_attrs)
- for element in self.element_tree_trimmed
- )
-
- raise UnknownElementTreeFormat(fmt=fmt)
-
- def build_economy_elements_tree(
- self,
- fmt: ElementTreeFormat = ElementTreeFormat.HTML,
- html_need_skyvern_attrs: bool = True,
- percent_to_keep: float = 1,
- ) -> str:
- """
- Economy elements tree doesn't include secondary elements like SVG, etc
- """
- if not self.economy_element_tree:
- economy_elements = []
- copied_element_tree_trimmed = copy.deepcopy(self.element_tree_trimmed)
-
- # Process each root element
- for root_element in copied_element_tree_trimmed:
- processed_element = self._process_element_for_economy_tree(root_element)
- if processed_element:
- economy_elements.append(processed_element)
-
- self.economy_element_tree = economy_elements
-
- self.last_used_element_tree = self.economy_element_tree
-
- if fmt == ElementTreeFormat.JSON:
- element_str = json.dumps(self.economy_element_tree)
- return element_str[: int(len(element_str) * percent_to_keep)]
-
- if fmt == ElementTreeFormat.HTML:
- element_str = "".join(
- json_to_html(element, need_skyvern_attrs=html_need_skyvern_attrs)
- for element in self.economy_element_tree
- )
- return element_str[: int(len(element_str) * percent_to_keep)]
-
- raise UnknownElementTreeFormat(fmt=fmt)
-
- def _process_element_for_economy_tree(self, element: dict) -> dict | None:
- """
- Helper method to process an element for the economy tree using BFS.
- Removes SVG elements and their children.
- """
- # Skip SVG elements entirely
- if element.get("tagName", "").lower() == "svg":
- return None
-
- # Process children using BFS
- if "children" in element:
- new_children = []
- for child in element["children"]:
- processed_child = self._process_element_for_economy_tree(child)
- if processed_child:
- new_children.append(processed_child)
- element["children"] = new_children
- return element
-
- async def refresh(self, draw_boxes: bool = True, scroll: bool = True, max_retries: int = 0) -> Self:
- refreshed_page = await scrape_website(
- browser_state=self._browser_state,
- url=self.url,
- cleanup_element_tree=self._clean_up_func,
- max_retries=max_retries,
- scrape_exclude=self._scrape_exclude,
- draw_boxes=draw_boxes,
- scroll=scroll,
- )
- self.elements = refreshed_page.elements
- self.id_to_css_dict = refreshed_page.id_to_css_dict
- self.id_to_element_dict = refreshed_page.id_to_element_dict
- self.id_to_frame_dict = refreshed_page.id_to_frame_dict
- self.id_to_element_hash = refreshed_page.id_to_element_hash
- self.hash_to_element_ids = refreshed_page.hash_to_element_ids
- self.element_tree = refreshed_page.element_tree
- self.element_tree_trimmed = refreshed_page.element_tree_trimmed
- self.screenshots = refreshed_page.screenshots or self.screenshots
- self.html = refreshed_page.html
- self.extracted_text = refreshed_page.extracted_text
- self.url = refreshed_page.url
- return self
-
- async def generate_scraped_page(
- self,
- draw_boxes: bool = True,
- scroll: bool = True,
- take_screenshots: bool = True,
- max_retries: int = 0,
- ) -> Self:
- return await scrape_website(
- browser_state=self._browser_state,
- url=self.url,
- cleanup_element_tree=self._clean_up_func,
- max_retries=max_retries,
- scrape_exclude=self._scrape_exclude,
- take_screenshots=take_screenshots,
- draw_boxes=draw_boxes,
- scroll=scroll,
- )
-
- async def generate_scraped_page_without_screenshots(self, max_retries: int = 0) -> Self:
- return await self.generate_scraped_page(take_screenshots=False, max_retries=max_retries)
-
-
@TraceManager.traced_async(ignore_input=True)
async def scrape_website(
browser_state: BrowserState,
@@ -557,6 +281,7 @@ async def scrape_web_unsafe(
:return: Tuple containing Page instance, base64 encoded screenshot, and page elements.
:note: This function does not handle exceptions. Ensure proper error handling in the calling context.
"""
+
# browser state must have the page instance, otherwise we should not do scraping
page = await browser_state.must_get_working_page()
# Take screenshots of the page with the bounding boxes. We will remove the bounding boxes later.
diff --git a/skyvern/webeye/utils/dom.py b/skyvern/webeye/utils/dom.py
index 0552ce7e..8324afd6 100644
--- a/skyvern/webeye/utils/dom.py
+++ b/skyvern/webeye/utils/dom.py
@@ -27,7 +27,8 @@ from skyvern.exceptions import (
)
from skyvern.experimentation.wait_utils import get_or_create_wait_config, get_wait_time, scroll_into_view_wait
from skyvern.webeye.actions import handler_utils
-from skyvern.webeye.scraper.scraper import IncrementalScrapePage, ScrapedPage, json_to_html, trim_element
+from skyvern.webeye.scraper.scraped_page import ScrapedPage, json_to_html
+from skyvern.webeye.scraper.scraper import IncrementalScrapePage, trim_element
from skyvern.webeye.utils.page import SkyvernFrame
LOG = structlog.get_logger()