general autocomplete solution (#713)

This commit is contained in:
LawyZheng
2024-08-21 10:54:32 +08:00
committed by GitHub
parent ef95dc6eca
commit 8baa8de032
9 changed files with 610 additions and 128 deletions

View File

@@ -1,4 +1,5 @@
import asyncio
import copy
import json
import os
import urllib.parse
@@ -9,13 +10,16 @@ from typing import Any, Awaitable, Callable, List
import structlog
from deprecation import deprecated
from playwright.async_api import FileChooser, Locator, Page, TimeoutError
from pydantic import BaseModel
from skyvern.constants import REPO_ROOT_DIR, VERIFICATION_CODE_POLLING_TIMEOUT_MINS
from skyvern.constants import REPO_ROOT_DIR, SKYVERN_ID_ATTR, VERIFICATION_CODE_POLLING_TIMEOUT_MINS
from skyvern.exceptions import (
EmptySelect,
ErrEmptyTweakValue,
ErrFoundSelectableElement,
FailedToFetchSecret,
FailToClick,
FailToFindAutocompleteOption,
FailToSelectByIndex,
FailToSelectByLabel,
FailToSelectByValue,
@@ -24,9 +28,12 @@ from skyvern.exceptions import (
MissingElement,
MissingFileUrl,
MultipleElementsFound,
NoAutoCompleteOptionMeetCondition,
NoElementMatchedForTargetOption,
NoIncrementalElementFoundForAutoCompletion,
NoIncrementalElementFoundForCustomSelection,
NoLabelOrValueForCustomSelection,
NoSuitableAutoCompleteOption,
OptionIndexOutOfBound,
WrongElementToUploadFile,
)
@@ -59,7 +66,13 @@ from skyvern.webeye.actions.actions import (
)
from skyvern.webeye.actions.responses import ActionFailure, ActionResult, ActionSuccess
from skyvern.webeye.browser_factory import BrowserState, get_download_dir
from skyvern.webeye.scraper.scraper import ElementTreeFormat, IncrementalScrapePage, ScrapedPage
from skyvern.webeye.scraper.scraper import (
ElementTreeFormat,
IncrementalScrapePage,
ScrapedPage,
json_to_html,
trim_element_tree,
)
from skyvern.webeye.utils.dom import DomUtil, InteractiveElement, SkyvernElement
from skyvern.webeye.utils.page import SkyvernFrame
@@ -67,6 +80,12 @@ LOG = structlog.get_logger()
COMMON_INPUT_TAGS = {"input", "textarea", "select"}
class AutoCompletionResult(BaseModel):
    """Outcome of one attempt to pick an option from an auto-completion dropdown."""

    # Not read or written anywhere in the visible code -- TODO confirm whether it is still needed.
    auto_completion_attempt: bool = False
    # Element tree collected after typing a value; callers pass it back in as
    # `preserved_elements` for the next attempt.
    incremental_elements: list[dict] = []
    # Starts out as success; replaced with an ActionFailure when choosing an option fails.
    action_result: ActionResult = ActionSuccess()
class ActionHandler:
_handled_action_types: dict[
ActionType,
@@ -290,6 +309,7 @@ async def handle_input_text_action(
dom = DomUtil(scraped_page, page)
skyvern_element = await dom.get_skyvern_element_by_id(action.element_id)
skyvern_frame = await SkyvernFrame.create_instance(skyvern_element.get_frame())
incremental_scraped = IncrementalScrapePage(skyvern_frame=skyvern_frame)
timeout = SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS
current_text = await get_input_value(skyvern_element.get_tag_name(), skyvern_element.get_locator())
@@ -319,7 +339,6 @@ async def handle_input_text_action(
return await handle_select_option_action(select_action, page, scraped_page, task, step)
# press arrowdown to watch if there's any options popping up
incremental_scraped = IncrementalScrapePage(skyvern_frame=skyvern_frame)
await incremental_scraped.start_listen_dom_increment()
await skyvern_element.get_locator().focus(timeout=timeout)
await skyvern_element.get_locator().press("ArrowDown", timeout=timeout)
@@ -376,12 +395,26 @@ async def handle_input_text_action(
LOG.warning("Failed to clear the input field", action=action, exc_info=True)
return [ActionFailure(InvalidElementForTextInput(element_id=action.element_id, tag_name=tag_name))]
# TODO: not sure if this case will trigger auto-completion
if tag_name not in COMMON_INPUT_TAGS:
await skyvern_element.input_fill(text)
return [ActionSuccess()]
# If the input is a text input, we type the text character by character
# 3 times the time it takes to type the text so it has time to finish typing
if len(text) == 0:
return [ActionSuccess()]
if await skyvern_element.is_auto_completion_input():
result = await input_or_auto_complete_input(
action=action,
page=page,
dom=dom,
text=text,
skyvern_element=skyvern_element,
step=step,
task=task,
)
return [result]
await skyvern_element.input_sequentially(text=text)
return [ActionSuccess()]
@@ -848,6 +881,282 @@ async def chain_click(
return [ActionFailure(WrongElementToUploadFile(action.element_id))]
def remove_exist_elements(dom: DomUtil, element_tree: list[dict]) -> list[dict]:
    """Return the tree with every element that ``dom`` already knows about removed.

    The tree is walked depth-first. An element whose id is already in the DOM
    is dropped and its (already filtered) children are promoted to its level.
    An element that is new is kept, with its ``children`` entry replaced in
    place by the filtered children.
    """
    kept: list[dict] = []
    for node in element_tree:
        descendants = node.get("children", [])
        if len(descendants) > 0:
            # filter the subtree first so promoted children are already clean
            descendants = remove_exist_elements(dom=dom, element_tree=descendants)

        if not dom.check_id_in_dom(node.get("id", "")):
            node["children"] = descendants
            kept.append(node)
            continue

        # the node itself is already known: keep only what is new underneath it
        kept.extend(descendants)
    return kept
async def choose_auto_completion_dropdown(
    action: actions.InputTextAction,
    page: Page,
    dom: DomUtil,
    text: str,
    skyvern_element: SkyvernElement,
    step: Step,
    task: Task,
    preserved_elements: list[dict] | None = None,
    relevance_threshold: float = 0.8,
) -> AutoCompletionResult:
    """Type ``text`` into the input and click the popped-up option the LLM picks.

    The function listens for DOM increments, types the text, waits for new
    elements, asks the secondary LLM which element is the best option and
    clicks it when its relevance reaches ``relevance_threshold``.

    Args:
        action: The input action; its ``reasoning`` is passed to the prompt.
        page: Not used by this function.
        dom: Used to filter out elements that were already on the page.
        text: The value to type into the input.
        skyvern_element: The input element being filled.
        step: Current step, used for logging, cleanup and the LLM call.
        task: Current task, used for logging and cleanup.
        preserved_elements: Elements from a previous attempt; those still on
            the page are re-parsed and offered to the LLM alongside new ones.
        relevance_threshold: Minimum ``relevance_float`` required to click.

    Returns:
        An ``AutoCompletionResult``. ``incremental_elements`` holds a deep copy
        of the candidate elements (empty if the failure happened before they
        were collected). ``action_result`` is an ``ActionFailure`` wrapping the
        exception for any error raised while choosing; otherwise it keeps its
        default success value.

    Note:
        The input is cleared unless an option was clicked. Errors raised by
        the cleanup itself (stop listening / clearing) are not caught here.
    """
    preserved_elements = preserved_elements or []
    clear_input = True
    result = AutoCompletionResult()

    current_frame = skyvern_element.get_frame()
    skyvern_frame = await SkyvernFrame.create_instance(current_frame)
    incremental_scraped = IncrementalScrapePage(skyvern_frame=skyvern_frame)
    await incremental_scraped.start_listen_dom_increment()

    try:
        await skyvern_element.press_fill(text)
        # wait for new elements to load
        await asyncio.sleep(5)
        incremental_element = await incremental_scraped.get_incremental_element_tree(
            app.AGENT_FUNCTION.cleanup_element_tree_factory(task=task, step=step)
        )
        incremental_element = remove_exist_elements(dom=dom, element_tree=incremental_element)

        # check if elements in preserve list are still on the page
        confirmed_preserved_list: list[dict] = []
        for element in preserved_elements:
            element_id = element.get("id")
            if not element_id:
                continue
            locator = current_frame.locator(f'[{SKYVERN_ID_ATTR}="{element_id}"]')
            cnt = await locator.count()
            if cnt == 0:
                continue

            element_handler = await locator.element_handle(
                timeout=SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS
            )
            if not element_handler:
                continue

            # NOTE(review): this passes the *input's* frame id and interactable flag for every
            # preserved element -- confirm that is intended.
            current_element = await skyvern_frame.parse_element_from_html(
                skyvern_element.get_frame_id(), element_handler, skyvern_element.is_interactable()
            )
            confirmed_preserved_list.append(current_element)

        if confirmed_preserved_list:
            confirmed_preserved_list = await app.AGENT_FUNCTION.cleanup_element_tree_factory(task=task, step=step)(
                skyvern_frame.get_frame().url, copy.deepcopy(confirmed_preserved_list)
            )
            confirmed_preserved_list = trim_element_tree(copy.deepcopy(confirmed_preserved_list))

        incremental_element.extend(confirmed_preserved_list)

        # stored before the emptiness check so the caller always sees what was collected
        result.incremental_elements = copy.deepcopy(incremental_element)
        if not incremental_element:
            raise NoIncrementalElementFoundForAutoCompletion(element_id=skyvern_element.get_id(), text=text)

        html = incremental_scraped.build_html_tree(incremental_element)
        auto_completion_confirm_prompt = prompt_engine.load_prompt(
            "auto-completion-choose-option",
            context_reasoning=action.reasoning,
            filled_value=text,
            elements=html,
        )
        LOG.info(
            "Confirm if it's an auto completion dropdown",
            step_id=step.step_id,
            task_id=task.task_id,
        )
        json_response = await app.SECONDARY_LLM_API_HANDLER(prompt=auto_completion_confirm_prompt, step=step)
        element_id = json_response.get("id", "")
        if not element_id:
            reasoning = json_response.get("reasoning")
            raise NoSuitableAutoCompleteOption(reasoning=reasoning, target_value=text)

        # The value comes from LLM-produced JSON: it may be null or a numeric string.
        # `dict.get` only applies its default when the key is missing, so coerce explicitly.
        try:
            relevance_float = float(json_response.get("relevance_float", 0))
        except (TypeError, ValueError):
            relevance_float = 0.0

        # `not >=` instead of `<` so that NaN is rejected as well
        if not relevance_float >= relevance_threshold:
            LOG.info(
                f"The closest option doesn't meet the condition(relevance_float>={relevance_threshold})",
                element_id=element_id,
                relevance_float=relevance_float,
            )
            reasoning = json_response.get("reasoning")
            raise NoAutoCompleteOptionMeetCondition(
                reasoning=reasoning,
                required_relevance=relevance_threshold,
                target_value=text,
                closest_relevance=relevance_float,
            )

        LOG.info(
            "Find a suitable option to choose",
            element_id=element_id,
            step_id=step.step_id,
            task_id=task.task_id,
        )

        locator = current_frame.locator(f'[{SKYVERN_ID_ATTR}="{element_id}"]')
        if await locator.count() == 0:
            raise MissingElement(element_id=element_id)

        await locator.click(timeout=SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS)
        # an option was chosen: keep the typed value in the input
        clear_input = False
        return result
    except Exception as e:
        LOG.info(
            "Failed to choose the auto completion dropdown",
            exc_info=True,
            input_value=text,
            task_id=task.task_id,
            step_id=step.step_id,
        )
        result.action_result = ActionFailure(exception=e)
        return result
    finally:
        try:
            await incremental_scraped.stop_listen_dom_increment()
        finally:
            # Runs even when stopping the listener fails, so a rejected value
            # is never left behind in the input for the next attempt.
            if clear_input:
                await skyvern_element.input_clear()
async def input_or_auto_complete_input(
    action: actions.InputTextAction,
    page: Page,
    dom: DomUtil,
    text: str,
    skyvern_element: SkyvernElement,
    step: Step,
    task: Task,
) -> ActionResult:
    """Fill an auto-completion input, retrying with LLM-suggested values until an option is chosen.

    Args:
        action: The input action; its ``reasoning`` seeds the prompts.
        page: Forwarded to ``choose_auto_completion_dropdown``.
        dom: Forwarded to ``choose_auto_completion_dropdown``.
        text: The value originally requested for the input.
        skyvern_element: The input element being filled.
        step: Current step, used for logging and LLM calls.
        task: Current task, used for logging.

    Returns:
        ``ActionSuccess`` as soon as one value leads to a chosen option.
        ``ActionFailure(ErrEmptyTweakValue)`` when the LLM returns no tweaked
        value, or ``ActionFailure(FailToFindAutocompleteOption)`` once all
        rounds are exhausted.
    """
    LOG.info(
        "Trigger auto completion",
        task_id=task.task_id,
        step_id=step.step_id,
        element_id=skyvern_element.get_id(),
    )

    # 1. press the original text to see if there's a match
    # 2. call LLM to find potential values based on the original text
    # 3. try each potential value from #2
    # 4. call LLM to tweak the original text according to the information from #3, then start #1 again

    # FIXME: try the whole loop for twice now, to prevent too many LLM calls
    MAX_AUTO_COMPLETE_ATTEMP = 2
    context_reasoning = action.reasoning
    current_value = text
    result = AutoCompletionResult()

    async def try_value(candidate: str, preserved: list[dict]) -> AutoCompletionResult:
        # One dropdown attempt for `candidate`, carrying over the elements seen so far.
        LOG.info(
            "Try the potential value for auto completion",
            step_id=step.step_id,
            task_id=task.task_id,
            input_value=candidate,
        )
        return await choose_auto_completion_dropdown(
            action=action,
            page=page,
            dom=dom,
            text=candidate,
            preserved_elements=preserved,
            skyvern_element=skyvern_element,
            step=step,
            task=task,
        )

    for current_attemp in range(1, MAX_AUTO_COMPLETE_ATTEMP + 1):
        whole_new_elements: list[dict] = []
        tried_values: list[str] = []

        result = await try_value(current_value, result.incremental_elements)
        if isinstance(result.action_result, ActionSuccess):
            return ActionSuccess()

        tried_values.append(current_value)
        whole_new_elements.extend(result.incremental_elements)

        prompt = prompt_engine.load_prompt(
            "auto-completion-potential-answers",
            context_reasoning=context_reasoning,
            current_value=current_value,
        )

        LOG.info(
            "Ask LLM to give 10 potential values based on the current value",
            current_value=current_value,
            step_id=step.step_id,
            task_id=task.task_id,
        )
        json_respone = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt, step=step)
        # `dict.get` only falls back to its default for a missing key; an explicit
        # null from the LLM must not break the loop below.
        values = json_respone.get("potential_values") or []

        for each_value in values:
            # entries are expected to be dicts; anything else is treated as empty
            value = each_value.get("value", "") if isinstance(each_value, dict) else ""
            if not value:
                LOG.info(
                    "Empty potential value, skip this attempt",
                    step_id=step.step_id,
                    task_id=task.task_id,
                    value=each_value,
                )
                continue
            if value in tried_values:
                # every attempt costs a page wait plus an LLM call, so don't repeat one
                LOG.info(
                    "Potential value has been tried already, skip this attempt",
                    step_id=step.step_id,
                    task_id=task.task_id,
                    value=value,
                )
                continue

            result = await try_value(value, result.incremental_elements)
            if isinstance(result.action_result, ActionSuccess):
                return ActionSuccess()

            tried_values.append(value)
            whole_new_elements.extend(result.incremental_elements)

        # no point in asking for a tweaked value after the final round
        if current_attemp < MAX_AUTO_COMPLETE_ATTEMP:
            LOG.info(
                "Ask LLM to tweak the current value based on tried input values",
                step_id=step.step_id,
                task_id=task.task_id,
                current_value=current_value,
                current_attemp=current_attemp,
            )
            prompt = prompt_engine.load_prompt(
                "auto-completion-tweak-value",
                context_reasoning=context_reasoning,
                current_value=current_value,
                tried_values=json.dumps(tried_values),
                popped_up_elements="".join([json_to_html(element) for element in whole_new_elements]),
            )
            json_respone = await app.SECONDARY_LLM_API_HANDLER(prompt=prompt, step=step)
            context_reasoning = json_respone.get("reasoning")
            new_current_value = json_respone.get("tweaked_value", "")
            if not new_current_value:
                return ActionFailure(ErrEmptyTweakValue(reasoning=context_reasoning, current_value=current_value))
            LOG.info(
                "Ask LLM tweaked the current value with a new value",
                step_id=step.step_id,
                task_id=task.task_id,
                reasoning=context_reasoning,
                current_value=current_value,
                new_value=new_current_value,
            )
            current_value = new_current_value

    return ActionFailure(FailToFindAutocompleteOption(current_value=text))
async def select_from_dropdown(
action: SelectOptionAction,
page: Page,