multi-level selection support (#786)

This commit is contained in:
LawyZheng
2024-09-09 11:34:09 +08:00
committed by GitHub
parent 281977f395
commit d7cb4f3ae7
4 changed files with 279 additions and 98 deletions

View File

@@ -1,7 +1,15 @@
There is a screenshot from part of Web HTML page. Help me confirm it if it's an opened dropdown menu. There is a screenshot from part of Web HTML page. Help me confirm it if it's an opened dropdown menu.
An opened dropdown menu could be defined as:
- At least two options show on the screenshot.
MAKE SURE YOU OUTPUT VALID JSON. No text before or after JSON, no trailing commas, no comments (//), no unnecessary quotes, etc. MAKE SURE YOU OUTPUT VALID JSON. No text before or after JSON, no trailing commas, no comments (//), no unnecessary quotes, etc.
Reply in JSON format with the following keys: Reply in JSON format with the following keys:
{ {
"reasoning": str, // the reason why it's a dropdown menu or not a dropdown menu
"is_opened_dropdown_menu": bool, // true if it's a opened dropdown menu, otherwise false. "is_opened_dropdown_menu": bool, // true if it's a opened dropdown menu, otherwise false.
} }
Elements on the screenshot:
```
{{ elements }}
```

View File

@@ -81,13 +81,74 @@ LOG = structlog.get_logger()
COMMON_INPUT_TAGS = {"input", "textarea", "select"} COMMON_INPUT_TAGS = {"input", "textarea", "select"}
def remove_exist_elements(dom: DomUtil, element_tree: list[dict]) -> list[dict]: class CustomSingleSelectResult:
action_result: ActionResult | None = None
value: str | None = None
dropdown_menu: SkyvernElement | None = None
async def is_done(self) -> bool:
# check if the dropdown menu is still on the page
# if it still exists, might mean there might be multi-level selection
# FIXME: only able to execute multi-level selection logic when dropdown menu detected
if self.dropdown_menu is None:
return True
if not isinstance(self.action_result, ActionSuccess):
return True
if await self.dropdown_menu.get_locator().count() == 0:
return True
return False
def is_ul_or_listbox_element_factory(
incremental_scraped: IncrementalScrapePage, task: Task, step: Step
) -> Callable[[dict], Awaitable[bool]]:
async def wrapper(element_dict: dict) -> bool:
element_id: str = element_dict.get("id", "")
try:
element = await SkyvernElement.create_from_incremental(incremental_scraped, element_id)
except Exception:
LOG.debug(
"Failed to element in the incremental page",
element_id=element_id,
step_id=step.step_id,
task_id=task.task_id,
exc_info=True,
)
return False
if element.get_tag_name() == "ul":
return True
if await element.get_attr("role") == "listbox":
return True
return False
return wrapper
CheckExistIDFunc = Callable[[str], bool]
def check_id_in_dict_factory(id_dict: dict[str, Any]) -> CheckExistIDFunc:
def helper(element_id: str) -> bool:
if id_dict.get(element_id, ""):
return True
return False
return helper
def remove_exist_elements(element_tree: list[dict], check_exist: CheckExistIDFunc) -> list[dict]:
new_element_tree = [] new_element_tree = []
for element in element_tree: for element in element_tree:
children_elements = element.get("children", []) children_elements = element.get("children", [])
if len(children_elements) > 0: if len(children_elements) > 0:
children_elements = remove_exist_elements(dom=dom, element_tree=children_elements) children_elements = remove_exist_elements(element_tree=children_elements, check_exist=check_exist)
if dom.check_id_in_dom(element.get("id", "")): if check_exist(element.get("id", "")):
new_element_tree.extend(children_elements) new_element_tree.extend(children_elements)
else: else:
element["children"] = children_elements element["children"] = children_elements
@@ -95,10 +156,14 @@ def remove_exist_elements(dom: DomUtil, element_tree: list[dict]) -> list[dict]:
return new_element_tree return new_element_tree
def clean_and_remove_element_tree_factory(task: Task, step: Step, dom: DomUtil) -> CleanupElementTreeFunc: def clean_and_remove_element_tree_factory(
task: Task, step: Step, check_exist_funcs: list[CheckExistIDFunc]
) -> CleanupElementTreeFunc:
async def helper_func(url: str, element_tree: list[dict]) -> list[dict]: async def helper_func(url: str, element_tree: list[dict]) -> list[dict]:
element_tree = await app.AGENT_FUNCTION.cleanup_element_tree_factory(task=task, step=step)(url, element_tree) element_tree = await app.AGENT_FUNCTION.cleanup_element_tree_factory(task=task, step=step)(url, element_tree)
return remove_exist_elements(dom=dom, element_tree=element_tree) for check_exist in check_exist_funcs:
element_tree = remove_exist_elements(element_tree=element_tree, check_exist=check_exist)
return element_tree
return helper_func return helper_func
@@ -369,7 +434,7 @@ async def handle_input_text_action(
await asyncio.sleep(5) await asyncio.sleep(5)
incremental_element = await incremental_scraped.get_incremental_element_tree( incremental_element = await incremental_scraped.get_incremental_element_tree(
clean_and_remove_element_tree_factory(task=task, step=step, dom=dom), clean_and_remove_element_tree_factory(task=task, step=step, check_exist_funcs=[dom.check_id_in_dom]),
) )
if len(incremental_element) == 0: if len(incremental_element) == 0:
LOG.info( LOG.info(
@@ -382,7 +447,7 @@ async def handle_input_text_action(
else: else:
try: try:
# TODO: we don't select by value for the auto completion detect case # TODO: we don't select by value for the auto completion detect case
result, _ = await select_from_dropdown( result, _ = await sequentially_select_from_dropdown(
action=select_action, action=select_action,
page=page, page=page,
dom=dom, dom=dom,
@@ -674,12 +739,13 @@ async def handle_select_option_action(
is_open = True is_open = True
incremental_element = await incremental_scraped.get_incremental_element_tree( incremental_element = await incremental_scraped.get_incremental_element_tree(
clean_and_remove_element_tree_factory(task=task, step=step, dom=dom), clean_and_remove_element_tree_factory(task=task, step=step, check_exist_funcs=[dom.check_id_in_dom]),
) )
if len(incremental_element) == 0: if len(incremental_element) == 0:
raise NoIncrementalElementFoundForCustomSelection(element_id=action.element_id) raise NoIncrementalElementFoundForCustomSelection(element_id=action.element_id)
result, suggested_value = await select_from_dropdown( # TODO: support sequetially select from dropdown by value, just support single select now
result, suggested_value = await sequentially_select_from_dropdown(
action=action, action=action,
page=page, page=page,
dom=dom, dom=dom,
@@ -1011,7 +1077,7 @@ async def choose_auto_completion_dropdown(
# wait for new elemnts to load # wait for new elemnts to load
await asyncio.sleep(5) await asyncio.sleep(5)
incremental_element = await incremental_scraped.get_incremental_element_tree( incremental_element = await incremental_scraped.get_incremental_element_tree(
clean_and_remove_element_tree_factory(task=task, step=step, dom=dom), clean_and_remove_element_tree_factory(task=task, step=step, check_exist_funcs=[dom.check_id_in_dom]),
) )
# check if elements in preserve list are still on the page # check if elements in preserve list are still on the page
@@ -1247,7 +1313,7 @@ async def input_or_auto_complete_input(
return ActionFailure(FailToFindAutocompleteOption(current_value=text)) return ActionFailure(FailToFindAutocompleteOption(current_value=text))
async def select_from_dropdown( async def sequentially_select_from_dropdown(
action: SelectOptionAction, action: SelectOptionAction,
page: Page, page: Page,
dom: DomUtil, dom: DomUtil,
@@ -1259,6 +1325,81 @@ async def select_from_dropdown(
force_select: bool = False, force_select: bool = False,
should_relevant: bool = True, should_relevant: bool = True,
) -> tuple[ActionResult | None, str | None]: ) -> tuple[ActionResult | None, str | None]:
"""
TODO: support to return all values retrieved from the sequentially select
Only return the last value today
"""
# TODO: only suport the third-level dropdown selection now
MAX_SELECT_DEPTH = 3
values: list[str | None] = []
single_select_result = CustomSingleSelectResult()
check_exist_funcs: list[CheckExistIDFunc] = [dom.check_id_in_dom]
for i in range(MAX_SELECT_DEPTH):
single_select_result = await select_from_dropdown(
action=action,
page=page,
skyvern_frame=skyvern_frame,
incremental_scraped=incremental_scraped,
llm_handler=llm_handler,
check_exist_funcs=check_exist_funcs,
step=step,
task=task,
force_select=force_select,
should_relevant=should_relevant,
)
values.append(single_select_result.value)
if await single_select_result.is_done():
return single_select_result.action_result, values[-1] if len(values) > 0 else None
if i == MAX_SELECT_DEPTH - 1:
LOG.warning(
"Reaching the max selection depth",
depth=i,
task_id=task.task_id,
step_id=step.step_id,
)
break
LOG.info(
"Seems to be a multi-level selection, continue to select until it finishes",
selected_time=i + 1,
task_id=task.task_id,
step_id=step.step_id,
)
# wait for 3s to load new options
await asyncio.sleep(3)
current_element_to_dict = copy.deepcopy(incremental_scraped.id_to_css_dict)
check_exist_funcs.append(check_id_in_dict_factory(current_element_to_dict))
secondary_increment_element = await incremental_scraped.get_incremental_element_tree(
clean_and_remove_element_tree_factory(
task=task,
step=step,
check_exist_funcs=check_exist_funcs,
)
)
if len(secondary_increment_element) == 0:
return single_select_result.action_result, values[-1] if len(values) > 0 else None
return single_select_result.action_result, values[-1] if len(values) > 0 else None
async def select_from_dropdown(
action: SelectOptionAction,
page: Page,
skyvern_frame: SkyvernFrame,
incremental_scraped: IncrementalScrapePage,
llm_handler: LLMAPIHandler,
check_exist_funcs: list[CheckExistIDFunc],
step: Step,
task: Task,
force_select: bool = False,
should_relevant: bool = True,
) -> CustomSingleSelectResult:
""" """
force_select: is used to choose an element to click even there's no dropdown menu; force_select: is used to choose an element to click even there's no dropdown menu;
should_relevant: only valid when force_select is "False". When "True", the chosen value must be relevant to the target value; should_relevant: only valid when force_select is "False". When "True", the chosen value must be relevant to the target value;
@@ -1266,6 +1407,8 @@ async def select_from_dropdown(
1. force_select is false and no dropdown menu popped 1. force_select is false and no dropdown menu popped
2. force_select is false and match value is not relevant to the target value 2. force_select is false and match value is not relevant to the target value
""" """
single_select_result = CustomSingleSelectResult()
timeout = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS timeout = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS
dropdown_menu_element = await locate_dropdown_menu( dropdown_menu_element = await locate_dropdown_menu(
@@ -1274,24 +1417,31 @@ async def select_from_dropdown(
step=step, step=step,
task=task, task=task,
) )
single_select_result.dropdown_menu = dropdown_menu_element
if not force_select and dropdown_menu_element is None: if not force_select and dropdown_menu_element is None:
return None, None return single_select_result
if dropdown_menu_element and await skyvern_frame.get_element_scrollable( if dropdown_menu_element:
await dropdown_menu_element.get_element_handler() potential_scrollable_element = await try_to_find_potential_scrollable_element(
): skyvern_element=dropdown_menu_element,
await scroll_down_to_load_all_options(
dropdown_menu_element=dropdown_menu_element,
skyvern_frame=skyvern_frame,
page=page,
incremental_scraped=incremental_scraped, incremental_scraped=incremental_scraped,
step=step, step=step,
task=task, task=task,
) )
if await skyvern_frame.get_element_scrollable(await potential_scrollable_element.get_element_handler()):
await scroll_down_to_load_all_options(
scrollable_element=potential_scrollable_element,
skyvern_frame=skyvern_frame,
page=page,
incremental_scraped=incremental_scraped,
step=step,
task=task,
)
trimmed_element_tree = await incremental_scraped.get_incremental_element_tree( trimmed_element_tree = await incremental_scraped.get_incremental_element_tree(
clean_and_remove_element_tree_factory(task=task, step=step, dom=dom), clean_and_remove_element_tree_factory(task=task, step=step, check_exist_funcs=check_exist_funcs),
) )
html = incremental_scraped.build_html_tree(element_tree=trimmed_element_tree) html = incremental_scraped.build_html_tree(element_tree=trimmed_element_tree)
@@ -1323,6 +1473,7 @@ async def select_from_dropdown(
) )
value: str | None = json_response.get("value", None) value: str | None = json_response.get("value", None)
single_select_result.value = value
element_id: str | None = json_response.get("id", None) element_id: str | None = json_response.get("id", None)
if not element_id: if not element_id:
raise NoElementMatchedForTargetOption(target=target_value, reason=json_response.get("reasoning")) raise NoElementMatchedForTargetOption(target=target_value, reason=json_response.get("reasoning"))
@@ -1335,13 +1486,14 @@ async def select_from_dropdown(
task_id=task.task_id, task_id=task.task_id,
step_id=step.step_id, step_id=step.step_id,
) )
return None, None return single_select_result
try: try:
selected_element = await SkyvernElement.create_from_incremental(incremental_scraped, element_id) selected_element = await SkyvernElement.create_from_incremental(incremental_scraped, element_id)
await selected_element.scroll_into_view() await selected_element.scroll_into_view()
await selected_element.get_locator().click(timeout=timeout) await selected_element.get_locator().click(timeout=timeout)
return ActionSuccess(), None single_select_result.action_result = ActionSuccess()
return single_select_result
except MissingElement: except MissingElement:
if not value: if not value:
raise raise
@@ -1355,7 +1507,8 @@ async def select_from_dropdown(
) )
locator = await incremental_scraped.select_one_element_by_value(value=value) locator = await incremental_scraped.select_one_element_by_value(value=value)
if not locator: if not locator:
return ActionFailure(exception=MissingElement()), value single_select_result.action_result = ActionFailure(exception=MissingElement())
return single_select_result
try: try:
LOG.info( LOG.info(
@@ -1363,9 +1516,11 @@ async def select_from_dropdown(
value=value, value=value,
) )
await locator.click(timeout=timeout) await locator.click(timeout=timeout)
return ActionSuccess(), value single_select_result.action_result = ActionSuccess()
return single_select_result
except Exception as e: except Exception as e:
return ActionFailure(exception=e), value single_select_result.action_result = ActionFailure(exception=e)
return single_select_result
async def select_from_dropdown_by_value( async def select_from_dropdown_by_value(
@@ -1380,7 +1535,7 @@ async def select_from_dropdown_by_value(
) -> ActionResult: ) -> ActionResult:
timeout = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS timeout = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS
await incremental_scraped.get_incremental_element_tree( await incremental_scraped.get_incremental_element_tree(
clean_and_remove_element_tree_factory(task=task, step=step, dom=dom), clean_and_remove_element_tree_factory(task=task, step=step, check_exist_funcs=[dom.check_id_in_dom]),
) )
element_locator = await incremental_scraped.select_one_element_by_value(value=value) element_locator = await incremental_scraped.select_one_element_by_value(value=value)
@@ -1395,16 +1550,25 @@ async def select_from_dropdown_by_value(
task=task, task=task,
) )
if not dropdown_menu_element or not await skyvern_frame.get_element_scrollable( if not dropdown_menu_element:
await dropdown_menu_element.get_element_handler() raise NoElementMatchedForTargetOption(target=value, reason="No value matched")
):
raise NoElementMatchedForTargetOption(target=value, reason="No value matched and element is not scrollable") potential_scrollable_element = await try_to_find_potential_scrollable_element(
skyvern_element=dropdown_menu_element,
incremental_scraped=incremental_scraped,
task=task,
step=step,
)
if not await skyvern_frame.get_element_scrollable(await potential_scrollable_element.get_element_handler()):
raise NoElementMatchedForTargetOption(
target=value, reason="No value matched and element can't scroll to find more options"
)
selected: bool = False selected: bool = False
async def continue_callback(incre_scraped: IncrementalScrapePage) -> bool: async def continue_callback(incre_scraped: IncrementalScrapePage) -> bool:
await incre_scraped.get_incremental_element_tree( await incre_scraped.get_incremental_element_tree(
clean_and_remove_element_tree_factory(task=task, step=step, dom=dom), clean_and_remove_element_tree_factory(task=task, step=step, check_exist_funcs=[dom.check_id_in_dom]),
) )
element_locator = await incre_scraped.select_one_element_by_value(value=value) element_locator = await incre_scraped.select_one_element_by_value(value=value)
@@ -1417,7 +1581,7 @@ async def select_from_dropdown_by_value(
return True return True
await scroll_down_to_load_all_options( await scroll_down_to_load_all_options(
dropdown_menu_element=dropdown_menu_element, scrollable_element=potential_scrollable_element,
page=page, page=page,
skyvern_frame=skyvern_frame, skyvern_frame=skyvern_frame,
incremental_scraped=incremental_scraped, incremental_scraped=incremental_scraped,
@@ -1441,28 +1605,6 @@ async def locate_dropdown_menu(
) -> SkyvernElement | None: ) -> SkyvernElement | None:
skyvern_frame = incremental_scraped.skyvern_frame skyvern_frame = incremental_scraped.skyvern_frame
async def is_ul_or_listbox_element(element_dict: dict) -> bool:
element_id: str = element_dict.get("id", "")
try:
element = await SkyvernElement.create_from_incremental(incremental_scraped, element_id)
except Exception:
LOG.debug(
"Failed to element in the incremental page",
element_id=element_id,
step_id=step.step_id,
task_id=task.task_id,
exc_info=True,
)
return False
if element.get_tag_name() == "ul":
return True
if await element.get_attr("role") == "listbox":
return True
return False
for idx, element_dict in enumerate(incremental_scraped.element_tree): for idx, element_dict in enumerate(incremental_scraped.element_tree):
# FIXME: confirm max to 10 nodes for now, preventing sendindg too many requests to LLM # FIXME: confirm max to 10 nodes for now, preventing sendindg too many requests to LLM
if idx >= 10: if idx >= 10:
@@ -1490,29 +1632,6 @@ async def locate_dropdown_menu(
) )
continue continue
found_element_id = await head_element.find_children_element_id_by_callback(
cb=is_ul_or_listbox_element,
)
if found_element_id and found_element_id != element_id:
LOG.debug(
"Found 'ul or listbox' element in children list",
element_id=found_element_id,
step_id=step.step_id,
task_id=task.task_id,
)
try:
head_element = await SkyvernElement.create_from_incremental(incremental_scraped, found_element_id)
element_id = found_element_id
except Exception:
LOG.debug(
"Failed to get head element by found element id, use the orignal element id",
element_id=found_element_id,
step_id=step.step_id,
task_id=task.task_id,
exc_info=True,
)
if not await skyvern_frame.get_element_visible(await head_element.get_element_handler()): if not await skyvern_frame.get_element_visible(await head_element.get_element_handler()):
LOG.debug( LOG.debug(
"Skip the element since it's invisible", "Skip the element since it's invisible",
@@ -1525,7 +1644,12 @@ async def locate_dropdown_menu(
screenshot = await head_element.get_locator().screenshot( screenshot = await head_element.get_locator().screenshot(
timeout=SettingsManager.get_settings().BROWSER_SCREENSHOT_TIMEOUT_MS timeout=SettingsManager.get_settings().BROWSER_SCREENSHOT_TIMEOUT_MS
) )
dropdown_confirm_prompt = prompt_engine.load_prompt("opened-dropdown-confirm")
# only for detecting the dropdown menu, better to send untrimmed HTML without skyvern attributes
dropdown_confirm_prompt = prompt_engine.load_prompt(
"opened-dropdown-confirm",
elements=head_element.build_HTML(need_trim_element=False, need_skyvern_attrs=False),
)
LOG.debug( LOG.debug(
"Confirm if it's an opened dropdown menu", "Confirm if it's an opened dropdown menu",
step_id=step.step_id, step_id=step.step_id,
@@ -1545,8 +1669,43 @@ async def locate_dropdown_menu(
return None return None
async def try_to_find_potential_scrollable_element(
skyvern_element: SkyvernElement,
incremental_scraped: IncrementalScrapePage,
task: Task,
step: Step,
) -> SkyvernElement:
"""
check any <ul> or <role="listbox"> element in the chidlren.
if yes, return the found element,
eles, return the orginal one
"""
found_element_id = await skyvern_element.find_children_element_id_by_callback(
cb=is_ul_or_listbox_element_factory(incremental_scraped=incremental_scraped, task=task, step=step),
)
if found_element_id and found_element_id != skyvern_element.get_id():
LOG.debug(
"Found 'ul or listbox' element in children list",
element_id=found_element_id,
step_id=step.step_id,
task_id=task.task_id,
)
try:
skyvern_element = await SkyvernElement.create_from_incremental(incremental_scraped, found_element_id)
except Exception:
LOG.debug(
"Failed to get head element by found element id, use the orignal element id",
element_id=found_element_id,
step_id=step.step_id,
task_id=task.task_id,
exc_info=True,
)
return skyvern_element
async def scroll_down_to_load_all_options( async def scroll_down_to_load_all_options(
dropdown_menu_element: SkyvernElement, scrollable_element: SkyvernElement,
page: Page, page: Page,
skyvern_frame: SkyvernFrame, skyvern_frame: SkyvernFrame,
incremental_scraped: IncrementalScrapePage, incremental_scraped: IncrementalScrapePage,
@@ -1562,14 +1721,14 @@ async def scroll_down_to_load_all_options(
) )
timeout = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS timeout = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS
dropdown_menu_element_handle = await dropdown_menu_element.get_locator().element_handle(timeout=timeout) dropdown_menu_element_handle = await scrollable_element.get_locator().element_handle(timeout=timeout)
if dropdown_menu_element_handle is None: if dropdown_menu_element_handle is None:
LOG.info("element handle is None, using focus to move the cursor", element_id=dropdown_menu_element.get_id()) LOG.info("element handle is None, using focus to move the cursor", element_id=scrollable_element.get_id())
await dropdown_menu_element.get_locator().focus(timeout=timeout) await scrollable_element.get_locator().focus(timeout=timeout)
else: else:
await dropdown_menu_element_handle.scroll_into_view_if_needed(timeout=timeout) await dropdown_menu_element_handle.scroll_into_view_if_needed(timeout=timeout)
await dropdown_menu_element.move_mouse_to(page=page) await scrollable_element.move_mouse_to(page=page)
scroll_pace = 0 scroll_pace = 0
previous_num = await incremental_scraped.get_incremental_elements_num() previous_num = await incremental_scraped.get_incremental_elements_num()
@@ -1581,11 +1740,12 @@ async def scroll_down_to_load_all_options(
# make sure we can scroll to the bottom # make sure we can scroll to the bottom
scroll_interval = SettingsManager.get_settings().BROWSER_HEIGHT * 5 scroll_interval = SettingsManager.get_settings().BROWSER_HEIGHT * 5
if dropdown_menu_element_handle is None: if dropdown_menu_element_handle is None:
LOG.info("element handle is None, using mouse to scroll down", element_id=dropdown_menu_element.get_id()) LOG.info("element handle is None, using mouse to scroll down", element_id=scrollable_element.get_id())
await page.mouse.wheel(0, scroll_interval) await page.mouse.wheel(0, scroll_interval)
scroll_pace += scroll_interval scroll_pace += scroll_interval
else: else:
await skyvern_frame.scroll_to_element_bottom(dropdown_menu_element_handle, page_by_page) await skyvern_frame.scroll_to_element_bottom(dropdown_menu_element_handle, page_by_page)
# wait until animation ends, otherwise the scroll operation could be overwritten
await asyncio.sleep(2) await asyncio.sleep(2)
# scoll a little back and scoll down to trigger the loading # scoll a little back and scoll down to trigger the loading
@@ -1613,7 +1773,7 @@ async def scroll_down_to_load_all_options(
# scoll back to the start point and wait for a while to make all options invisible on the page # scoll back to the start point and wait for a while to make all options invisible on the page
if dropdown_menu_element_handle is None: if dropdown_menu_element_handle is None:
LOG.info("element handle is None, using mouse to scroll back", element_id=dropdown_menu_element.get_id()) LOG.info("element handle is None, using mouse to scroll back", element_id=scrollable_element.get_id())
await page.mouse.wheel(0, -scroll_pace) await page.mouse.wheel(0, -scroll_pace)
else: else:
await skyvern_frame.scroll_to_element_top(dropdown_menu_element_handle) await skyvern_frame.scroll_to_element_top(dropdown_menu_element_handle)

View File

@@ -84,15 +84,16 @@ def build_attribute(key: str, value: Any) -> str:
return f'{key}="{str(value)}"' if value else key return f'{key}="{str(value)}"' if value else key
def json_to_html(element: dict) -> str: def json_to_html(element: dict, need_skyvern_attrs: bool = True) -> str:
attributes: dict[str, Any] = copy.deepcopy(element.get("attributes", {})) attributes: dict[str, Any] = copy.deepcopy(element.get("attributes", {}))
# adding the node attribute to attributes if need_skyvern_attrs:
for attr in ELEMENT_NODE_ATTRIBUTES: # adding the node attribute to attributes
value = element.get(attr) for attr in ELEMENT_NODE_ATTRIBUTES:
if value is None: value = element.get(attr)
continue if value is None:
attributes[attr] = value continue
attributes[attr] = value
attributes_html = " ".join(build_attribute(key, value) for key, value in attributes.items()) attributes_html = " ".join(build_attribute(key, value) for key, value in attributes.items())
@@ -487,10 +488,8 @@ class IncrementalScrapePage:
return "".join([json_to_html(element) for element in (element_tree or self.element_tree_trimmed)]) return "".join([json_to_html(element) for element in (element_tree or self.element_tree_trimmed)])
def trim_element_tree(elements: list[dict]) -> list[dict]: def trim_element(element: dict) -> dict:
queue = [] queue = [element]
for element in elements:
queue.append(element)
while queue: while queue:
queue_ele = queue.pop(0) queue_ele = queue.pop(0)
if "frame" in queue_ele: if "frame" in queue_ele:
@@ -524,6 +523,12 @@ def trim_element_tree(elements: list[dict]) -> list[dict]:
element_text = str(queue_ele["text"]).strip() element_text = str(queue_ele["text"]).strip()
if not element_text: if not element_text:
del queue_ele["text"] del queue_ele["text"]
return element
def trim_element_tree(elements: list[dict]) -> list[dict]:
for element in elements:
trim_element(element)
return elements return elements

View File

@@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import copy
import typing import typing
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from enum import StrEnum from enum import StrEnum
@@ -28,7 +29,7 @@ from skyvern.exceptions import (
SkyvernException, SkyvernException,
) )
from skyvern.forge.sdk.settings_manager import SettingsManager from skyvern.forge.sdk.settings_manager import SettingsManager
from skyvern.webeye.scraper.scraper import IncrementalScrapePage, ScrapedPage from skyvern.webeye.scraper.scraper import IncrementalScrapePage, ScrapedPage, json_to_html, trim_element
from skyvern.webeye.utils.page import SkyvernFrame from skyvern.webeye.utils.page import SkyvernFrame
LOG = structlog.get_logger() LOG = structlog.get_logger()
@@ -130,6 +131,13 @@ class SkyvernElement:
self.__frame = frame self.__frame = frame
self.locator = locator self.locator = locator
def build_HTML(self, need_trim_element: bool = True, need_skyvern_attrs: bool = True) -> str:
element_dict = self.get_element_dict()
if need_trim_element:
element_dict = trim_element(copy.deepcopy(element_dict))
return json_to_html(element_dict, need_skyvern_attrs)
async def is_select2_dropdown(self) -> bool: async def is_select2_dropdown(self) -> bool:
tag_name = self.get_tag_name() tag_name = self.get_tag_name()
element_class = await self.get_attr("class") element_class = await self.get_attr("class")