general selection (#675)

This commit is contained in:
LawyZheng
2024-08-06 13:30:52 +08:00
committed by GitHub
parent 845ae8d3e4
commit cba0f68a5e
10 changed files with 655 additions and 241 deletions

View File

@@ -96,6 +96,9 @@ def json_to_html(element: dict) -> str:
attributes_html = " ".join(build_attribute(key, value) for key, value in attributes.items())
tag = element["tagName"]
if element.get("isSelectable", False):
tag = "select"
text = element.get("text", "")
# build children HTML
children_html = "".join(json_to_html(child) for child in element.get("children", []))
@@ -112,6 +115,21 @@ def json_to_html(element: dict) -> str:
return f'<{tag}{attributes_html if not attributes_html else " "+attributes_html}>{text}{children_html+option_html}</{tag}>'
def build_element_dict(elements: list[dict]) -> tuple[dict[str, str], dict[str, dict], dict[str, str]]:
id_to_css_dict: dict[str, str] = {}
id_to_element_dict: dict[str, dict] = {}
id_to_frame_dict: dict[str, str] = {}
for element in elements:
element_id: str = element.get("id", "")
# get_interactable_element_tree marks each interactable element with a unique_id attribute
id_to_css_dict[element_id] = f"[{SKYVERN_ID_ATTR}='{element_id}']"
id_to_element_dict[element_id] = element
id_to_frame_dict[element_id] = element["frame"]
return id_to_css_dict, id_to_element_dict, id_to_frame_dict
class ElementTreeFormat(StrEnum):
JSON = "json"
HTML = "html"
@@ -266,16 +284,7 @@ async def scrape_web_unsafe(
elements, element_tree = await get_interactable_element_tree(page, scrape_exclude)
element_tree = await cleanup_element_tree(url, copy.deepcopy(element_tree))
id_to_css_dict = {}
id_to_element_dict = {}
id_to_frame_dict = {}
for element in elements:
element_id = element["id"]
# get_interactable_element_tree marks each interactable element with a unique_id attribute
id_to_css_dict[element_id] = f"[{SKYVERN_ID_ATTR}='{element_id}']"
id_to_element_dict[element_id] = element
id_to_frame_dict[element_id] = element["frame"]
id_to_css_dict, id_to_element_dict, id_to_frame_dict = build_element_dict(elements)
text_content = await get_frame_text(page.main_frame)
@@ -378,6 +387,65 @@ async def get_interactable_element_tree(
return elements, element_tree
class IncrementalScrapePage:
id_to_element_dict: dict[str, dict] = {}
id_to_css_dict: dict[str, str]
elements: list[dict]
element_tree: list[dict]
element_tree_trimmed: list[dict]
def __init__(self, skyvern_frame: SkyvernFrame) -> None:
self.skyvern_frame = skyvern_frame
async def get_incremental_element_tree(
self,
cleanup_element_tree: Callable[[str, list[dict]], Awaitable[list[dict]]],
) -> list[dict]:
frame = self.skyvern_frame.get_frame()
frame_id = "main.frame"
if isinstance(frame, Frame):
try:
frame_element = await frame.frame_element()
frame_id = await frame_element.get_attribute("unique_id")
except Exception:
# TODO: do we really care about the frame_id ?
LOG.warning(
"Unable to get frame_element",
exc_info=True,
)
js_script = f"async () => await getIncrementElements('{frame_id}')"
incremental_elements, incremental_tree = await frame.evaluate(js_script)
# we listen the incremental elements seperated by frames, so all elements will be in the same SkyvernFrame
self.id_to_css_dict, self.id_to_element_dict, _ = build_element_dict(incremental_elements)
self.elements = incremental_elements
incremental_tree = await cleanup_element_tree(frame.url, copy.deepcopy(incremental_tree))
trimmed_element_tree = trim_element_tree(copy.deepcopy(incremental_tree))
self.element_tree = incremental_tree
self.element_tree_trimmed = trimmed_element_tree
return self.element_tree_trimmed
async def start_listen_dom_increment(self) -> None:
js_script = "() => startGlobalIncrementalObserver()"
await self.skyvern_frame.get_frame().evaluate(js_script)
async def stop_listen_dom_increment(self) -> None:
js_script = "() => stopGlobalIncrementalObserver()"
await self.skyvern_frame.get_frame().evaluate(js_script)
async def get_incremental_elements_num(self) -> int:
js_script = "() => window.globalOneTimeIncrementElements.length"
return await self.skyvern_frame.get_frame().evaluate(js_script)
def build_html_tree(self) -> str:
return "".join([json_to_html(element) for element in self.element_tree_trimmed])
def trim_element_tree(elements: list[dict]) -> list[dict]:
queue = []
for element in elements: