Support listbox generated by dropdown selection click with linked_element concept (#53)

This commit is contained in:
Shuchang Zheng
2024-03-12 11:37:41 -07:00
committed by GitHub
parent ad6de7faf0
commit 59a4a528e0
4 changed files with 284 additions and 2 deletions

View File

@@ -28,7 +28,7 @@ from skyvern.forge.sdk.settings_manager import SettingsManager
from skyvern.forge.sdk.workflow.context_manager import ContextManager
from skyvern.forge.sdk.workflow.models.block import TaskBlock
from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowRun
from skyvern.webeye.actions.actions import Action, ActionType, CompleteAction, parse_actions
from skyvern.webeye.actions.actions import Action, ActionType, CompleteAction, WebAction, parse_actions
from skyvern.webeye.actions.handler import ActionHandler
from skyvern.webeye.actions.models import AgentStepOutput, DetailedAgentStepOutput
from skyvern.webeye.actions.responses import ActionResult
@@ -381,7 +381,14 @@ class ForgeAgent(Agent):
# of an exception, we can still see all the actions
detailed_agent_step_output.actions_and_results = [(action, []) for action in actions]
web_action_element_ids = set()
for action_idx, action in enumerate(actions):
if isinstance(action, WebAction):
if action.element_id in web_action_element_ids:
LOG.error("Duplicate action element id. Action handling stops", action=action)
break
web_action_element_ids.add(action.element_id)
results = await ActionHandler.handle_action(scraped_page, task, step, browser_state, action)
detailed_agent_step_output.actions_and_results[action_idx] = (action, results)
# wait random time between actions to avoid detection
@@ -408,7 +415,9 @@ class ForgeAgent(Agent):
# for now, we're being optimistic and assuming that
# js call doesn't have impact on the following actions
if results[-1].javascript_triggered:
LOG.info("Action triggered javascript, ", action=action)
LOG.info("Action triggered javascript. Stop executing reamaining actions.", action=action)
# stop executing the rest actions
break
else:
LOG.warning(
"Action failed, marking step as failed",

View File

@@ -155,6 +155,92 @@ async def handle_select_option_action(
) -> list[ActionResult]:
xpath = await validate_actions_in_dom(action, page, scraped_page)
locator = page.locator(f"xpath={xpath}")
tag_name = await get_tag_name_lowercase(locator)
element_dict = scraped_page.id_to_element_dict[action.element_id]
LOG.info("SelectOptionAction", action=action, tag_name=tag_name, element_dict=element_dict)
# if element is not a select option, prioritize clicking the linked element if any
if tag_name != "select" and "linked_element" in element_dict:
LOG.info(
"SelectOptionAction is not on a select tag and found a linked element",
action=action,
linked_element=element_dict["linked_element"],
)
listbox_click_success = await click_listbox_option(scraped_page, page, action, element_dict["linked_element"])
if listbox_click_success:
LOG.info(
"Successfully clicked linked element",
action=action,
linked_element=element_dict["linked_element"],
)
return [ActionSuccess()]
LOG.warning("Failed to click linked element", action=action, linked_element=element_dict["linked_element"])
# check if the element is an a tag first. If yes, click it instead of selecting the option
if tag_name == "label":
# TODO: this is a hack to handle the case where the label is the only thing that's clickable
# it's a label, look for the anchor tag
child_anchor_xpath = get_anchor_to_click(scraped_page, action.element_id)
if child_anchor_xpath:
LOG.info(
"SelectOptionAction is a label tag. Clicking the anchor tag instead of selecting the option",
action=action,
child_anchor_xpath=child_anchor_xpath,
)
click_action = ClickAction(element_id=action.element_id)
return await chain_click(page, click_action, child_anchor_xpath)
return [ActionFailure(Exception("No anchor tag found for the label for SelectOptionAction"))]
elif tag_name == "a":
# turn the SelectOptionAction into a ClickAction
LOG.info(
"SelectOptionAction is an anchor tag. Clicking it instead of selecting the option",
action=action,
)
click_action = ClickAction(element_id=action.element_id)
action_result = await chain_click(page, click_action, xpath)
return action_result
elif tag_name == "ul" or tag_name == "div" or tag_name == "li":
# if the role is listbox, find the option with the "label" or "value" and click that option element
# references:
# https://developer.mozilla.org/en-US/docs/Web/Accessibility/ARIA/Roles/listbox_role
# https://developer.mozilla.org/en-US/docs/Web/Accessibility/ARIA/Roles/option_role
role_attribute = await locator.get_attribute("role")
if role_attribute == "listbox":
LOG.info(
"SelectOptionAction on a listbox element. Searching for the option and click it",
action=action,
)
# use playwright to click the option
# clickOption is defined in domUtils.js
option_locator = locator.locator('[role="option"]')
option_num = await option_locator.count()
if action.option.index and action.option.index < option_num:
try:
await option_locator.nth(action.option.index).click(timeout=2000)
return [ActionSuccess()]
except Exception as e:
LOG.error(
"Failed to click option",
action=action,
exception=e,
)
return [ActionFailure(e)]
return [ActionFailure(Exception(f"SelectOption option index is missing"))]
elif role_attribute == "option":
LOG.info(
"SelectOptionAction on an option element. Clicking the option",
action=action,
)
# click the option element
click_action = ClickAction(element_id=action.element_id)
return await chain_click(page, click_action, xpath)
else:
LOG.error(
"SelectOptionAction on a non-listbox element. Cannot handle this action",
)
return [ActionFailure(Exception(f"Cannot handle SelectOptionAction on a non-listbox element"))]
try:
# First click by label (if it matches)
await page.click(f"xpath={xpath}", timeout=SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS)
@@ -354,6 +440,20 @@ async def chain_click(
page.remove_listener("filechooser", fc_func)
def get_anchor_to_click(scraped_page: ScrapedPage, element_id: int) -> str | None:
    """
    Get the xpath of the first anchor tag directly under the given (label) element.

    :param scraped_page: scraped page whose ``elements`` list is searched and whose
        ``id_to_xpath_dict`` supplies the xpath of the anchor that is found.
    :param element_id: id of the parent element; coerced with ``int()`` before comparing.
    :return: xpath of the first direct child whose ``tagName`` is ``"a"``, or ``None``
        when the element is not found or has no such child.
    """
    LOG.info("Getting anchor tag to click", element_id=element_id)
    element_id = int(element_id)
    for ele in scraped_page.elements:
        if ele.get("id") != element_id:
            continue
        # An element may come without a "children" entry (click_listbox_option guards
        # for the same case); treat that as "no children" instead of raising KeyError.
        for child in ele.get("children") or []:
            if child.get("tagName") == "a":
                return scraped_page.id_to_xpath_dict[child["id"]]
    return None
async def is_javascript_triggered(page: Page, xpath: str) -> bool:
locator = page.locator(f"xpath={xpath}")
element = locator.first
@@ -366,6 +466,14 @@ async def is_javascript_triggered(page: Page, xpath: str) -> bool:
return False
async def get_tag_name_lowercase(locator: Locator) -> str | None:
    """
    Return the lowercased tag name of the first element the locator resolves to.

    The tag name is read in the page by evaluating ``e => e.tagName`` on
    ``locator.first``. ``None`` is returned only when ``locator.first`` is falsy.
    """
    first_match = locator.first
    if not first_match:
        return None
    raw_tag_name = await first_match.evaluate("e => e.tagName")
    return raw_tag_name.lower()
async def is_file_input_element(locator: Locator) -> bool:
element = locator.first
if element:
@@ -443,3 +551,39 @@ async def extract_information_for_navigation_goal(
return ScrapeResult(
scraped_data=json_response,
)
async def click_listbox_option(
    scraped_page: ScrapedPage,
    page: Page,
    action: actions.SelectOptionAction,
    listbox_element_id: int,
) -> bool:
    """
    Click the option under a listbox element that matches the requested option.

    The scraped element tree below the listbox is walked breadth-first. A node counts
    as a candidate when its ``role`` attribute is ``"option"`` and its non-empty text
    equals ``action.option.label`` or ``action.option.value``.

    :param scraped_page: provides ``id_to_element_dict`` (listbox lookup) and
        ``id_to_xpath_dict`` (xpath of the option to click).
    :param page: page on which the click is performed.
    :param action: the select-option action carrying the wanted label/value.
    :param listbox_element_id: id of the listbox element to search under.
    :return: ``True`` as soon as one matching option is clicked successfully;
        ``False`` when the listbox has no ``children`` entry, nothing matches, or
        every attempted click fails.
    """
    # Function-scope import so this block does not depend on the module's import list.
    from collections import deque

    listbox_element = scraped_page.id_to_element_dict[listbox_element_id]
    # this is a listbox element, get all the children
    if "children" not in listbox_element:
        return False

    LOG.info("starting bfs", listbox_element_id=listbox_element_id)
    # deque gives O(1) pops from the left; list.pop(0) shifts the whole list each time.
    bfs_queue = deque(listbox_element["children"])
    while bfs_queue:
        child = bfs_queue.popleft()
        LOG.info("popped child", element_id=child["id"])
        if (child.get("attributes") or {}).get("role") == "option":
            LOG.info("found option", element_id=child["id"])
            text = child.get("text", "")
            if text and (text == action.option.label or text == action.option.value):
                option_xpath = scraped_page.id_to_xpath_dict[child["id"]]
                try:
                    await page.click(f"xpath={option_xpath}", timeout=1000)
                    return True
                except Exception as e:
                    # Best effort: a failed click is logged and the search continues,
                    # since another option node may carry the same text.
                    LOG.error(
                        "Failed to click on the option",
                        action=action,
                        option_xpath=option_xpath,
                        exception=e,
                    )
        if "children" in child:
            bfs_queue.extend(child["children"])
    return False

View File

@@ -358,6 +358,22 @@ function isInteractable(element) {
return hasPointer || hasCursor;
}
// support listbox and options underneath it
if (
(tagName === "ul" || tagName === "div") &&
element.hasAttribute("role") &&
element.getAttribute("role").toLowerCase() === "listbox"
) {
return true;
}
if (
(tagName === "li" || tagName === "div") &&
element.hasAttribute("role") &&
element.getAttribute("role").toLowerCase() === "option"
) {
return true;
}
return false;
}
@@ -463,6 +479,20 @@ function getSelectOptions(element) {
return selectOptions;
}
function getListboxOptions(element) {
  // Describe every descendant of `element` that carries role="option":
  // its position among those descendants and its whitespace-normalized text.
  const optionNodes = element.querySelectorAll('[role="option"]');
  return Array.from(optionNodes, (optionNode, position) => ({
    optionIndex: position,
    text: removeMultipleSpaces(optionNode.textContent),
  }));
}
function buildTreeFromBody() {
var elements = [];
var resultArray = [];
@@ -512,6 +542,9 @@ function buildTreeFromBody() {
let selectOptions = null;
if (elementTagNameLower === "select") {
selectOptions = getSelectOptions(element);
} else if (attrs["role"] && attrs["role"].toLowerCase() === "listbox") {
// if "role" key is inside attrs, then get all the elements with role "option" and get their text
selectOptions = getListboxOptions(element);
}
if (selectOptions) {
elementObj.options = selectOptions;
@@ -786,6 +819,7 @@ function removeBoundingBoxes() {
function scrollToTop(draw_boxes) {
removeBoundingBoxes();
window.scrollTo(0, 0);
scrollDownAndUp();
if (draw_boxes) {
var elementsAndResultArray = buildTreeFromBody();
drawBoundingBoxes(elementsAndResultArray[0]);
@@ -798,9 +832,39 @@ function scrollToNextPage(draw_boxes) {
// return true if there is a next page, false otherwise
removeBoundingBoxes();
window.scrollBy(0, window.innerHeight - 200);
scrollUpAndDown();
if (draw_boxes) {
var elementsAndResultArray = buildTreeFromBody();
drawBoundingBoxes(elementsAndResultArray[0]);
}
return window.scrollY;
}
function scrollUpAndDown() {
  // Nudge the window up by 1 and then back down by 1. Before each nudge the
  // select2-drop-above class is removed to prevent the dropdown from being
  // rendered on top of the box.
  for (const deltaY of [-1, 1]) {
    removeSelect2DropAbove();
    window.scrollBy(0, deltaY);
  }
}
function scrollDownAndUp() {
  // Nudge the window down by 1 and then back up by 1. Before each nudge the
  // select2-drop-above class is removed to prevent the dropdown from being
  // rendered on top of the box.
  for (const deltaY of [1, -1]) {
    removeSelect2DropAbove();
    window.scrollBy(0, deltaY);
  }
}
function removeSelect2DropAbove() {
  // Strip the "select2-drop-above" class from every element that has it.
  // The matches are copied into a plain array first, so removing the class
  // cannot disturb the collection while it is being walked.
  const snapshot = Array.from(
    document.getElementsByClassName("select2-drop-above"),
  );
  for (const node of snapshot) {
    node.classList.remove("select2-drop-above");
  }
}

View File

@@ -1,5 +1,6 @@
import asyncio
import copy
from collections import defaultdict
import structlog
from playwright.async_api import Page
@@ -68,6 +69,7 @@ class ScrapedPage(BaseModel):
"""
elements: list[dict]
id_to_element_dict: dict[int, dict] = {}
id_to_xpath_dict: dict[int, str]
element_tree: list[dict]
element_tree_trimmed: list[dict]
@@ -180,16 +182,21 @@ async def scrape_web_unsafe(
elements, element_tree = await get_interactable_element_tree(page)
element_tree = cleanup_elements(copy.deepcopy(element_tree))
_build_element_links(elements)
id_to_xpath_dict = {}
id_to_element_dict = {}
for element in elements:
element_id = element["id"]
# get_interactable_element_tree marks each interactable element with a unique_id attribute
id_to_xpath_dict[element_id] = f"//*[@{SKYVERN_ID_ATTR}='{element_id}']"
id_to_element_dict[element_id] = element
text_content = await get_all_visible_text(page)
return ScrapedPage(
elements=elements,
id_to_xpath_dict=id_to_xpath_dict,
id_to_element_dict=id_to_element_dict,
element_tree=element_tree,
element_tree_trimmed=trim_element_tree(copy.deepcopy(element_tree)),
screenshots=screenshots,
@@ -299,6 +306,8 @@ def _trimmed_attributes(tag_name: str, attributes: dict) -> dict:
if key == "id" and tag_name in ["input", "textarea", "select"]:
# We don't want to remove the id attribute any of these elements in case there's a label for it
new_attributes[key] = attributes[key]
if key == "role" and attributes[key] in ["listbox", "option"]:
new_attributes[key] = attributes[key]
if key in RESERVED_ATTRIBUTES:
new_attributes[key] = attributes[key]
return new_attributes
@@ -314,3 +323,59 @@ def _remove_unique_id(element: dict) -> None:
return
if SKYVERN_ID_ATTR in element["attributes"]:
del element["attributes"][SKYVERN_ID_ATTR]
def _build_element_links(elements: list[dict]) -> None:
    """
    Build the links for listbox. A listbox could be mapped back to another element if:
        1. The listbox element's text matches context or text of an element

    "Matches" means the listbox text is a substring of the other element's text or
    context. Every matching element (other than the listbox itself) gets a
    ``linked_element`` key holding the listbox's id; ``elements`` is mutated in place
    and nothing is returned. When several listboxes match the same element, the one
    processed last wins.

    Listboxes whose text is missing, empty or whitespace-only are skipped: an empty
    string is a substring of every string, so such a listbox would otherwise be
    linked to every element that has any text or context.
    """
    # first, build mapping between text/context and elements
    text_to_elements_map: dict[str, list[dict]] = defaultdict(list)
    context_to_elements_map: dict[str, list[dict]] = defaultdict(list)
    for element in elements:
        text = element.get("text")
        # only non-empty strings can take part in the substring test below
        if isinstance(text, str) and text:
            text_to_elements_map[text].append(element)
        context = element.get("context")
        if isinstance(context, str) and context:
            context_to_elements_map[context].append(element)

    # Same matching rule for both sources; text matches are applied before context
    # matches. The tuples carry the log field name and the log message to emit.
    match_sources = (
        ("text", "Match listbox to target element text", text_to_elements_map),
        ("context", "Match listbox to target element context", context_to_elements_map),
    )

    # then, build the links from element to listbox elements
    for element in elements:
        if (element.get("attributes") or {}).get("role") != "listbox":
            continue

        listbox_text = element.get("text")
        if not isinstance(listbox_text, str) or not listbox_text.strip():
            # nothing meaningful to match on (see docstring)
            continue

        # WARNING: If a listbox has very little common content (yes/no, etc.), it may
        # conflict with and connect to the wrong element. If that shows up in practice,
        # consider skipping short listbox text (e.g. fewer than 10 characters), since
        # larger text matches are more reliable, and/or requiring the two elements to
        # be positioned near each other.
        for field_name, message, lookup in match_sources:
            for key, candidates in lookup.items():
                if listbox_text not in key:
                    continue
                for candidate in candidates:
                    if candidate["id"] == element["id"]:
                        # never link a listbox to itself
                        continue
                    LOG.info(
                        message,
                        listbox_text=listbox_text,
                        **{field_name: key},
                        listbox_id=element["id"],
                        linked_element_id=candidate["id"],
                    )
                    candidate["linked_element"] = element["id"]