refactor select handler (#565)

This commit is contained in:
LawyZheng
2024-07-09 02:22:16 +08:00
committed by GitHub
parent 7a75bb3741
commit bcbd738326
4 changed files with 309 additions and 113 deletions

View File

@@ -1,7 +1,7 @@
from __future__ import annotations
import asyncio
import typing
from abc import ABC, abstractmethod
from enum import StrEnum
import structlog
@@ -9,18 +9,22 @@ from playwright.async_api import Frame, FrameLocator, Locator, Page
from skyvern.constants import INPUT_TEXT_TIMEOUT, SKYVERN_ID_ATTR
from skyvern.exceptions import (
ElementIsNotComboboxDropdown,
ElementIsNotLabel,
ElementIsNotSelect2Dropdown,
FailedToGetCurrentValueOfDropdown,
MissingElement,
MissingElementDict,
MissingElementInCSSMap,
MissingElementInIframe,
MultipleDropdownAnchorErr,
MultipleElementsFound,
NoDropdownAnchorErr,
NoneFrameError,
SkyvernException,
)
from skyvern.forge.sdk.settings_manager import SettingsManager
from skyvern.webeye.scraper.scraper import ScrapedPage, get_select2_options
from skyvern.webeye.scraper.scraper import ScrapedPage, get_combobox_options, get_select2_options
LOG = structlog.get_logger()
@@ -68,6 +72,9 @@ class InteractiveElement(StrEnum):
BUTTON = "button"
SELECTABLE_ELEMENT = [InteractiveElement.INPUT, InteractiveElement.SELECT]
class SkyvernOptionType(typing.TypedDict):
optionIndex: int
text: str
@@ -96,6 +103,12 @@ class SkyvernElement:
or (tag_name == "input" and "select2-input" in element_class)
)
async def is_combobox_dropdown(self) -> bool:
tag_name = self.get_tag_name()
role = await self.get_attr("role")
haspopup = await self.get_attr("aria-haspopup")
return tag_name == InteractiveElement.INPUT and role == "combobox" and haspopup == "listbox"
async def is_checkbox(self) -> bool:
tag_name = self.get_tag_name()
if tag_name != "input":
@@ -112,6 +125,16 @@ class SkyvernElement:
button_type = await self.get_attr("type")
return button_type == "radio"
def is_interactable(self) -> bool:
return self.__static_element.get("interactable", False)
async def is_selectable(self) -> bool:
return (
await self.is_select2_dropdown()
or await self.is_combobox_dropdown()
or self.get_tag_name() in SELECTABLE_ELEMENT
)
def get_tag_name(self) -> str:
return self.__static_element.get("tagName", "")
@@ -140,6 +163,12 @@ class SkyvernElement:
return Select2Dropdown(self.get_frame(), self)
async def get_combobox_dropdown(self) -> ComboboxDropdown:
if not await self.is_combobox_dropdown():
raise ElementIsNotComboboxDropdown(self.get_id(), self.__static_element)
return ComboboxDropdown(self.get_frame(), self)
def find_element_id_in_label_children(self, element_type: InteractiveElement) -> str | None:
tag_name = self.get_tag_name()
if tag_name != "label":
@@ -155,6 +184,56 @@ class SkyvernElement:
return None
async def find_label_for(
self, dom: DomUtil, timeout: float = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS
) -> SkyvernElement | None:
if self.get_tag_name() != "label":
return None
for_id = await self.get_attr("for")
if for_id == "":
return None
locator = self.get_frame().locator(f"#{for_id}")
# supposed to be only one element, since id is unique in the whole DOM
if await locator.count() != 1:
return None
unique_id = await locator.get_attribute(SKYVERN_ID_ATTR, timeout=timeout)
if unique_id is None:
return None
return await dom.get_skyvern_element_by_id(unique_id)
async def find_selectable_child(self, dom: DomUtil) -> SkyvernElement | None:
# BFS to find the first selectable child
index = 0
queue = [self]
while index < len(queue):
item = queue[index]
if item.is_interactable() and await item.is_selectable():
return item
try:
for_element = await item.find_label_for(dom=dom)
if for_element is not None and await for_element.is_selectable():
return for_element
except Exception:
LOG.error(
"Failed to find element by label-for",
element=item.__static_element,
exc_info=True,
)
children: list[dict] = item.__static_element.get("children", [])
for child in children:
child_id = child.get("id", "")
child_element = await dom.get_skyvern_element_by_id(child_id)
queue.append(child_element)
index += 1
return None
async def get_attr(
self,
attr_name: str,
@@ -216,28 +295,144 @@ class DomUtil:
return SkyvernElement(locator, frame_content, element)
class Select2Dropdown:
class AbstractSelectDropdown(ABC):
@abstractmethod
def name(self) -> str:
pass
@abstractmethod
async def open(self, timeout: float = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS) -> None:
pass
@abstractmethod
async def close(self, timeout: float = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS) -> None:
pass
@abstractmethod
async def get_current_value(self, timeout: float = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS) -> str:
pass
@abstractmethod
async def get_options(
self, timeout: float = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS
) -> typing.List[SkyvernOptionType]:
pass
@abstractmethod
async def select_by_index(
self, index: int, timeout: float = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS
) -> None:
pass
class Select2Dropdown(AbstractSelectDropdown):
def __init__(self, frame: Page | Frame, skyvern_element: SkyvernElement) -> None:
self.skyvern_element = skyvern_element
self.frame = frame
async def __find_anchor(self, timeout: float) -> Locator:
locator = self.frame.locator("#select2-drop")
await locator.wait_for(state="visible", timeout=timeout)
cnt = await locator.count()
if cnt == 0:
raise NoDropdownAnchorErr(self.name(), self.skyvern_element.get_id())
if cnt > 1:
raise MultipleDropdownAnchorErr(self.name(), self.skyvern_element.get_id())
return locator
def name(self) -> str:
return "select2"
async def open(self, timeout: float = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS) -> None:
await self.skyvern_element.get_locator().click(timeout=timeout)
# wait for the options to load
await asyncio.sleep(3)
await self.__find_anchor(timeout=timeout)
async def close(self, timeout: float = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS) -> None:
await self.frame.locator("#select2-drop").press("Escape", timeout=timeout)
anchor = await self.__find_anchor(timeout=timeout)
await anchor.press("Escape", timeout=timeout)
async def get_current_value(self, timeout: float = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS) -> str:
tag_name = self.skyvern_element.get_tag_name()
if tag_name == "input":
# TODO: this is multiple options case, we haven't fully supported it yet.
return ""
# check SkyvernElement.is_select2_dropdown() method, only <a> and <span> element left
# we should make sure the locator is on <a>, so we're able to find the [class="select2-chosen"] child
locator = self.skyvern_element.get_locator()
if tag_name == "span":
locator = locator.locator("..")
elif tag_name == "a":
pass
else:
raise FailedToGetCurrentValueOfDropdown(
self.name(), self.skyvern_element.get_id(), "invalid element of select2"
)
try:
return await locator.locator("span[class='select2-chosen']").text_content(timeout=timeout)
except Exception as e:
raise FailedToGetCurrentValueOfDropdown(self.name(), self.skyvern_element.get_id(), repr(e))
async def get_options(
self, timeout: float = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS
) -> typing.List[SkyvernOptionType]:
element_handler = await self.skyvern_element.get_locator().element_handle(timeout=timeout)
anchor = await self.__find_anchor(timeout=timeout)
element_handler = await anchor.element_handle(timeout=timeout)
options = await get_select2_options(self.frame, element_handler)
return typing.cast(typing.List[SkyvernOptionType], options)
async def select_by_index(
self, index: int, timeout: float = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS
) -> None:
anchor = self.frame.locator("#select2-drop li[role='option']")
await anchor.nth(index).click(timeout=timeout)
anchor = await self.__find_anchor(timeout=timeout)
options = anchor.locator("li[role='option']")
await options.nth(index).click(timeout=timeout)
class ComboboxDropdown(AbstractSelectDropdown):
def __init__(self, frame: Page | Frame, skyvern_element: SkyvernElement) -> None:
self.skyvern_element = skyvern_element
self.frame = frame
async def __find_anchor(self, timeout: float) -> Locator:
control_id = await self.skyvern_element.get_attr("aria-controls", timeout=timeout)
locator = self.frame.locator(f"#{control_id}")
await locator.wait_for(state="visible", timeout=timeout)
cnt = await locator.count()
if cnt == 0:
raise NoDropdownAnchorErr(self.name(), self.skyvern_element.get_id())
if cnt > 1:
raise MultipleDropdownAnchorErr(self.name(), self.skyvern_element.get_id())
return locator
def name(self) -> str:
return "combobox"
async def open(self, timeout: float = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS) -> None:
await self.skyvern_element.get_locator().click(timeout=timeout)
await self.__find_anchor(timeout=timeout)
async def close(self, timeout: float = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS) -> None:
await self.skyvern_element.get_locator().press("Tab", timeout=timeout)
async def get_current_value(self, timeout: float = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS) -> str:
try:
return await self.skyvern_element.get_attr("value", dynamic=True, timeout=timeout)
except Exception as e:
raise FailedToGetCurrentValueOfDropdown(self.name(), self.skyvern_element.get_id(), repr(e))
async def get_options(
self, timeout: float = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS
) -> typing.List[SkyvernOptionType]:
anchor = await self.__find_anchor(timeout=timeout)
element_handler = await anchor.element_handle()
options = await get_combobox_options(self.frame, element_handler)
return typing.cast(typing.List[SkyvernOptionType], options)
async def select_by_index(
self, index: int, timeout: float = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS
) -> None:
anchor = await self.__find_anchor(timeout=timeout)
options = anchor.locator("li[role='option']")
await options.nth(index).click(timeout=timeout)