Files
Dorod-Sky/skyvern/webeye/utils/dom.py

773 lines
28 KiB
Python
Raw Normal View History

2024-07-06 13:32:55 +08:00
from __future__ import annotations
2024-08-06 13:30:52 +08:00
import asyncio
2024-09-09 11:34:09 +08:00
import copy
2024-06-13 15:34:21 +08:00
import typing
from enum import StrEnum
2024-08-06 13:30:52 +08:00
from random import uniform
2024-06-13 15:34:21 +08:00
import structlog
2024-08-28 14:51:05 +08:00
from playwright.async_api import ElementHandle, Frame, FrameLocator, Locator, Page, TimeoutError
2024-06-13 15:34:21 +08:00
from skyvern.config import settings
2024-07-10 00:04:03 +08:00
from skyvern.constants import SKYVERN_ID_ATTR
2024-06-13 15:34:21 +08:00
from skyvern.exceptions import (
ElementIsNotLabel,
InteractWithDisabledElement,
2024-06-13 15:34:21 +08:00
MissingElement,
MissingElementDict,
MissingElementInCSSMap,
2024-06-13 15:34:21 +08:00
MissingElementInIframe,
MultipleElementsFound,
2024-08-06 13:30:52 +08:00
NoElementBoudingBox,
2024-07-06 13:32:55 +08:00
NoneFrameError,
2024-06-14 14:58:44 +08:00
SkyvernException,
2024-06-13 15:34:21 +08:00
)
2024-09-09 11:34:09 +08:00
from skyvern.webeye.scraper.scraper import IncrementalScrapePage, ScrapedPage, json_to_html, trim_element
2024-10-31 00:12:13 +08:00
from skyvern.webeye.utils.page import SkyvernFrame
2024-06-13 15:34:21 +08:00
LOG = structlog.get_logger()
2024-07-10 00:04:03 +08:00
TEXT_INPUT_DELAY = 10 # 10ms between each character input
2024-08-30 11:11:27 +08:00
TEXT_PRESS_MAX_LENGTH = 20
2024-07-10 00:04:03 +08:00
2024-06-13 15:34:21 +08:00
2024-07-06 13:32:55 +08:00
async def resolve_locator(
scrape_page: ScrapedPage, page: Page, frame: str, css: str
) -> typing.Tuple[Locator, Page | Frame]:
2024-06-14 14:58:44 +08:00
iframe_path: list[str] = []
while frame != "main.frame":
iframe_path.append(frame)
frame_element = scrape_page.id_to_element_dict.get(frame)
if frame_element is None:
raise MissingElement(element_id=frame)
parent_frame = frame_element.get("frame")
if not parent_frame:
raise SkyvernException(f"element without frame: {frame_element}")
frame = parent_frame
current_page: Page | FrameLocator = page
2024-07-06 13:32:55 +08:00
current_frame: Page | Frame = page
2024-06-14 14:58:44 +08:00
while len(iframe_path) > 0:
child_frame = iframe_path.pop()
2024-07-06 13:32:55 +08:00
frame_handler = await current_frame.query_selector(f"[{SKYVERN_ID_ATTR}='{child_frame}']")
2024-07-30 11:35:14 +08:00
if frame_handler is None:
raise NoneFrameError(frame_id=child_frame)
2024-07-06 13:32:55 +08:00
content_frame = await frame_handler.content_frame()
if content_frame is None:
raise NoneFrameError(frame_id=child_frame)
current_frame = content_frame
2024-06-14 14:58:44 +08:00
current_page = current_page.frame_locator(f"[{SKYVERN_ID_ATTR}='{child_frame}']")
2024-07-06 13:32:55 +08:00
return current_page.locator(css), current_frame
2024-06-14 14:58:44 +08:00
2024-06-13 15:34:21 +08:00
class InteractiveElement(StrEnum):
2024-06-18 11:34:52 +08:00
A = "a"
2024-06-13 15:34:21 +08:00
INPUT = "input"
SELECT = "select"
BUTTON = "button"
2024-07-09 02:22:16 +08:00
SELECTABLE_ELEMENT = [InteractiveElement.INPUT, InteractiveElement.SELECT]
RAW_INPUT_TYPE_VALUE = ["number", "url", "tel", "email", "username", "password"]
RAW_INPUT_NAME_VALUE = ["name", "email", "username", "password", "phone"]
2024-07-09 02:22:16 +08:00
2024-06-18 11:34:52 +08:00
class SkyvernOptionType(typing.TypedDict):
optionIndex: int
text: str
2024-06-13 15:34:21 +08:00
class SkyvernElement:
"""
SkyvernElement is a python interface to interact with js elements built during the scarping.
When you try to interact with these elements by python, you are supposed to use this class as an interface.
"""
2024-08-06 13:30:52 +08:00
@classmethod
async def create_from_incremental(cls, incre_page: IncrementalScrapePage, element_id: str) -> SkyvernElement:
element_dict = incre_page.id_to_element_dict.get(element_id)
if element_dict is None:
raise MissingElementDict(element_id)
css_selector = incre_page.id_to_css_dict.get(element_id)
if not css_selector:
raise MissingElementInCSSMap(element_id)
frame = incre_page.skyvern_frame.get_frame()
locator = frame.locator(css_selector)
num_elements = await locator.count()
if num_elements < 1:
LOG.debug("No elements found with css. Validation failed.", css=css_selector, element_id=element_id)
2024-08-06 13:30:52 +08:00
raise MissingElement(selector=css_selector, element_id=element_id)
elif num_elements > 1:
LOG.debug(
2024-08-06 13:30:52 +08:00
"Multiple elements found with css. Expected 1. Validation failed.",
num_elements=num_elements,
selector=css_selector,
element_id=element_id,
)
raise MultipleElementsFound(num=num_elements, selector=css_selector, element_id=element_id)
return cls(locator, frame, element_dict)
def __init__(self, locator: Locator, frame: Page | Frame, static_element: dict, hash_value: str = "") -> None:
2024-06-13 15:34:21 +08:00
self.__static_element = static_element
2024-07-06 13:32:55 +08:00
self.__frame = frame
2024-06-13 15:34:21 +08:00
self.locator = locator
self.hash_value = hash_value
self._id_cache = static_element.get("id", "")
self._tag_name = static_element.get("tagName", "")
self._selectable = static_element.get("isSelectable", False)
self._frame_id = static_element.get("frame", "")
self._attributes = static_element.get("attributes", {})
2024-06-13 15:34:21 +08:00
2024-11-14 02:33:44 +08:00
def __repr__(self) -> str:
return f"SkyvernElement({str(self.__static_element)})"
2024-09-09 11:34:09 +08:00
def build_HTML(self, need_trim_element: bool = True, need_skyvern_attrs: bool = True) -> str:
element_dict = self.get_element_dict()
if need_trim_element:
element_dict = trim_element(copy.deepcopy(element_dict))
return json_to_html(element_dict, need_skyvern_attrs)
2024-08-21 10:54:32 +08:00
async def is_auto_completion_input(self) -> bool:
tag_name = self.get_tag_name()
if tag_name != InteractiveElement.INPUT:
return False
autocomplete = await self.get_attr("aria-autocomplete")
2024-09-03 02:21:07 +08:00
if autocomplete and autocomplete == "list":
2024-08-21 10:54:32 +08:00
return True
2024-11-12 21:18:41 +08:00
class_name: str | None = await self.get_attr("class")
if class_name and "autocomplete-input" in class_name:
return True
2024-08-21 10:54:32 +08:00
return False
2024-10-16 19:23:12 +08:00
async def is_custom_option(self) -> bool:
return self.get_tag_name() == "li" or await self.get_attr("role") == "option"
2024-06-25 10:26:14 +08:00
async def is_checkbox(self) -> bool:
tag_name = self.get_tag_name()
if tag_name != "input":
return False
button_type = await self.get_attr("type")
return button_type == "checkbox"
async def is_radio(self) -> bool:
tag_name = self.get_tag_name()
if tag_name != "input":
return False
button_type = await self.get_attr("type")
return button_type == "radio"
2024-10-28 23:52:26 +08:00
async def is_btn_input(self) -> bool:
tag_name = self.get_tag_name()
if tag_name != InteractiveElement.INPUT:
return False
input_type = await self.get_attr("type")
return input_type == "button"
async def is_raw_input(self) -> bool:
if self.get_tag_name() != InteractiveElement.INPUT:
return False
if await self.is_spinbtn_input():
return True
input_type = str(await self.get_attr("type"))
if input_type.lower() in RAW_INPUT_TYPE_VALUE:
return True
name = str(await self.get_attr("name"))
if name.lower() in RAW_INPUT_NAME_VALUE:
return True
2024-10-10 09:22:20 +08:00
# if input has these attrs, it expects user to type and input sth
if await self.get_attr("min") or await self.get_attr("max") or await self.get_attr("step"):
return True
return False
2024-09-11 11:53:47 +08:00
async def is_spinbtn_input(self) -> bool:
"""
confirm the element is:
1. <input> element
2. role=spinbutton
Usage of <input role="spinbutton">, https://developer.mozilla.org/en-US/docs/Web/Accessibility/ARIA/Roles/spinbutton_role
"""
if self.get_tag_name() != InteractiveElement.INPUT:
return False
if await self.get_attr("role") == "spinbutton":
return True
return False
async def is_file_input(self) -> bool:
return self.get_tag_name() == InteractiveElement.INPUT and await self.get_attr("type") == "file"
2024-07-09 02:22:16 +08:00
def is_interactable(self) -> bool:
return self.__static_element.get("interactable", False)
2024-10-25 14:52:02 +08:00
async def is_disabled(self, dynamic: bool = False) -> bool:
# if attr not exist, return None
# if attr is like 'disabled', return empty string or True
# if attr is like `disabled=false`, return the value
disabled = False
aria_disabled = False
disabled_attr: bool | str | None = None
aria_disabled_attr: bool | str | None = None
2024-10-31 00:12:13 +08:00
style_disabled: bool = False
2024-10-25 14:52:02 +08:00
try:
disabled_attr = await self.get_attr("disabled", dynamic=dynamic)
aria_disabled_attr = await self.get_attr("aria-disabled", dynamic=dynamic)
2024-10-31 00:12:13 +08:00
skyvern_frame = await SkyvernFrame.create_instance(self.get_frame())
style_disabled = await skyvern_frame.get_disabled_from_style(await self.get_element_handler())
2024-10-25 14:52:02 +08:00
except Exception:
# FIXME: maybe it should be considered as "disabled" element if failed to get the attributes?
LOG.exception(
"Failed to get the disabled attribute",
element=self.__static_element,
element_id=self.get_id(),
)
if disabled_attr is not None:
# disabled_attr should be bool or str
if isinstance(disabled_attr, bool):
disabled = disabled_attr
if isinstance(disabled_attr, str):
disabled = disabled_attr.lower() != "false"
if aria_disabled_attr is not None:
# aria_disabled_attr should be bool or str
if isinstance(aria_disabled_attr, bool):
aria_disabled = aria_disabled_attr
if isinstance(aria_disabled_attr, str):
aria_disabled = aria_disabled_attr.lower() != "false"
2024-10-31 00:12:13 +08:00
return disabled or aria_disabled or style_disabled
2024-10-25 14:52:02 +08:00
2024-07-09 02:22:16 +08:00
async def is_selectable(self) -> bool:
2024-08-06 13:30:52 +08:00
return self.get_selectable() or self.get_tag_name() in SELECTABLE_ELEMENT
2025-03-11 12:33:09 -07:00
async def is_visible(self, must_visible_style: bool = True) -> bool:
2025-01-14 13:08:35 +08:00
if not await self.get_locator().count():
return False
2025-03-11 12:33:09 -07:00
if not must_visible_style:
return True
2024-11-14 02:33:44 +08:00
skyvern_frame = await SkyvernFrame.create_instance(self.get_frame())
return await skyvern_frame.get_element_visible(await self.get_element_handler())
async def is_parent_of(self, target: ElementHandle) -> bool:
skyvern_frame = await SkyvernFrame.create_instance(self.get_frame())
return await skyvern_frame.is_parent(await self.get_element_handler(), target)
async def is_child_of(self, target: ElementHandle) -> bool:
skyvern_frame = await SkyvernFrame.create_instance(self.get_frame())
return await skyvern_frame.is_parent(target, await self.get_element_handler())
async def is_sibling_of(self, target: ElementHandle) -> bool:
skyvern_frame = await SkyvernFrame.create_instance(self.get_frame())
return await skyvern_frame.is_sibling(await self.get_element_handler(), target)
2024-08-21 10:54:32 +08:00
def get_element_dict(self) -> dict:
return self.__static_element
2024-08-06 13:30:52 +08:00
def get_selectable(self) -> bool:
return self._selectable
2024-07-09 02:22:16 +08:00
2024-06-13 15:34:21 +08:00
def get_tag_name(self) -> str:
return self._tag_name
2024-06-13 15:34:21 +08:00
2024-07-06 13:32:55 +08:00
def get_id(self) -> str:
return self._id_cache
2024-07-06 13:32:55 +08:00
2024-08-21 10:54:32 +08:00
def get_frame_id(self) -> str:
return self._frame_id
2024-08-21 10:54:32 +08:00
2024-07-06 13:32:55 +08:00
def get_attributes(self) -> typing.Dict:
return self._attributes
2024-06-13 15:34:21 +08:00
2024-06-25 01:46:54 +08:00
def get_options(self) -> typing.List[SkyvernOptionType]:
options = self.__static_element.get("options", None)
if options is None:
return []
return typing.cast(typing.List[SkyvernOptionType], options)
2024-07-06 13:32:55 +08:00
def get_frame(self) -> Page | Frame:
return self.__frame
2025-02-18 08:58:23 +08:00
def get_frame_index(self) -> int:
return self.__static_element.get("frame_index", -1)
2024-07-06 13:32:55 +08:00
def get_locator(self) -> Locator:
return self.locator
async def get_element_handler(self, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS) -> ElementHandle:
2024-08-28 14:51:05 +08:00
handler = await self.locator.element_handle(timeout=timeout)
assert handler is not None
return handler
2025-01-08 14:27:50 +08:00
async def find_blocking_element(
self, dom: DomUtil, incremental_page: IncrementalScrapePage | None = None
) -> tuple[SkyvernElement | None, bool]:
2024-11-14 02:33:44 +08:00
skyvern_frame = await SkyvernFrame.create_instance(self.get_frame())
2024-11-27 22:44:05 +08:00
blocking_element_id, blocked = await skyvern_frame.get_blocking_element_id(await self.get_element_handler())
2024-11-14 02:33:44 +08:00
if not blocking_element_id:
2024-11-27 22:44:05 +08:00
return None, blocked
2025-01-08 14:27:50 +08:00
if await dom.check_id_in_dom(blocking_element_id):
2025-01-08 14:27:50 +08:00
return await dom.get_skyvern_element_by_id(blocking_element_id), blocked
if incremental_page and incremental_page.check_id_in_page(blocking_element_id):
return await SkyvernElement.create_from_incremental(incremental_page, blocking_element_id), blocked
return None, blocked
2024-11-14 02:33:44 +08:00
2024-11-14 21:33:19 +08:00
async def find_element_in_label_children(
self, dom: DomUtil, element_type: InteractiveElement
) -> SkyvernElement | None:
element_id = self.find_element_id_in_label_children(element_type=element_type)
if not element_id:
return None
return await dom.get_skyvern_element_by_id(element_id=element_id)
2024-06-13 15:34:21 +08:00
def find_element_id_in_label_children(self, element_type: InteractiveElement) -> str | None:
tag_name = self.get_tag_name()
if tag_name != "label":
raise ElementIsNotLabel(tag_name)
children: list[dict] = self.__static_element.get("children", [])
for child in children:
if not child.get("interactable"):
continue
if child.get("tagName") == element_type:
return child.get("id")
return None
2024-09-07 09:34:33 +08:00
async def find_children_element_id_by_callback(
self, cb: typing.Callable[[dict], typing.Awaitable[bool]]
) -> str | None:
index = 0
queue = [self.get_element_dict()]
while index < len(queue):
item = queue[index]
if await cb(item):
return item.get("id", "")
children: list[dict] = item.get("children", [])
for child in children:
queue.append(child)
index += 1
return None
2024-07-09 02:22:16 +08:00
async def find_label_for(
self, dom: DomUtil, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS
2024-07-09 02:22:16 +08:00
) -> SkyvernElement | None:
if self.get_tag_name() != "label":
return None
for_id = await self.get_attr("for")
if for_id == "":
return None
2024-07-10 14:55:59 +08:00
locator = self.get_frame().locator(f"[id='{for_id}']")
2024-07-09 02:22:16 +08:00
# supposed to be only one element, since id is unique in the whole DOM
if await locator.count() != 1:
return None
unique_id = await locator.get_attribute(SKYVERN_ID_ATTR, timeout=timeout)
if unique_id is None:
return None
return await dom.get_skyvern_element_by_id(unique_id)
async def find_bound_label_by_attr_id(self, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS) -> Locator | None:
2024-11-14 02:33:44 +08:00
if self.get_tag_name() == "label":
return None
element_id: str = await self.get_attr("id", timeout=timeout)
if not element_id:
return None
locator = self.get_frame().locator(f"label[for='{element_id}']")
cnt = await locator.count()
if cnt == 1:
return locator
return None
2024-11-14 21:33:19 +08:00
async def find_bound_label_by_direct_parent(
self, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS
2024-11-14 21:33:19 +08:00
) -> Locator | None:
if self.get_tag_name() == "label":
return None
parent_locator = self.get_locator().locator("..")
cnt = await parent_locator.count()
if cnt != 1:
return None
timeout_sec = timeout / 1000
async with asyncio.timeout(timeout_sec):
tag_name: str | None = await parent_locator.evaluate("el => el.tagName")
if not tag_name:
return None
if tag_name.lower() != "label":
return None
return parent_locator
2024-07-09 02:22:16 +08:00
async def find_selectable_child(self, dom: DomUtil) -> SkyvernElement | None:
# BFS to find the first selectable child
index = 0
queue = [self]
while index < len(queue):
item = queue[index]
if item.is_interactable() and await item.is_selectable():
return item
try:
for_element = await item.find_label_for(dom=dom)
if for_element is not None and await for_element.is_selectable():
return for_element
except Exception:
LOG.error(
"Failed to find element by label-for",
element=item.__static_element,
exc_info=True,
)
children: list[dict] = item.__static_element.get("children", [])
for child in children:
child_id = child.get("id", "")
child_element = await dom.get_skyvern_element_by_id(child_id)
queue.append(child_element)
index += 1
return None
async def find_interactable_anchor_child(
self, dom: DomUtil, element_type: InteractiveElement
) -> SkyvernElement | None:
index = 0
queue = [self]
while index < len(queue):
item = queue[index]
if item.is_interactable() and item.get_tag_name() == element_type:
return item
try:
for_element = await item.find_label_for(dom=dom)
if for_element is not None and for_element.get_tag_name() == element_type:
return for_element
except Exception:
LOG.error(
"Failed to find element by label-for",
element=item.__static_element,
exc_info=True,
)
children: list[dict] = item.__static_element.get("children", [])
for child in children:
child_id = child.get("id", "")
child_element = await dom.get_skyvern_element_by_id(child_id)
queue.append(child_element)
index += 1
return None
2024-06-13 15:34:21 +08:00
async def get_attr(
self,
attr_name: str,
dynamic: bool = False,
timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS,
2024-06-13 15:34:21 +08:00
) -> typing.Any:
if not dynamic:
2024-10-25 14:52:02 +08:00
attr = self.get_attributes().get(attr_name)
if attr is not None:
2024-06-13 15:34:21 +08:00
return attr
return await self.locator.get_attribute(attr_name, timeout=timeout)
async def focus(self, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS) -> None:
2024-08-28 14:51:05 +08:00
await self.get_locator().focus(timeout=timeout)
async def input_sequentially(self, text: str, default_timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS) -> None:
2024-07-10 00:04:03 +08:00
length = len(text)
if length > TEXT_PRESS_MAX_LENGTH:
# if the text is longer than TEXT_PRESS_MAX_LENGTH characters, we will locator.fill in initial texts until the last TEXT_PRESS_MAX_LENGTH characters
# and then type the last TEXT_PRESS_MAX_LENGTH characters with locator.press_sequentially
2024-08-21 10:54:32 +08:00
await self.input_fill(text[: length - TEXT_PRESS_MAX_LENGTH])
2024-07-10 00:04:03 +08:00
text = text[length - TEXT_PRESS_MAX_LENGTH :]
2024-08-21 10:54:32 +08:00
await self.press_fill(text, timeout=default_timeout)
async def press_key(self, key: str, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS) -> None:
2024-08-28 14:51:05 +08:00
await self.get_locator().press(key=key, timeout=timeout)
async def press_fill(self, text: str, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS) -> None:
2024-08-21 10:54:32 +08:00
await self.get_locator().press_sequentially(text, delay=TEXT_INPUT_DELAY, timeout=timeout)
2024-07-10 00:04:03 +08:00
async def input_fill(self, text: str, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS) -> None:
2024-07-10 00:04:03 +08:00
await self.get_locator().fill(text, timeout=timeout)
async def input_clear(self, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS) -> None:
2024-07-10 00:04:03 +08:00
await self.get_locator().clear(timeout=timeout)
2024-06-13 15:34:21 +08:00
async def move_mouse_to_safe(
self,
page: Page,
task_id: str | None = None,
step_id: str | None = None,
timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS,
) -> tuple[float, float] | tuple[None, None]:
element_id = self.get_id()
try:
return await self.move_mouse_to(page, timeout=timeout)
except NoElementBoudingBox:
LOG.warning(
"Failed to move mouse to the element - NoElementBoudingBox",
task_id=task_id,
step_id=step_id,
element_id=element_id,
exc_info=True,
)
except Exception:
LOG.warning(
"Failed to move mouse to the element - unexpectd exception",
task_id=task_id,
step_id=step_id,
element_id=element_id,
exc_info=True,
)
return None, None
2024-08-06 13:30:52 +08:00
async def move_mouse_to(
self, page: Page, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS
2024-08-06 13:30:52 +08:00
) -> tuple[float, float]:
bounding_box = await self.get_locator().bounding_box(timeout=timeout)
if not bounding_box:
raise NoElementBoudingBox(element_id=self.get_id())
x, y, width, height = bounding_box["x"], bounding_box["y"], bounding_box["width"], bounding_box["height"]
# calculate the click point, use open interval to avoid clicking on the border
epsilon = 0.01
dest_x = uniform(x + epsilon, x + width - epsilon) if width > 2 * epsilon else (x + width) / 2
dest_y = uniform(y + epsilon, y + height - epsilon) if height > 2 * epsilon else (y + height) / 2
await page.mouse.move(dest_x, dest_y)
return dest_x, dest_y
async def click(
self,
page: Page,
dom: DomUtil | None = None,
incremental_page: IncrementalScrapePage | None = None,
timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS,
) -> None:
if await self.is_disabled(dynamic=True):
raise InteractWithDisabledElement(element_id=self.get_id())
try:
await self.get_locator().click(timeout=timeout)
return
except Exception:
LOG.info("Failed to click by playwright", exc_info=True, element_id=self.get_id())
if dom is not None:
# try to click on the blocking element
try:
await self.scroll_into_view(timeout=timeout)
blocking_element, _ = await self.find_blocking_element(dom=dom, incremental_page=incremental_page)
if blocking_element:
LOG.debug("Find the blocking element", element_id=blocking_element.get_id())
await blocking_element.get_locator().click(timeout=timeout)
return
except Exception:
LOG.info("Failed to click on the blocking element", exc_info=True, element_id=self.get_id())
try:
await self.scroll_into_view(timeout=timeout)
await self.coordinate_click(page=page, timeout=timeout)
return
except Exception:
LOG.info("Failed to click by coordinate", exc_info=True, element_id=self.get_id())
await self.scroll_into_view(timeout=timeout)
await self.click_in_javascript()
return
2024-11-27 22:44:05 +08:00
async def click_in_javascript(self) -> None:
skyvern_frame = await SkyvernFrame.create_instance(self.get_frame())
await skyvern_frame.click_element_in_javascript(await self.get_element_handler())
async def coordinate_click(self, page: Page, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS) -> None:
2024-08-06 13:30:52 +08:00
click_x, click_y = await self.move_mouse_to(page=page, timeout=timeout)
await page.mouse.click(click_x, click_y)
2024-08-28 14:51:05 +08:00
async def blur(self) -> None:
if not await self.is_visible():
return
await SkyvernFrame.evaluate(
frame=self.get_frame(), expression="(element) => element.blur()", arg=await self.get_element_handler()
)
2024-08-28 14:51:05 +08:00
async def scroll_into_view(self, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS) -> None:
if not await self.is_visible():
return
2024-08-07 15:38:04 +08:00
try:
element_handler = await self.get_element_handler(timeout=timeout)
2024-08-07 15:38:04 +08:00
await element_handler.scroll_into_view_if_needed(timeout=timeout)
except TimeoutError:
LOG.info(
"Timeout to execute scrolling into view, try to re-focus to locate the element",
element_id=self.get_id(),
)
2024-08-28 14:51:05 +08:00
await self.blur()
await self.focus(timeout=timeout)
2024-08-06 13:30:52 +08:00
await asyncio.sleep(2) # wait for scrolling into the target
2025-01-08 14:27:50 +08:00
async def calculate_min_y_distance_to(
self,
target_locator: Locator,
timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS,
) -> float:
self_rect = await self.get_locator().bounding_box(timeout=timeout)
if self_rect is None:
raise Exception("Can't Skyvern element rect")
target_rect = await target_locator.bounding_box(timeout=timeout)
if self_rect is None or target_rect is None:
raise Exception("Can't get the target element rect")
y_1 = self_rect["y"] + self_rect["height"] - target_rect["y"]
y_2 = self_rect["y"] - (target_rect["y"] + target_rect["height"])
# if y1 * y2 <= 0, it means the two elements are overlapping
if y_1 * y_2 <= 0:
return 0
return min(
abs(y_1),
abs(y_2),
)
async def calculate_min_x_distance_to(
self,
target_locator: Locator,
timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS,
) -> float:
self_rect = await self.get_locator().bounding_box(timeout=timeout)
if self_rect is None:
raise Exception("Can't Skyvern element rect")
target_rect = await target_locator.bounding_box(timeout=timeout)
if self_rect is None or target_rect is None:
raise Exception("Can't get the target element rect")
2025-01-08 14:27:50 +08:00
x_1 = self_rect["x"] + self_rect["width"] - target_rect["x"]
x_2 = self_rect["x"] - (target_rect["x"] + target_rect["width"])
# if x1 * x2 <= 0, it means the two elements are overlapping
if x_1 * x_2 <= 0:
return 0
return min(
abs(x_1),
abs(x_2),
)
async def is_next_to_element(
self,
target_locator: Locator,
max_x_distance: float = 0,
max_y_distance: float = 0,
timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS,
) -> bool:
if max_x_distance > 0 and await self.calculate_min_x_distance_to(target_locator, timeout) > max_x_distance:
return False
if max_y_distance > 0 and await self.calculate_min_y_distance_to(target_locator, timeout) > max_y_distance:
return False
return True
2024-06-13 15:34:21 +08:00
class DomUtil:
"""
DomUtil is a python interface to interact with the DOM.
The ultimate goal here is to provide a full python-js interaction.
Some functions like wait_for_xxx should be supposed to define here.
"""
def __init__(self, scraped_page: ScrapedPage, page: Page) -> None:
self.scraped_page = scraped_page
self.page = page
2025-01-09 16:14:31 +08:00
async def check_id_in_dom(self, element_id: str) -> bool:
2024-08-21 10:54:32 +08:00
css_selector = self.scraped_page.id_to_css_dict.get(element_id, "")
if css_selector:
return True
return False
2024-06-13 15:34:21 +08:00
async def get_skyvern_element_by_id(self, element_id: str) -> SkyvernElement:
element = self.scraped_page.id_to_element_dict.get(element_id)
if not element:
raise MissingElementDict(element_id)
frame = self.scraped_page.id_to_frame_dict.get(element_id)
if not frame:
raise MissingElementInIframe(element_id)
css = self.scraped_page.id_to_css_dict.get(element_id)
if not css:
raise MissingElementInCSSMap(element_id)
2024-06-13 15:34:21 +08:00
2024-07-06 13:32:55 +08:00
locator, frame_content = await resolve_locator(self.scraped_page, self.page, frame, css)
2024-06-13 15:34:21 +08:00
num_elements = await locator.count()
if num_elements < 1:
LOG.warning("No elements found with css. Validation failed.", css=css, element_id=element_id)
raise MissingElement(selector=css, element_id=element_id)
2024-06-13 15:34:21 +08:00
elif num_elements > 1:
LOG.warning(
"Multiple elements found with css. Expected 1. Validation failed.",
2024-06-13 15:34:21 +08:00
num_elements=num_elements,
selector=css,
element_id=element_id,
2024-06-13 15:34:21 +08:00
)
raise MultipleElementsFound(num=num_elements, selector=css, element_id=element_id)
2024-06-13 15:34:21 +08:00
hash_value = self.scraped_page.id_to_element_hash.get(element_id, "")
return SkyvernElement(locator, frame_content, element, hash_value)