iframes support (#405)
Co-authored-by: Aleksei Zarubin <12220926+alexzarbn@users.noreply.github.com>
This commit is contained in:
@@ -6,7 +6,7 @@ from enum import StrEnum
|
||||
from typing import Any
|
||||
|
||||
import structlog
|
||||
from playwright.async_api import Page
|
||||
from playwright.async_api import Frame, Page
|
||||
from pydantic import BaseModel
|
||||
|
||||
from skyvern.constants import SKYVERN_DIR, SKYVERN_ID_ATTR
|
||||
@@ -122,6 +122,7 @@ class ScrapedPage(BaseModel):
|
||||
|
||||
elements: list[dict]
|
||||
id_to_element_dict: dict[str, dict] = {}
|
||||
id_to_frame_dict: dict[str, str] = {}
|
||||
id_to_xpath_dict: dict[str, str]
|
||||
element_tree: list[dict]
|
||||
element_tree_trimmed: list[dict]
|
||||
@@ -187,14 +188,30 @@ async def scrape_website(
|
||||
)
|
||||
|
||||
|
||||
async def get_all_visible_text(page: Page) -> str:
|
||||
async def get_frame_text(iframe: Frame) -> str:
|
||||
"""
|
||||
Get all the visible text on the page.
|
||||
:param page: Page instance to get the text from.
|
||||
:return: All the visible text on the page.
|
||||
Get all the visible text in the iframe.
|
||||
:param iframe: Frame instance to get the text from.
|
||||
:return: All the visible text from the iframe.
|
||||
"""
|
||||
js_script = "() => document.body.innerText"
|
||||
return await page.evaluate(js_script)
|
||||
|
||||
try:
|
||||
text = await iframe.evaluate(js_script)
|
||||
except Exception:
|
||||
LOG.warning(
|
||||
"failed to get text from iframe",
|
||||
exc_info=True,
|
||||
)
|
||||
return ""
|
||||
|
||||
for child_frame in iframe.child_frames:
|
||||
if child_frame.is_detached():
|
||||
continue
|
||||
|
||||
text += await get_frame_text(child_frame)
|
||||
|
||||
return text
|
||||
|
||||
|
||||
async def scrape_web_unsafe(
|
||||
@@ -256,17 +273,22 @@ async def scrape_web_unsafe(
|
||||
|
||||
id_to_xpath_dict = {}
|
||||
id_to_element_dict = {}
|
||||
id_to_frame_dict = {}
|
||||
|
||||
for element in elements:
|
||||
element_id = element["id"]
|
||||
# get_interactable_element_tree marks each interactable element with a unique_id attribute
|
||||
id_to_xpath_dict[element_id] = f"//*[@{SKYVERN_ID_ATTR}='{element_id}']"
|
||||
id_to_element_dict[element_id] = element
|
||||
id_to_frame_dict[element_id] = element["frame"]
|
||||
|
||||
text_content = await get_frame_text(page.main_frame)
|
||||
|
||||
text_content = await get_all_visible_text(page)
|
||||
return ScrapedPage(
|
||||
elements=elements,
|
||||
id_to_xpath_dict=id_to_xpath_dict,
|
||||
id_to_element_dict=id_to_element_dict,
|
||||
id_to_frame_dict=id_to_frame_dict,
|
||||
element_tree=element_tree,
|
||||
element_tree_trimmed=trim_element_tree(copy.deepcopy(element_tree)),
|
||||
screenshots=screenshots,
|
||||
@@ -276,6 +298,47 @@ async def scrape_web_unsafe(
|
||||
)
|
||||
|
||||
|
||||
async def get_interactable_element_tree_in_frame(
|
||||
frames: list[Frame], elements: list[dict], element_tree: list[dict]
|
||||
) -> tuple[list[dict], list[dict]]:
|
||||
for frame in frames:
|
||||
if frame.is_detached():
|
||||
continue
|
||||
|
||||
try:
|
||||
frame_element = await frame.frame_element()
|
||||
except Exception:
|
||||
LOG.warning(
|
||||
"Unable to get frame_element",
|
||||
exc_info=True,
|
||||
)
|
||||
continue
|
||||
|
||||
unique_id = await frame_element.get_attribute("unique_id")
|
||||
|
||||
frame_js_script = f"() => buildTreeFromBody('{unique_id}')"
|
||||
|
||||
await frame.evaluate(JS_FUNCTION_DEFS)
|
||||
frame_elements, frame_element_tree = await frame.evaluate(frame_js_script)
|
||||
|
||||
if len(frame.child_frames) > 0:
|
||||
frame_elements, frame_element_tree = await get_interactable_element_tree_in_frame(
|
||||
frame.child_frames, frame_elements, frame_element_tree
|
||||
)
|
||||
|
||||
for element in elements:
|
||||
if element["id"] == unique_id:
|
||||
element["children"] = frame_elements
|
||||
|
||||
for element_tree_item in element_tree:
|
||||
if element_tree_item["id"] == unique_id:
|
||||
element_tree_item["children"] = frame_element_tree
|
||||
|
||||
elements = elements + frame_elements
|
||||
|
||||
return elements, element_tree
|
||||
|
||||
|
||||
async def get_interactable_element_tree(page: Page) -> tuple[list[dict], list[dict]]:
|
||||
"""
|
||||
Get the element tree of the page, including all the elements that are interactable.
|
||||
@@ -283,8 +346,14 @@ async def get_interactable_element_tree(page: Page) -> tuple[list[dict], list[di
|
||||
:return: Tuple containing the element tree and a map of element IDs to elements.
|
||||
"""
|
||||
await page.evaluate(JS_FUNCTION_DEFS)
|
||||
js_script = "() => buildTreeFromBody()"
|
||||
elements, element_tree = await page.evaluate(js_script)
|
||||
main_frame_js_script = "() => buildTreeFromBody('main.frame')"
|
||||
elements, element_tree = await page.evaluate(main_frame_js_script)
|
||||
|
||||
if len(page.main_frame.child_frames) > 0:
|
||||
elements, element_tree = await get_interactable_element_tree_in_frame(
|
||||
page.main_frame.child_frames, elements, element_tree
|
||||
)
|
||||
|
||||
return elements, element_tree
|
||||
|
||||
|
||||
@@ -352,6 +421,9 @@ def trim_element_tree(elements: list[dict]) -> list[dict]:
|
||||
queue.append(element)
|
||||
while queue:
|
||||
queue_ele = queue.pop(0)
|
||||
if "frame" in queue_ele:
|
||||
del queue_ele["frame"]
|
||||
|
||||
if "attributes" in queue_ele:
|
||||
tag_name = queue_ele["tagName"] if "tagName" in queue_ele else ""
|
||||
new_attributes = _trimmed_attributes(tag_name, queue_ele["attributes"])
|
||||
|
||||
Reference in New Issue
Block a user