import asyncio import copy import json import os import urllib.parse import uuid from datetime import datetime, timedelta, timezone from typing import Any, Awaitable, Callable, List import structlog from playwright.async_api import FileChooser, Frame, Locator, Page, TimeoutError from pydantic import BaseModel from skyvern.constants import REPO_ROOT_DIR, SKYVERN_ID_ATTR from skyvern.exceptions import ( EmptySelect, ErrEmptyTweakValue, ErrFoundSelectableElement, FailedToFetchSecret, FailToClick, FailToSelectByIndex, FailToSelectByLabel, FailToSelectByValue, IllegitComplete, ImaginaryFileUrl, InteractWithDisabledElement, InvalidElementForTextInput, MissingElement, MissingElementDict, MissingElementInCSSMap, MissingFileUrl, MultipleElementsFound, NoAutoCompleteOptionMeetCondition, NoAvailableOptionFoundForCustomSelection, NoElementMatchedForTargetOption, NoIncrementalElementFoundForAutoCompletion, NoIncrementalElementFoundForCustomSelection, NoSuitableAutoCompleteOption, OptionIndexOutOfBound, WrongElementToUploadFile, ) from skyvern.forge import app from skyvern.forge.prompts import prompt_engine from skyvern.forge.sdk.api.files import ( download_file, get_number_of_files_in_directory, get_path_for_workflow_download_directory, ) from skyvern.forge.sdk.core.aiohttp_helper import aiohttp_post from skyvern.forge.sdk.core.security import generate_skyvern_signature from skyvern.forge.sdk.db.enums import OrganizationAuthTokenType from skyvern.forge.sdk.models import Step from skyvern.forge.sdk.schemas.tasks import Task from skyvern.forge.sdk.services.bitwarden import BitwardenConstants from skyvern.forge.sdk.settings_manager import SettingsManager from skyvern.webeye.actions import actions from skyvern.webeye.actions.actions import ( Action, ActionStatus, ActionType, CheckboxAction, ClickAction, InputOrSelectContext, ScrapeResult, SelectOption, SelectOptionAction, UploadFileAction, WebAction, ) from skyvern.webeye.actions.responses import ActionAbort, ActionFailure, 
ActionResult, ActionSuccess from skyvern.webeye.scraper.scraper import ( CleanupElementTreeFunc, ElementTreeFormat, IncrementalScrapePage, ScrapedPage, hash_element, json_to_html, trim_element_tree, ) from skyvern.webeye.utils.dom import DomUtil, InteractiveElement, SkyvernElement from skyvern.webeye.utils.page import SkyvernFrame LOG = structlog.get_logger() COMMON_INPUT_TAGS = {"input", "textarea", "select"} class CustomSingleSelectResult: def __init__(self, skyvern_frame: SkyvernFrame) -> None: self.reasoning: str | None = None self.action_result: ActionResult | None = None self.value: str | None = None self.dropdown_menu: SkyvernElement | None = None self.skyvern_frame = skyvern_frame async def is_done(self) -> bool: # check if the dropdown menu is still on the page # if it still exists, might mean there might be multi-level selection # FIXME: only able to execute multi-level selection logic when dropdown menu detected if self.dropdown_menu is None: return True if not isinstance(self.action_result, ActionSuccess): return True if await self.dropdown_menu.get_locator().count() == 0: return True return not await self.skyvern_frame.get_element_visible(await self.dropdown_menu.get_element_handler()) def is_ul_or_listbox_element_factory( incremental_scraped: IncrementalScrapePage, task: Task, step: Step ) -> Callable[[dict], Awaitable[bool]]: async def wrapper(element_dict: dict) -> bool: element_id: str = element_dict.get("id", "") try: element = await SkyvernElement.create_from_incremental(incremental_scraped, element_id) except Exception: LOG.debug( "Failed to element in the incremental page", element_id=element_id, step_id=step.step_id, task_id=task.task_id, exc_info=True, ) return False if element.get_tag_name() == "ul": return True if await element.get_attr("role") == "listbox": return True return False return wrapper CheckExistIDFunc = Callable[[str], bool] def check_id_in_dict_factory(id_dict: dict[str, Any]) -> CheckExistIDFunc: def helper(element_id: str) 
-> bool: if id_dict.get(element_id, ""): return True return False return helper def remove_exist_elements(element_tree: list[dict], check_exist: CheckExistIDFunc) -> list[dict]: new_element_tree = [] for element in element_tree: children_elements = element.get("children", []) if len(children_elements) > 0: children_elements = remove_exist_elements(element_tree=children_elements, check_exist=check_exist) if check_exist(element.get("id", "")): new_element_tree.extend(children_elements) else: element["children"] = children_elements new_element_tree.append(element) return new_element_tree def clean_and_remove_element_tree_factory( task: Task, step: Step, check_exist_funcs: list[CheckExistIDFunc] ) -> CleanupElementTreeFunc: async def helper_func(frame: Page | Frame, url: str, element_tree: list[dict]) -> list[dict]: element_tree = await app.AGENT_FUNCTION.cleanup_element_tree_factory(task=task, step=step)( frame, url, element_tree ) for check_exist in check_exist_funcs: element_tree = remove_exist_elements(element_tree=element_tree, check_exist=check_exist) return element_tree return helper_func class AutoCompletionResult(BaseModel): auto_completion_attempt: bool = False incremental_elements: list[dict] = [] action_result: ActionResult = ActionSuccess() class ActionHandler: _handled_action_types: dict[ ActionType, Callable[[Action, Page, ScrapedPage, Task, Step], Awaitable[list[ActionResult]]], ] = {} _setup_action_types: dict[ ActionType, Callable[[Action, Page, ScrapedPage, Task, Step], Awaitable[list[ActionResult]]], ] = {} _teardown_action_types: dict[ ActionType, Callable[[Action, Page, ScrapedPage, Task, Step], Awaitable[list[ActionResult]]], ] = {} @classmethod def register_action_type( cls, action_type: ActionType, handler: Callable[[Action, Page, ScrapedPage, Task, Step], Awaitable[list[ActionResult]]], ) -> None: cls._handled_action_types[action_type] = handler @classmethod def register_setup_for_action_type( cls, action_type: ActionType, handler: 
Callable[[Action, Page, ScrapedPage, Task, Step], Awaitable[list[ActionResult]]], ) -> None: cls._setup_action_types[action_type] = handler @classmethod def register_teardown_for_action_type( cls, action_type: ActionType, handler: Callable[[Action, Page, ScrapedPage, Task, Step], Awaitable[list[ActionResult]]], ) -> None: cls._teardown_action_types[action_type] = handler @staticmethod async def handle_action( scraped_page: ScrapedPage, task: Task, step: Step, page: Page, action: Action, ) -> list[ActionResult]: LOG.info("Handling action", action=action) actions_result: list[ActionResult] = [] try: if action.action_type in ActionHandler._handled_action_types: invalid_web_action_check = check_for_invalid_web_action(action, page, scraped_page, task, step) if invalid_web_action_check: actions_result.extend(invalid_web_action_check) return actions_result # do setup before action handler if setup := ActionHandler._setup_action_types.get(action.action_type): results = await setup(action, page, scraped_page, task, step) actions_result.extend(results) if results and results[-1] != ActionSuccess: return actions_result # do the handler handler = ActionHandler._handled_action_types[action.action_type] results = await handler(action, page, scraped_page, task, step) actions_result.extend(results) if not results or not isinstance(actions_result[-1], ActionSuccess): return actions_result # do the teardown teardown = ActionHandler._teardown_action_types.get(action.action_type) if teardown: results = await teardown(action, page, scraped_page, task, step) actions_result.extend(results) return actions_result else: LOG.error( "Unsupported action type in handler", action=action, type=type(action), ) actions_result.append(ActionFailure(Exception(f"Unsupported action type: {type(action)}"))) return actions_result except MissingElement as e: LOG.info( "Known exceptions", action=action, exception_type=type(e), exception_message=str(e), ) actions_result.append(ActionFailure(e)) except 
MultipleElementsFound as e: LOG.exception( "Cannot handle multiple elements with the same selector in one action.", action=action, ) actions_result.append(ActionFailure(e)) except Exception as e: LOG.exception("Unhandled exception in action handler", action=action) actions_result.append(ActionFailure(e)) finally: if actions_result and isinstance(actions_result[-1], ActionSuccess): action.status = ActionStatus.completed elif actions_result and isinstance(actions_result[-1], ActionAbort): action.status = ActionStatus.skipped else: # either actions_result is empty or the last action is a failure if not actions_result: LOG.warning("Action failed to execute, setting status to failed", action=action) action.status = ActionStatus.failed await app.DATABASE.create_action(action=action) return actions_result def check_for_invalid_web_action( action: actions.Action, page: Page, scraped_page: ScrapedPage, task: Task, step: Step, ) -> list[ActionResult]: if isinstance(action, WebAction) and action.element_id not in scraped_page.id_to_element_dict: return [ActionFailure(MissingElement(element_id=action.element_id), stop_execution_on_failure=False)] return [] async def handle_solve_captcha_action( action: actions.SolveCaptchaAction, page: Page, scraped_page: ScrapedPage, task: Task, step: Step, ) -> list[ActionResult]: LOG.warning( "Please solve the captcha on the page, you have 30 seconds", action=action, ) await asyncio.sleep(30) return [ActionSuccess()] async def handle_click_action( action: actions.ClickAction, page: Page, scraped_page: ScrapedPage, task: Task, step: Step, ) -> list[ActionResult]: num_downloaded_files_before = 0 download_dir = None if task.workflow_run_id: download_dir = get_path_for_workflow_download_directory(task.workflow_run_id) num_downloaded_files_before = get_number_of_files_in_directory(download_dir) LOG.info( "Number of files in download directory before click", num_downloaded_files_before=num_downloaded_files_before, download_dir=download_dir, ) dom 
= DomUtil(scraped_page=scraped_page, page=page) skyvern_element = await dom.get_skyvern_element_by_id(action.element_id) await asyncio.sleep(0.3) # dynamically validate the attr, since it could change into enabled after the previous actions if await skyvern_element.is_disabled(dynamic=True): LOG.warning( "Try to click on a disabled element", action_type=action.action_type, task_id=task.task_id, step_id=step.step_id, element_id=skyvern_element.get_id(), ) return [ActionFailure(InteractWithDisabledElement(skyvern_element.get_id()))] if action.download: results = await handle_click_to_download_file_action(action, page, scraped_page, task) else: results = await chain_click( task, scraped_page, page, action, skyvern_element, timeout=SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS, ) if results and task.workflow_run_id and download_dir: LOG.info("Sleeping for 5 seconds to let the download finish") await asyncio.sleep(5) num_downloaded_files_after = get_number_of_files_in_directory(download_dir) LOG.info( "Number of files in download directory after click", num_downloaded_files_after=num_downloaded_files_after, download_dir=download_dir, ) if num_downloaded_files_after > num_downloaded_files_before: results[-1].download_triggered = True return results async def handle_click_to_download_file_action( action: actions.ClickAction, page: Page, scraped_page: ScrapedPage, task: Task, ) -> list[ActionResult]: dom = DomUtil(scraped_page=scraped_page, page=page) skyvern_element = await dom.get_skyvern_element_by_id(action.element_id) locator = skyvern_element.locator try: await locator.click(timeout=SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS) await page.wait_for_load_state(timeout=SettingsManager.get_settings().BROWSER_LOADING_TIMEOUT_MS) except Exception as e: LOG.exception("ClickAction with download failed", action=action, exc_info=True) return [ActionFailure(e, download_triggered=False)] return [ActionSuccess()] async def handle_input_text_action( action: 
actions.InputTextAction, page: Page, scraped_page: ScrapedPage, task: Task, step: Step, ) -> list[ActionResult]: dom = DomUtil(scraped_page, page) skyvern_element = await dom.get_skyvern_element_by_id(action.element_id) skyvern_frame = await SkyvernFrame.create_instance(skyvern_element.get_frame()) incremental_scraped = IncrementalScrapePage(skyvern_frame=skyvern_frame) timeout = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS current_text = await get_input_value(skyvern_element.get_tag_name(), skyvern_element.get_locator()) if current_text == action.text: return [ActionSuccess()] # before filling text, we need to validate if the element can be filled if it's not one of COMMON_INPUT_TAGS tag_name = scraped_page.id_to_element_dict[action.element_id]["tagName"].lower() text = await get_actual_value_of_parameter_if_secret(task, action.text) if text is None: return [ActionFailure(FailedToFetchSecret())] # dynamically validate the attr, since it could change into enabled after the previous actions if await skyvern_element.is_disabled(dynamic=True): LOG.warning( "Try to input text on a disabled element", action_type=action.action_type, task_id=task.task_id, step_id=step.step_id, element_id=skyvern_element.get_id(), ) return [ActionFailure(InteractWithDisabledElement(skyvern_element.get_id()))] incremental_element: list[dict] = [] auto_complete_hacky_flag: bool = False # check if it's selectable if skyvern_element.get_tag_name() == InteractiveElement.INPUT and not await skyvern_element.is_raw_input(): select_action = SelectOptionAction( reasoning=action.reasoning, element_id=skyvern_element.get_id(), option=SelectOption(label=text), ) await skyvern_element.scroll_into_view() if skyvern_element.get_selectable(): LOG.info( "Input element is selectable, doing select actions", task_id=task.task_id, step_id=step.step_id, element_id=skyvern_element.get_id(), action=action, ) return await handle_select_option_action(select_action, page, scraped_page, task, step) # press 
arrowdown to watch if there's any options popping up await incremental_scraped.start_listen_dom_increment() try: await skyvern_element.press_key("ArrowDown") except TimeoutError: # sometimes we notice `press_key()` raise a timeout but actually the dropdown is opened. LOG.info( "Timeout to press ArrowDown to open dropdown, ignore the timeout and continue to execute the action", task_id=task.task_id, step_id=step.step_id, element_id=skyvern_element.get_id(), action=action, ) await asyncio.sleep(5) incremental_element = await incremental_scraped.get_incremental_element_tree( clean_and_remove_element_tree_factory(task=task, step=step, check_exist_funcs=[dom.check_id_in_dom]), ) if len(incremental_element) == 0: LOG.info( "No new element detected, indicating it couldn't be a selectable auto-completion input", task_id=task.task_id, step_id=step.step_id, element_id=skyvern_element.get_id(), action=action, ) await incremental_scraped.stop_listen_dom_increment() else: auto_complete_hacky_flag = True try: # TODO: we don't select by value for the auto completion detect case result, _ = await sequentially_select_from_dropdown( action=select_action, page=page, dom=dom, skyvern_frame=skyvern_frame, incremental_scraped=incremental_scraped, step=step, task=task, target_value=text, ) if result is not None: return [result] LOG.info( "It might not be a selectable auto-completion input, exit the custom selection mode", task_id=task.task_id, step_id=step.step_id, element_id=skyvern_element.get_id(), action=action, ) except Exception: await skyvern_element.scroll_into_view() LOG.warning( "Failed to do custom selection transformed from input action, continue to input text", exc_info=True, task_id=task.task_id, step_id=step.step_id, ) finally: await skyvern_element.press_key("Escape") await skyvern_element.blur() await incremental_scraped.stop_listen_dom_increment() # force to move focus back to the element await skyvern_element.get_locator().focus(timeout=timeout) # `Locator.clear()` on 
a spin button could cause the cursor moving away, and never be back if not await skyvern_element.is_spinbtn_input(): try: await skyvern_element.input_clear() except TimeoutError: LOG.info("None input tag clear timeout", action=action) return [ActionFailure(InvalidElementForTextInput(element_id=action.element_id, tag_name=tag_name))] except Exception: LOG.warning("Failed to clear the input field", action=action, exc_info=True) return [ActionFailure(InvalidElementForTextInput(element_id=action.element_id, tag_name=tag_name))] try: # TODO: not sure if this case will trigger auto-completion if tag_name not in COMMON_INPUT_TAGS: await skyvern_element.input_fill(text) return [ActionSuccess()] if len(text) == 0: return [ActionSuccess()] # parse the input context to help executing input action prompt = prompt_engine.load_prompt( "parse-input-or-select-context", element_id=action.element_id, action_reasoning=action.reasoning, elements=dom.scraped_page.build_element_tree(ElementTreeFormat.HTML), ) json_response = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt, step=step) input_or_select_context = InputOrSelectContext.model_validate(json_response) LOG.info( "Parsed input/select context", context=input_or_select_context, task_id=task.task_id, step_id=step.step_id, ) if await skyvern_element.is_auto_completion_input() or input_or_select_context.is_location_input: if result := await input_or_auto_complete_input( input_or_select_context=input_or_select_context, page=page, dom=dom, text=text, skyvern_element=skyvern_element, step=step, task=task, ): return [result] await incremental_scraped.start_listen_dom_increment() try: await skyvern_element.input_sequentially(text=text) finally: incremental_element = await incremental_scraped.get_incremental_element_tree( clean_and_remove_element_tree_factory(task=task, step=step, check_exist_funcs=[dom.check_id_in_dom]), ) if len(incremental_element) > 0: auto_complete_hacky_flag = True await 
incremental_scraped.stop_listen_dom_increment() return [ActionSuccess()] finally: # HACK: force to finish missing auto completion input if auto_complete_hacky_flag: LOG.debug( "Trigger input-selection hack, pressing Tab to choose one", action=action, ) await skyvern_element.press_key("Tab") async def handle_upload_file_action( action: actions.UploadFileAction, page: Page, scraped_page: ScrapedPage, task: Task, step: Step, ) -> list[ActionResult]: if not action.file_url: LOG.warning("InputFileAction has no file_url", action=action) return [ActionFailure(MissingFileUrl())] # ************************************************************************************************************** # # After this point if the file_url is a secret, it will be replaced with the actual value # In order to make sure we don't log the secret value, we log the action with the original value action.file_url # ************************************************************************************************************** # file_url = await get_actual_value_of_parameter_if_secret(task, action.file_url) decoded_url = urllib.parse.unquote(file_url) if file_url not in str(task.navigation_payload) and decoded_url not in str(task.navigation_payload): LOG.warning( "LLM might be imagining the file url, which is not in navigation payload", action=action, file_url=action.file_url, ) return [ActionFailure(ImaginaryFileUrl(action.file_url))] dom = DomUtil(scraped_page=scraped_page, page=page) skyvern_element = await dom.get_skyvern_element_by_id(action.element_id) # dynamically validate the attr, since it could change into enabled after the previous actions if await skyvern_element.is_disabled(dynamic=True): LOG.warning( "Try to upload file on a disabled element", action_type=action.action_type, task_id=task.task_id, step_id=step.step_id, element_id=skyvern_element.get_id(), ) return [ActionFailure(InteractWithDisabledElement(skyvern_element.get_id()))] locator = skyvern_element.locator file_path = await 
download_file(file_url) is_file_input = await skyvern_element.is_file_input() if is_file_input: LOG.info("Taking UploadFileAction. Found file input tag", action=action) if file_path: await locator.set_input_files( file_path, timeout=SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS, ) # Sleep for 10 seconds after uploading a file to let the page process it await asyncio.sleep(10) return [ActionSuccess()] else: return [ActionFailure(Exception(f"Failed to download file from {action.file_url}"))] else: LOG.info("Taking UploadFileAction. Found non file input tag", action=action) # treat it as a click action action.is_upload_file_tag = False return await chain_click( task, scraped_page, page, action, skyvern_element, timeout=SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS, ) # This function is deprecated. Downloads are handled by the click action handler now. async def handle_download_file_action( action: actions.DownloadFileAction, page: Page, scraped_page: ScrapedPage, task: Task, step: Step, ) -> list[ActionResult]: dom = DomUtil(scraped_page=scraped_page, page=page) skyvern_element = await dom.get_skyvern_element_by_id(action.element_id) file_name = f"{action.file_name or uuid.uuid4()}" full_file_path = f"{REPO_ROOT_DIR}/downloads/{task.workflow_run_id or task.task_id}/{file_name}" try: # Start waiting for the download async with page.expect_download() as download_info: await asyncio.sleep(0.3) locator = skyvern_element.locator await locator.click( timeout=SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS, modifiers=["Alt"], ) download = await download_info.value # Create download folders if they don't exist download_folder = f"{REPO_ROOT_DIR}/downloads/{task.workflow_run_id or task.task_id}" os.makedirs(download_folder, exist_ok=True) # Wait for the download process to complete and save the downloaded file await download.save_as(full_file_path) except Exception as e: LOG.exception( "DownloadFileAction: Failed to download file", action=action, 
full_file_path=full_file_path, ) return [ActionFailure(e)] return [ActionSuccess(data={"file_path": full_file_path})] async def handle_null_action( action: actions.NullAction, page: Page, scraped_page: ScrapedPage, task: Task, step: Step, ) -> list[ActionResult]: return [ActionSuccess()] async def handle_select_option_action( action: actions.SelectOptionAction, page: Page, scraped_page: ScrapedPage, task: Task, step: Step, ) -> list[ActionResult]: dom = DomUtil(scraped_page, page) skyvern_element = await dom.get_skyvern_element_by_id(action.element_id) tag_name = skyvern_element.get_tag_name() element_dict = scraped_page.id_to_element_dict[action.element_id] LOG.info( "SelectOptionAction", action=action, tag_name=tag_name, element_dict=element_dict, ) # Handle the edge case: # Sometimes our custom select logic could fail, and leaving the dropdown being opened. # Confirm if the select action is on the custom option element if await skyvern_element.is_custom_option(): click_action = ClickAction(element_id=action.element_id) return await chain_click(task, scraped_page, page, click_action, skyvern_element) if not await skyvern_element.is_selectable(): # 1. find from children # TODO: 2. 
find from siblings and their chidren LOG.info( "Element is not selectable, try to find the selectable element in the chidren", tag_name=tag_name, action=action, ) selectable_child: SkyvernElement | None = None try: selectable_child = await skyvern_element.find_selectable_child(dom=dom) except Exception as e: LOG.error( "Failed to find selectable element in chidren", exc_info=True, tag_name=tag_name, action=action, ) return [ActionFailure(ErrFoundSelectableElement(action.element_id, e))] if selectable_child: LOG.info( "Found selectable element in the children", tag_name=selectable_child.get_tag_name(), element_id=selectable_child.get_id(), ) select_action = SelectOptionAction( reasoning=action.reasoning, element_id=selectable_child.get_id(), option=action.option, ) return await handle_select_option_action(select_action, page, scraped_page, task, step) # dynamically validate the attr, since it could change into enabled after the previous actions if await skyvern_element.is_disabled(dynamic=True): LOG.warning( "Try to select on a disabled element", action_type=action.action_type, task_id=task.task_id, step_id=step.step_id, element_id=skyvern_element.get_id(), ) return [ActionFailure(InteractWithDisabledElement(skyvern_element.get_id()))] if tag_name == InteractiveElement.SELECT: LOG.info( "SelectOptionAction is on checkbox", action=action, task_id=task.task_id, step_id=step.step_id, ) check_action = CheckboxAction(element_id=action.element_id, is_checked=True) return await handle_checkbox_action(check_action, page, scraped_page, task, step) if await skyvern_element.is_radio(): LOG.info( "SelectOptionAction is on radio", action=action, task_id=task.task_id, step_id=step.step_id, ) click_action = ClickAction(element_id=action.element_id) return await chain_click(task, scraped_page, page, click_action, skyvern_element) # FIXME: maybe there's a case where could trigger dropdown menu? 
if await skyvern_element.is_btn_input(): LOG.info( "SelectOptionAction is on button", action=action, task_id=task.task_id, step_id=step.step_id, ) click_action = ClickAction(element_id=action.element_id) return await chain_click(task, scraped_page, page, click_action, skyvern_element) LOG.info( "Trigger custom select", action=action, ) timeout = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS skyvern_frame = await SkyvernFrame.create_instance(skyvern_element.get_frame()) incremental_scraped = IncrementalScrapePage(skyvern_frame=skyvern_frame) is_open = False suggested_value: str | None = None results: list[ActionResult] = [] try: await incremental_scraped.start_listen_dom_increment() await skyvern_element.scroll_into_view() try: await skyvern_element.get_locator().click(timeout=timeout) except Exception: LOG.info( "fail to open dropdown by clicking, try to press ArrowDown to open", element_id=skyvern_element.get_id(), task_id=task.task_id, step_id=step.step_id, ) await skyvern_element.scroll_into_view() await skyvern_element.press_key("ArrowDown") # wait 5s for options to load await asyncio.sleep(5) is_open = True incremental_element = await incremental_scraped.get_incremental_element_tree( clean_and_remove_element_tree_factory(task=task, step=step, check_exist_funcs=[dom.check_id_in_dom]), ) if len(incremental_element) == 0 and skyvern_element.get_tag_name() == InteractiveElement.INPUT: LOG.info( "No incremental elements detected for the input element, trying to press Arrowdown to trigger the dropdown", element_id=skyvern_element.get_id(), task_id=task.task_id, step_id=step.step_id, ) await skyvern_element.scroll_into_view() await skyvern_element.press_key("ArrowDown") # wait 5s for options to load await asyncio.sleep(5) incremental_element = await incremental_scraped.get_incremental_element_tree( clean_and_remove_element_tree_factory(task=task, step=step, check_exist_funcs=[dom.check_id_in_dom]), ) if len(incremental_element) == 0: raise 
NoIncrementalElementFoundForCustomSelection(element_id=action.element_id) # TODO: support sequetially select from dropdown by value, just support single select now result, suggested_value = await sequentially_select_from_dropdown( action=action, page=page, dom=dom, skyvern_frame=skyvern_frame, incremental_scraped=incremental_scraped, step=step, task=task, force_select=True, ) # force_select won't return None result assert result is not None results.append(result) if isinstance(result, ActionSuccess) or suggested_value is None: return results except Exception as e: LOG.exception("Custom select error") results.append(ActionFailure(exception=e)) return results finally: if is_open and len(results) > 0 and not isinstance(results[-1], ActionSuccess): await skyvern_element.scroll_into_view() await skyvern_element.coordinate_click(page=page) await skyvern_element.press_key("Escape") is_open = False await skyvern_element.blur() await incremental_scraped.stop_listen_dom_increment() LOG.info( "Try to select by value in custom select", element_id=skyvern_element.get_id(), value=suggested_value, task_id=task.task_id, step_id=step.step_id, ) try: await incremental_scraped.start_listen_dom_increment() timeout = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS await skyvern_element.scroll_into_view() try: await skyvern_element.get_locator().click(timeout=timeout) except Exception: LOG.info( "fail to open dropdown by clicking, try to press arrow down to open", element_id=skyvern_element.get_id(), task_id=task.task_id, step_id=step.step_id, ) await skyvern_element.scroll_into_view() await skyvern_element.press_key("ArrowDown") await asyncio.sleep(5) is_open = True result = await select_from_dropdown_by_value( value=suggested_value, page=page, dom=dom, skyvern_frame=skyvern_frame, incremental_scraped=incremental_scraped, task=task, step=step, ) results.append(result) return results except Exception as e: LOG.exception("Custom select by value error") 
results.append(ActionFailure(exception=e)) return results finally: if is_open and len(results) > 0 and not isinstance(results[-1], ActionSuccess): await skyvern_element.scroll_into_view() await skyvern_element.coordinate_click(page=page) await skyvern_element.press_key("Escape") await skyvern_element.blur() await incremental_scraped.stop_listen_dom_increment() async def handle_checkbox_action( action: actions.CheckboxAction, page: Page, scraped_page: ScrapedPage, task: Task, step: Step, ) -> list[ActionResult]: """ ******* NOT REGISTERED ******* This action causes more harm than it does good. It frequently mis-behaves, or gets stuck in click loops. Treating checkbox actions as click actions seem to perform way more reliably Developers who tried this and failed: 2 (Suchintan and Shu 😂) """ dom = DomUtil(scraped_page=scraped_page, page=page) skyvern_element = await dom.get_skyvern_element_by_id(action.element_id) locator = skyvern_element.locator if action.is_checked: await locator.check(timeout=SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS) else: await locator.uncheck(timeout=SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS) # TODO (suchintan): Why does checking the label work, but not the actual input element? return [ActionSuccess()] async def handle_wait_action( action: actions.WaitAction, page: Page, scraped_page: ScrapedPage, task: Task, step: Step, ) -> list[ActionResult]: await asyncio.sleep(20) return [ActionFailure(exception=Exception("Wait action is treated as a failure"))] async def handle_terminate_action( action: actions.TerminateAction, page: Page, scraped_page: ScrapedPage, task: Task, step: Step, ) -> list[ActionResult]: return [ActionSuccess()] async def handle_complete_action( action: actions.CompleteAction, page: Page, scraped_page: ScrapedPage, task: Task, step: Step, ) -> list[ActionResult]: # If this action has a source_action_id, then we need to make sure if the goal is actually completed. 
if action.source_action_id: LOG.info("CompleteAction has source_action_id, checking if goal is completed") complete_action = await app.agent.check_user_goal_complete(page, scraped_page, task, step) if complete_action is None: return [ ActionFailure( exception=IllegitComplete( data={ "error": "Cached complete action wasn't verified by LLM, fallback to default execution mode" } ) ) ] extracted_data = None if action.data_extraction_goal: scrape_action_result = await extract_information_for_navigation_goal( scraped_page=scraped_page, task=task, step=step, ) extracted_data = scrape_action_result.scraped_data return [ActionSuccess(data=extracted_data)] ActionHandler.register_action_type(ActionType.SOLVE_CAPTCHA, handle_solve_captcha_action) ActionHandler.register_action_type(ActionType.CLICK, handle_click_action) ActionHandler.register_action_type(ActionType.INPUT_TEXT, handle_input_text_action) ActionHandler.register_action_type(ActionType.UPLOAD_FILE, handle_upload_file_action) # ActionHandler.register_action_type(ActionType.DOWNLOAD_FILE, handle_download_file_action) ActionHandler.register_action_type(ActionType.NULL_ACTION, handle_null_action) ActionHandler.register_action_type(ActionType.SELECT_OPTION, handle_select_option_action) ActionHandler.register_action_type(ActionType.WAIT, handle_wait_action) ActionHandler.register_action_type(ActionType.TERMINATE, handle_terminate_action) ActionHandler.register_action_type(ActionType.COMPLETE, handle_complete_action) async def get_actual_value_of_parameter_if_secret(task: Task, parameter: str) -> Any: """ Get the actual value of a parameter if it's a secret. If it's not a secret, return the parameter value as is. Just return the parameter value if the task isn't a workflow's task. This is only used for InputTextAction, UploadFileAction, and ClickAction (if it has a file_url). 
async def chain_click(
    task: Task,
    scraped_page: ScrapedPage,
    page: Page,
    action: ClickAction | UploadFileAction,
    skyvern_element: SkyvernElement,
    timeout: int = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS,
) -> List[ActionResult]:
    """
    Click an element, falling back to related elements when the click fails.

    Attempts, in order, stopping at the first success:
      1. the element's own locator;
      2. for a ``label`` element, the element returned by ``find_label_for``;
      3. for an input element, ``click_sibling_of_input``;
      4. the parent element (``locator.locator("..")``).

    While the clicks run, a ``filechooser`` listener is registered on ``page``.
    If ``action.file_url`` is set, the file is downloaded first (the URL is
    resolved through ``get_actual_value_of_parameter_if_secret``) and handed to
    any file chooser that opens; a failed download is logged and an empty file
    list is used instead.

    :param task: task the action belongs to; used for secret resolution and logging
    :param scraped_page: scraped page used to build a ``DomUtil`` for label lookup
    :param page: page on which the file chooser listener is (un)registered
    :param action: the click / upload action being executed
    :param skyvern_element: element to click
    :param timeout: timeout passed to each click call
    :return: every attempt's result, in order; the last entry is the final outcome.
        If ``action.file_url`` is set but no file chooser fired, the result is
        instead a single ``ActionFailure(WrongElementToUploadFile)``.
    """
    # Add a defensive page handler here in case a click action opens a file chooser.
    # This automatically dismisses the dialog
    # File choosers are impossible to close if you don't expect one. Instead of dealing with it, close it!

    locator = skyvern_element.locator
    # TODO (suchintan): This should likely result in an ActionFailure -- we can figure out how to do this later!
    LOG.info("Chain click starts", action=action, locator=locator)
    file: list[str] | str = []
    if action.file_url:
        file_url = await get_actual_value_of_parameter_if_secret(task, action.file_url)
        try:
            file = await download_file(file_url)
        except Exception:
            LOG.exception(
                "Failed to download file, continuing without it",
                action=action,
                file_url=file_url,
            )
            file = []

    is_filechooser_trigger = False

    async def fc_func(fc: FileChooser) -> None:
        # Record that a chooser opened and answer it with the downloaded file (or nothing).
        nonlocal is_filechooser_trigger
        is_filechooser_trigger = True
        await fc.set_files(files=file)

    page.on("filechooser", fc_func)
    LOG.info("Registered file chooser listener", action=action, path=file)

    try:
        await locator.click(timeout=timeout)

        LOG.info("Chain click: main element click succeeded", action=action, locator=locator)
        return [ActionSuccess()]

    except Exception:
        action_results: list[ActionResult] = [ActionFailure(FailToClick(action.element_id))]

        if skyvern_element.get_tag_name() == "label":
            LOG.info(
                "Chain click: it's a label element. going to try for-click",
                task_id=task.task_id,
                action=action,
                locator=locator,
            )
            try:
                if bound_element := await skyvern_element.find_label_for(
                    dom=DomUtil(scraped_page=scraped_page, page=page)
                ):
                    await bound_element.get_locator().click(timeout=timeout)
                    action_results.append(ActionSuccess())
                    return action_results
            except Exception:
                action_results.append(ActionFailure(FailToClick(action.element_id, anchor="for")))

        if skyvern_element.get_tag_name() == InteractiveElement.INPUT:
            LOG.info(
                "Chain click: it's an input element. going to try sibling click",
                task_id=task.task_id,
                action=action,
                locator=locator,
            )
            sibling_action_result = await click_sibling_of_input(locator, timeout=timeout)
            action_results.append(sibling_action_result)
            if isinstance(sibling_action_result, ActionSuccess):
                return action_results

        # Bound before the try so the except handler below can always log it,
        # even if building the parent locator itself is what failed.
        parent_locator: Locator | None = None
        try:
            parent_locator = locator.locator("..")
            await parent_locator.click(timeout=timeout)
            LOG.info(
                "Chain click: successfully clicked parent element",
                action=action,
                parent_locator=parent_locator,
            )
            action_results.append(ActionSuccess(interacted_with_parent=True))
        except Exception:
            LOG.warning(
                "Failed to click parent element",
                action=action,
                parent_locator=parent_locator,
                exc_info=True,
            )
            action_results.append(
                ActionFailure(
                    FailToClick(action.element_id, anchor="parent"),
                    interacted_with_parent=True,
                )
            )
            # We don't raise exception here because we do log the exception, and return ActionFailure as the last action

        return action_results
    finally:
        LOG.info("Remove file chooser listener", action=action)

        # Sleep for 15 seconds after uploading a file to let the page process it
        # Removing this breaks file uploads using the filechooser
        # KEREM DO NOT REMOVE
        if file:
            await asyncio.sleep(15)
        page.remove_listener("filechooser", fc_func)

        if action.file_url and not is_filechooser_trigger:
            LOG.warning(
                "Action has file_url, but filechoose even hasn't been triggered. Upload file attempt seems to fail",
                action=action,
            )
            # NOTE: a `return` inside `finally` replaces whatever the try/except above
            # was returning or raising, including a successful click result.
            return [ActionFailure(WrongElementToUploadFile(action.element_id))]
confirmed_preserved_list = await app.AGENT_FUNCTION.cleanup_element_tree_factory(task=task, step=step)( skyvern_frame.get_frame(), skyvern_frame.get_frame().url, copy.deepcopy(confirmed_preserved_list) ) confirmed_preserved_list = trim_element_tree(copy.deepcopy(confirmed_preserved_list)) incremental_element.extend(confirmed_preserved_list) result.incremental_elements = copy.deepcopy(incremental_element) if len(incremental_element) == 0: raise NoIncrementalElementFoundForAutoCompletion(element_id=skyvern_element.get_id(), text=text) cleaned_incremental_element = remove_duplicated_HTML_element(incremental_element) html = incremental_scraped.build_html_tree(cleaned_incremental_element) auto_completion_confirm_prompt = prompt_engine.load_prompt( "auto-completion-choose-option", field_information=context.field, filled_value=text, navigation_goal=task.navigation_goal, navigation_payload_str=json.dumps(task.navigation_payload), elements=html, ) LOG.info( "Confirm if it's an auto completion dropdown", step_id=step.step_id, task_id=task.task_id, ) json_response = await app.SECONDARY_LLM_API_HANDLER(prompt=auto_completion_confirm_prompt, step=step) element_id = json_response.get("id", "") relevance_float = json_response.get("relevance_float", 0) if not element_id: reasoning = json_response.get("reasoning") raise NoSuitableAutoCompleteOption(reasoning=reasoning, target_value=text) if relevance_float < relevance_threshold: LOG.info( f"The closest option doesn't meet the condition(relevance_float>={relevance_threshold})", element_id=element_id, relevance_float=relevance_float, ) reasoning = json_response.get("reasoning") raise NoAutoCompleteOptionMeetCondition( reasoning=reasoning, required_relevance=relevance_threshold, target_value=text, closest_relevance=relevance_float, ) LOG.info( "Find a suitable option to choose", element_id=element_id, step_id=step.step_id, task_id=task.task_id, ) locator = current_frame.locator(f'[{SKYVERN_ID_ATTR}="{element_id}"]') if await 
def remove_duplicated_HTML_element(elements: list[dict]) -> list[dict]:
    """
    Return a new list without repeated elements.

    Two elements are duplicates when ``hash_element`` gives them the same key.
    The first element seen for each key is kept and the surviving elements
    stay in their original relative order.
    """
    # dicts preserve insertion order, and setdefault never overwrites an
    # existing key, so the first element per hash wins.
    first_by_hash: dict[Any, dict] = {}
    for candidate in elements:
        first_by_hash.setdefault(hash_element(element=candidate), candidate)
    return list(first_by_hash.values())
call LLM to tweak the orignal text according to the information from #3, then start #1 again # FIXME: try the whole loop for twice now, to prevent too many LLM calls MAX_AUTO_COMPLETE_ATTEMP = 2 current_attemp = 0 current_value = text result = AutoCompletionResult() while current_attemp < MAX_AUTO_COMPLETE_ATTEMP: current_attemp += 1 whole_new_elements: list[dict] = [] tried_values: list[str] = [] LOG.info( "Try the potential value for auto completion", step_id=step.step_id, task_id=task.task_id, input_value=current_value, ) result = await choose_auto_completion_dropdown( context=input_or_select_context, page=page, dom=dom, text=current_value, preserved_elements=result.incremental_elements, skyvern_element=skyvern_element, step=step, task=task, ) if isinstance(result.action_result, ActionSuccess): return ActionSuccess() if input_or_select_context.is_search_bar: LOG.info( "Stop generating potential values for the auto-completion since it's a search bar", context=input_or_select_context, step_id=step.step_id, task_id=task.task_id, ) return None tried_values.append(current_value) whole_new_elements.extend(result.incremental_elements) prompt = prompt_engine.load_prompt( "auto-completion-potential-answers", field_information=input_or_select_context.field, current_value=current_value, navigation_goal=task.navigation_goal, navigation_payload_str=json.dumps(task.navigation_payload), ) LOG.info( "Ask LLM to give 10 potential values based on the current value", current_value=current_value, step_id=step.step_id, task_id=task.task_id, ) json_respone = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt, step=step) values: list[dict] = json_respone.get("potential_values", []) for each_value in values: value: str = each_value.get("value", "") if not value: LOG.info( "Empty potential value, skip this attempt", step_id=step.step_id, task_id=task.task_id, value=each_value, ) continue LOG.info( "Try the potential value for auto completion", step_id=step.step_id, task_id=task.task_id, 
input_value=value, ) result = await choose_auto_completion_dropdown( context=input_or_select_context, page=page, dom=dom, text=value, preserved_elements=result.incremental_elements, skyvern_element=skyvern_element, step=step, task=task, ) if isinstance(result.action_result, ActionSuccess): return ActionSuccess() tried_values.append(value) whole_new_elements.extend(result.incremental_elements) if current_attemp < MAX_AUTO_COMPLETE_ATTEMP: LOG.info( "Ask LLM to tweak the current value based on tried input values", step_id=step.step_id, task_id=task.task_id, current_value=current_value, current_attemp=current_attemp, ) cleaned_new_elements = remove_duplicated_HTML_element(whole_new_elements) prompt = prompt_engine.load_prompt( "auto-completion-tweak-value", field_information=input_or_select_context.field, current_value=current_value, navigation_goal=task.navigation_goal, navigation_payload_str=json.dumps(task.navigation_payload), tried_values=json.dumps(tried_values), popped_up_elements="".join([json_to_html(element) for element in cleaned_new_elements]), ) json_respone = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt, step=step) context_reasoning = json_respone.get("reasoning") new_current_value = json_respone.get("tweaked_value", "") if not new_current_value: return ActionFailure(ErrEmptyTweakValue(reasoning=context_reasoning, current_value=current_value)) LOG.info( "Ask LLM tweaked the current value with a new value", step_id=step.step_id, task_id=task.task_id, field_information=input_or_select_context.field, current_value=current_value, new_value=new_current_value, ) current_value = new_current_value else: LOG.warning( "Auto completion didn't finish, this might leave the input value to be empty.", context=input_or_select_context, step_id=step.step_id, task_id=task.task_id, ) return None async def sequentially_select_from_dropdown( action: SelectOptionAction, page: Page, dom: DomUtil, skyvern_frame: SkyvernFrame, incremental_scraped: IncrementalScrapePage, step: 
Step, task: Task, dropdown_menu_element: SkyvernElement | None = None, force_select: bool = False, target_value: str = "", ) -> tuple[ActionResult | None, str | None]: """ TODO: support to return all values retrieved from the sequentially select Only return the last value today """ prompt = prompt_engine.load_prompt( "parse-input-or-select-context", action_reasoning=action.reasoning, element_id=action.element_id, elements=dom.scraped_page.build_element_tree(ElementTreeFormat.HTML), ) json_response = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt, step=step) input_or_select_context = InputOrSelectContext.model_validate(json_response) LOG.info( "Parsed input/select context", context=input_or_select_context, task_id=task.task_id, step_id=step.step_id, ) if not force_select and input_or_select_context.is_search_bar: LOG.info( "Exit custom selection mode since it's a non-force search bar", context=input_or_select_context, task_id=task.task_id, step_id=step.step_id, ) return None, None # TODO: only suport the third-level dropdown selection now MAX_SELECT_DEPTH = 3 values: list[str | None] = [] select_history: list[CustomSingleSelectResult] = [] check_exist_funcs: list[CheckExistIDFunc] = [dom.check_id_in_dom] for i in range(MAX_SELECT_DEPTH): single_select_result = await select_from_dropdown( context=input_or_select_context, page=page, skyvern_frame=skyvern_frame, incremental_scraped=incremental_scraped, check_exist_funcs=check_exist_funcs, step=step, task=task, dropdown_menu_element=dropdown_menu_element, select_history=select_history, force_select=force_select, target_value=target_value, ) select_history.append(single_select_result) values.append(single_select_result.value) # wait 1s until DOM finished updating await asyncio.sleep(1) if await single_select_result.is_done(): return single_select_result.action_result, values[-1] if len(values) > 0 else None if i == MAX_SELECT_DEPTH - 1: LOG.warning( "Reaching the max selection depth", depth=i, task_id=task.task_id, 
def build_sequential_select_history(history_list: list[CustomSingleSelectResult]) -> list[dict[str, Any]]:
    """
    Convert past selection results into plain dicts.

    Each entry carries the selection's ``reasoning`` and ``value`` unchanged,
    plus a ``result`` of ``"success"`` when its ``action_result`` is an
    ``ActionSuccess`` and ``"failed"`` otherwise (including when it is ``None``).
    """
    summaries: list[dict[str, Any]] = []
    for past_selection in history_list:
        succeeded = isinstance(past_selection.action_result, ActionSuccess)
        summaries.append(
            {
                "reasoning": past_selection.reasoning,
                "value": past_selection.value,
                "result": "success" if succeeded else "failed",
            }
        )
    return summaries
When target_value is not empty, the matched option must be relevent to target value; None will be only returned when: 1. force_select is false and no dropdown menu popped 2. force_select is false and match value is not relevant to the target value """ select_history = [] if select_history is None else select_history single_select_result = CustomSingleSelectResult(skyvern_frame=skyvern_frame) timeout = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS if dropdown_menu_element is None: dropdown_menu_element = await locate_dropdown_menu( incremental_scraped=incremental_scraped, step=step, task=task, ) single_select_result.dropdown_menu = dropdown_menu_element if not force_select and dropdown_menu_element is None: return single_select_result if dropdown_menu_element: potential_scrollable_element = await try_to_find_potential_scrollable_element( skyvern_element=dropdown_menu_element, incremental_scraped=incremental_scraped, step=step, task=task, ) if await skyvern_frame.get_element_scrollable(await potential_scrollable_element.get_element_handler()): await scroll_down_to_load_all_options( scrollable_element=potential_scrollable_element, skyvern_frame=skyvern_frame, page=page, incremental_scraped=incremental_scraped, step=step, task=task, ) trimmed_element_tree = await incremental_scraped.get_incremental_element_tree( clean_and_remove_element_tree_factory(task=task, step=step, check_exist_funcs=check_exist_funcs), ) html = incremental_scraped.build_html_tree(element_tree=trimmed_element_tree) prompt = prompt_engine.load_prompt( "custom-select", field_information=context.field, required_field=context.is_required, target_value="" if force_select else target_value, navigation_goal=task.navigation_goal, navigation_payload_str=json.dumps(task.navigation_payload), elements=html, select_history=json.dumps(build_sequential_select_history(select_history)) if select_history else "", ) LOG.info( "Calling LLM to find the match element", step_id=step.step_id, 
task_id=task.task_id, ) json_response = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt, step=step) value: str | None = json_response.get("value", None) single_select_result.value = value select_reason: str | None = json_response.get("reasoning", None) single_select_result.reasoning = select_reason LOG.info( "LLM response for the matched element", matched_value=value, response=json_response, step_id=step.step_id, task_id=task.task_id, ) action_type: str = json_response.get("action_type", "") action_type = action_type.upper() element_id: str | None = json_response.get("id", None) if not element_id or action_type not in ["CLICK", "INPUT_TEXT"]: raise NoAvailableOptionFoundForCustomSelection(reason=json_response.get("reasoning")) if not force_select and target_value: if not json_response.get("relevant", False): LOG.info( "The selected option is not relevant to the target value", element_id=element_id, task_id=task.task_id, step_id=step.step_id, ) return single_select_result if value is not None and action_type == "INPUT_TEXT": LOG.info( "No clickable option found, but found input element to search", element_id=element_id, task_id=task.task_id, step_id=step.step_id, ) try: input_element = await SkyvernElement.create_from_incremental(incremental_scraped, element_id) await input_element.scroll_into_view() current_text = await get_input_value(input_element.get_tag_name(), input_element.get_locator()) if current_text == value: single_select_result.action_result = ActionSuccess() return single_select_result await input_element.input_clear() await input_element.input_sequentially(value) single_select_result.action_result = ActionSuccess() return single_select_result except Exception as e: single_select_result.action_result = ActionFailure(exception=e) return single_select_result try: selected_element = await SkyvernElement.create_from_incremental(incremental_scraped, element_id) await selected_element.scroll_into_view() await 
async def select_from_dropdown_by_value(
    value: str,
    page: Page,
    skyvern_frame: SkyvernFrame,
    dom: DomUtil,
    incremental_scraped: IncrementalScrapePage,
    task: Task,
    step: Step,
    dropdown_menu_element: SkyvernElement | None = None,
) -> ActionResult:
    """
    Click the dropdown option whose value equals ``value``.

    The incremental element tree is refreshed and searched first. If the option
    is not found, the dropdown menu is used (``dropdown_menu_element`` or, when
    that is ``None``, the result of ``locate_dropdown_menu``) and its
    potentially scrollable element is scrolled page by page, searching again
    after each page until the option is clicked.

    :return: ``ActionSuccess`` once an option has been clicked
    :raises NoElementMatchedForTargetOption: when no dropdown menu is available,
        when the menu can't be scrolled, or when scrolling never reveals the option
    """
    click_timeout = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS

    async def click_option_if_present(scraper: IncrementalScrapePage) -> bool:
        # Refresh the incremental tree, then click the option matching `value` if there is one.
        await scraper.get_incremental_element_tree(
            clean_and_remove_element_tree_factory(task=task, step=step, check_exist_funcs=[dom.check_id_in_dom]),
        )
        option_locator = await scraper.select_one_element_by_value(value=value)
        if option_locator is None:
            return False
        await option_locator.click(timeout=click_timeout)
        return True

    # First try: the option may already be among the loaded elements.
    if await click_option_if_present(incremental_scraped):
        return ActionSuccess()

    menu_element = dropdown_menu_element
    if menu_element is None:
        menu_element = await locate_dropdown_menu(
            incremental_scraped=incremental_scraped,
            step=step,
            task=task,
        )
    if not menu_element:
        raise NoElementMatchedForTargetOption(target=value, reason="No value matched")

    scroll_target = await try_to_find_potential_scrollable_element(
        skyvern_element=menu_element,
        incremental_scraped=incremental_scraped,
        task=task,
        step=step,
    )
    scroll_target_handle = await scroll_target.get_element_handler()
    if not await skyvern_frame.get_element_scrollable(scroll_target_handle):
        raise NoElementMatchedForTargetOption(
            target=value, reason="No value matched and element can't scroll to find more options"
        )

    option_clicked: bool = False

    async def continue_callback(incre_scraped: IncrementalScrapePage) -> bool:
        # Returning False tells the scroller to stop; True asks for the next page.
        nonlocal option_clicked
        if await click_option_if_present(incre_scraped):
            option_clicked = True
            return False
        return True

    await scroll_down_to_load_all_options(
        scrollable_element=scroll_target,
        page=page,
        skyvern_frame=skyvern_frame,
        incremental_scraped=incremental_scraped,
        step=step,
        task=task,
        page_by_page=True,
        is_continue=continue_callback,
    )

    if not option_clicked:
        raise NoElementMatchedForTargetOption(target=value, reason="No value matched after scrolling")
    return ActionSuccess()
step_id=step.step_id, task_id=task.task_id, exc_info=True, ) continue if not await skyvern_frame.get_element_visible(await head_element.get_element_handler()): LOG.debug( "Skip the element since it's invisible", step_id=step.step_id, task_id=task.task_id, element_id=element_id, ) continue ul_or_listbox_element_id = await head_element.find_children_element_id_by_callback( cb=is_ul_or_listbox_element_factory(incremental_scraped=incremental_scraped, task=task, step=step), ) if ul_or_listbox_element_id: try: await SkyvernElement.create_from_incremental(incremental_scraped, ul_or_listbox_element_id) LOG.info( "Confirm it's an opened dropdown menu since it includes