From bcbd73832664fe2f24b9269c3cab57ffbbd14c36 Mon Sep 17 00:00:00 2001 From: LawyZheng Date: Tue, 9 Jul 2024 02:22:16 +0800 Subject: [PATCH] refactor select handler (#565) --- skyvern/exceptions.py | 34 +++++ skyvern/webeye/actions/handler.py | 169 +++++++++--------------- skyvern/webeye/scraper/scraper.py | 6 + skyvern/webeye/utils/dom.py | 213 ++++++++++++++++++++++++++++-- 4 files changed, 309 insertions(+), 113 deletions(-) diff --git a/skyvern/exceptions.py b/skyvern/exceptions.py index 671cfa71..fb69672f 100644 --- a/skyvern/exceptions.py +++ b/skyvern/exceptions.py @@ -313,6 +313,11 @@ class ElementIsNotSelect2Dropdown(SkyvernException): super().__init__(f"element[{element}] is not select2 dropdown. element_id={element_id}") +class ElementIsNotComboboxDropdown(SkyvernException): + def __init__(self, element_id: str, element: dict): + super().__init__(f"element[{element}] is not combobox dropdown. element_id={element_id}") + + class NoneFrameError(SkyvernException): def __init__(self, frame_id: str): super().__init__(f"frame content is none. frame_id={frame_id}") @@ -382,3 +387,32 @@ class TaskAlreadyCanceled(SkyvernHTTPException): class InvalidTaskStatusTransition(SkyvernHTTPException): def __init__(self, old_status: str, new_status: str, task_id: str): super().__init__(f"Invalid task status transition from {old_status} to {new_status} for {task_id}") + + +class ErrFoundSelectableElement(SkyvernException): + def __init__(self, element_id: str, err: Exception): + super().__init__( + f"error when selecting elements in the children list. element_id={element_id}, error={repr(err)}" + ) + + +class NoSelectableElementFound(SkyvernException): + def __init__(self, element_id: str): + super().__init__(f"No selectable elements found in the children list. element_id={element_id}") + + +class NoDropdownAnchorErr(SkyvernException): + def __init__(self, dropdowm_type: str, element_id: str): + super().__init__(f"No {dropdowm_type} dropdown found. element_id={element_id}") + + +class MultipleDropdownAnchorErr(SkyvernException): + def __init__(self, dropdowm_type: str, element_id: str): + super().__init__(f"Multiple {dropdowm_type} dropdown found. element_id={element_id}") + + +class FailedToGetCurrentValueOfDropdown(SkyvernException): + def __init__(self, dropdowm_type: str, element_id: str, fail_reason: str): + super().__init__( + f"Failed to get current value of {dropdowm_type} dropdown. element_id={element_id}, failure_reason={fail_reason}" + ) diff --git a/skyvern/webeye/actions/handler.py b/skyvern/webeye/actions/handler.py index 80f74490..8b948aea 100644 --- a/skyvern/webeye/actions/handler.py +++ b/skyvern/webeye/actions/handler.py @@ -11,6 +11,7 @@ from playwright.async_api import Locator, Page, TimeoutError from skyvern.constants import INPUT_TEXT_TIMEOUT, REPO_ROOT_DIR from skyvern.exceptions import ( EmptySelect, + ErrFoundSelectableElement, FailToClick, FailToSelectByIndex, FailToSelectByLabel, @@ -21,6 +22,7 @@ from skyvern.exceptions import ( MissingElement, MissingFileUrl, MultipleElementsFound, + NoSelectableElementFound, OptionIndexOutOfBound, ) from skyvern.forge import app @@ -48,7 +50,7 @@ from skyvern.webeye.actions.actions import ( from skyvern.webeye.actions.responses import ActionFailure, ActionResult, ActionSuccess from skyvern.webeye.browser_factory import BrowserState from skyvern.webeye.scraper.scraper import ScrapedPage -from skyvern.webeye.utils.dom import DomUtil, InteractiveElement, SkyvernElement +from skyvern.webeye.utils.dom import AbstractSelectDropdown, DomUtil, SkyvernElement LOG = structlog.get_logger() TEXT_INPUT_DELAY = 10 # 10ms between each character input @@ -427,79 +429,73 @@ async def handle_select_option_action( element_dict=element_dict, ) - # if element is not a select option, prioritize clicking the linked element if any - if tag_name != "select" and "linked_element" in element_dict: + if not await skyvern_element.is_selectable(): + # 1. find from children + # TODO: 2. find from siblings and their chidren LOG.info( - "SelectOptionAction is not on a select tag and found a linked element", + "Element is not selectable, try to find the selectable element in the chidren", + tag_name=tag_name, action=action, - linked_element=element_dict["linked_element"], - ) - listbox_click_success = await click_listbox_option(scraped_page, page, action, element_dict["linked_element"]) - if listbox_click_success: - LOG.info( - "Successfully clicked linked element", - action=action, - linked_element=element_dict["linked_element"], - ) - return [ActionSuccess()] - LOG.warning( - "Failed to click linked element", - action=action, - linked_element=element_dict["linked_element"], ) - # check if the element is an a tag first. If yes, click it instead of selecting the option - if tag_name == "label": - # label pointed to select2 element - select2_element_id: str | None = None - # search anchor first and then search anchor - select2_element_id = skyvern_element.find_element_id_in_label_children(InteractiveElement.A) - if select2_element_id is None: - select2_element_id = skyvern_element.find_element_id_in_label_children(InteractiveElement.INPUT) - - if select2_element_id is not None: - select2_skyvern_element = await dom.get_skyvern_element_by_id(element_id=select2_element_id) - if await select2_skyvern_element.is_select2_dropdown(): - LOG.info( - "SelectOptionAction is on and element left + # we should make sure the locator is on , so we're able to find the [class="select2-chosen"] child + locator = self.skyvern_element.get_locator() + if tag_name == "span": + locator = locator.locator("..") + elif tag_name == "a": + pass + else: + raise FailedToGetCurrentValueOfDropdown( + self.name(), self.skyvern_element.get_id(), "invalid element of select2" + ) + + try: + return await locator.locator("span[class='select2-chosen']").text_content(timeout=timeout) + except Exception as e: + raise FailedToGetCurrentValueOfDropdown(self.name(), self.skyvern_element.get_id(), repr(e)) async def get_options( self, timeout: float = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS ) -> typing.List[SkyvernOptionType]: - element_handler = await self.skyvern_element.get_locator().element_handle(timeout=timeout) + anchor = await self.__find_anchor(timeout=timeout) + element_handler = await anchor.element_handle(timeout=timeout) options = await get_select2_options(self.frame, element_handler) return typing.cast(typing.List[SkyvernOptionType], options) async def select_by_index( self, index: int, timeout: float = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS ) -> None: - anchor = self.frame.locator("#select2-drop li[role='option']") - await anchor.nth(index).click(timeout=timeout) + anchor = await self.__find_anchor(timeout=timeout) + options = anchor.locator("li[role='option']") + await options.nth(index).click(timeout=timeout) + + +class ComboboxDropdown(AbstractSelectDropdown): + def __init__(self, frame: Page | Frame, skyvern_element: SkyvernElement) -> None: + self.skyvern_element = skyvern_element + self.frame = frame + + async def __find_anchor(self, timeout: float) -> Locator: + control_id = await self.skyvern_element.get_attr("aria-controls", timeout=timeout) + locator = self.frame.locator(f"#{control_id}") + await locator.wait_for(state="visible", timeout=timeout) + cnt = await locator.count() + if cnt == 0: + raise NoDropdownAnchorErr(self.name(), self.skyvern_element.get_id()) + if cnt > 1: + raise MultipleDropdownAnchorErr(self.name(), self.skyvern_element.get_id()) + return locator + + def name(self) -> str: + return "combobox" + + async def open(self, timeout: float = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS) -> None: + await self.skyvern_element.get_locator().click(timeout=timeout) + await self.__find_anchor(timeout=timeout) + + async def close(self, timeout: float = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS) -> None: + await self.skyvern_element.get_locator().press("Tab", timeout=timeout) + + async def get_current_value(self, timeout: float = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS) -> str: + try: + return await self.skyvern_element.get_attr("value", dynamic=True, timeout=timeout) + except Exception as e: + raise FailedToGetCurrentValueOfDropdown(self.name(), self.skyvern_element.get_id(), repr(e)) + + async def get_options( + self, timeout: float = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS + ) -> typing.List[SkyvernOptionType]: + anchor = await self.__find_anchor(timeout=timeout) + element_handler = await anchor.element_handle() + options = await get_combobox_options(self.frame, element_handler) + return typing.cast(typing.List[SkyvernOptionType], options) + + async def select_by_index( + self, index: int, timeout: float = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS + ) -> None: + anchor = await self.__find_anchor(timeout=timeout) + options = anchor.locator("li[role='option']") + await options.nth(index).click(timeout=timeout)