current viewpoint screenshot and scrolling n screenshot (#2716)
Co-authored-by: lawyzheng <lawyzheng1106@gmail.com>
This commit is contained in:
@@ -2,9 +2,12 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from enum import StrEnum
|
||||
from io import BytesIO
|
||||
from typing import Any
|
||||
|
||||
import structlog
|
||||
from PIL import Image
|
||||
from playwright._impl._errors import TimeoutError
|
||||
from playwright.async_api import ElementHandle, Frame, Page
|
||||
|
||||
@@ -31,17 +34,24 @@ def load_js_script() -> str:
|
||||
JS_FUNCTION_DEFS = load_js_script()
|
||||
|
||||
|
||||
class ScreenshotMode(StrEnum):
|
||||
LITE = "lite"
|
||||
DETAILED = "detailed"
|
||||
|
||||
|
||||
async def _current_viewpoint_screenshot_helper(
|
||||
page: Page,
|
||||
file_path: str | None = None,
|
||||
full_page: bool = False,
|
||||
timeout: float = settings.BROWSER_SCREENSHOT_TIMEOUT_MS,
|
||||
mode: ScreenshotMode = ScreenshotMode.DETAILED,
|
||||
) -> bytes:
|
||||
if page.is_closed():
|
||||
raise FailedToTakeScreenshot(error_message="Page is closed")
|
||||
try:
|
||||
await page.wait_for_load_state(timeout=settings.BROWSER_LOADING_TIMEOUT_MS)
|
||||
LOG.debug("Page is fully loaded, agent is about to take screenshots")
|
||||
if mode == ScreenshotMode.DETAILED:
|
||||
await page.wait_for_load_state(timeout=settings.BROWSER_LOADING_TIMEOUT_MS)
|
||||
LOG.debug("Page is fully loaded, agent is about to take screenshots")
|
||||
start_time = time.time()
|
||||
screenshot: bytes = b""
|
||||
if file_path:
|
||||
@@ -77,6 +87,7 @@ async def _scrolling_screenshots_helper(
|
||||
url: str | None = None,
|
||||
draw_boxes: bool = False,
|
||||
max_number: int = settings.MAX_NUM_SCREENSHOTS,
|
||||
mode: ScreenshotMode = ScreenshotMode.DETAILED,
|
||||
) -> list[bytes]:
|
||||
skyvern_page = await SkyvernFrame.create_instance(frame=page)
|
||||
# page is the main frame and the index must be 0
|
||||
@@ -84,6 +95,11 @@ async def _scrolling_screenshots_helper(
|
||||
frame = "main.frame"
|
||||
frame_index = 0
|
||||
|
||||
# when mode is lite, we don't draw bounding boxes
|
||||
# since draw_boxes impacts the performance of processing
|
||||
if mode == ScreenshotMode.LITE:
|
||||
draw_boxes = False
|
||||
|
||||
screenshots: list[bytes] = []
|
||||
if await skyvern_page.is_window_scrollable():
|
||||
scroll_y_px_old = -30.0
|
||||
@@ -92,12 +108,15 @@ async def _scrolling_screenshots_helper(
|
||||
# We are checking the difference between the old and new scroll_y_px to determine if we have reached the end of the
|
||||
# page. If the difference is less than 25, we assume we have reached the end of the page.
|
||||
while abs(scroll_y_px_old - scroll_y_px) > 25 and len(screenshots) < max_number:
|
||||
screenshot = await _current_viewpoint_screenshot_helper(page=skyvern_page.frame)
|
||||
screenshot = await _current_viewpoint_screenshot_helper(page=skyvern_page.frame, mode=mode)
|
||||
screenshots.append(screenshot)
|
||||
scroll_y_px_old = scroll_y_px
|
||||
LOG.debug("Scrolling to next page", url=url, num_screenshots=len(screenshots))
|
||||
scroll_y_px = await skyvern_page.scroll_to_next_page(
|
||||
draw_boxes=draw_boxes, frame=frame, frame_index=frame_index
|
||||
draw_boxes=draw_boxes,
|
||||
frame=frame,
|
||||
frame_index=frame_index,
|
||||
need_overlap=(mode == ScreenshotMode.DETAILED),
|
||||
)
|
||||
LOG.debug(
|
||||
"Scrolled to next page",
|
||||
@@ -107,15 +126,17 @@ async def _scrolling_screenshots_helper(
|
||||
if draw_boxes:
|
||||
await skyvern_page.remove_bounding_boxes()
|
||||
await skyvern_page.scroll_to_top(draw_boxes=False, frame=frame, frame_index=frame_index)
|
||||
# wait until animation ends, which is triggered by scrolling
|
||||
LOG.debug("Waiting for 2 seconds until animation ends.")
|
||||
await asyncio.sleep(2)
|
||||
|
||||
if mode == ScreenshotMode.DETAILED:
|
||||
# wait until animation ends, which is triggered by scrolling
|
||||
LOG.debug("Waiting for 2 seconds until animation ends.")
|
||||
await asyncio.sleep(2)
|
||||
else:
|
||||
if draw_boxes:
|
||||
await skyvern_page.build_elements_and_draw_bounding_boxes(frame=frame, frame_index=frame_index)
|
||||
|
||||
LOG.debug("Page is not scrollable", url=url, num_screenshots=len(screenshots))
|
||||
screenshot = await _current_viewpoint_screenshot_helper(page=skyvern_page.frame)
|
||||
screenshot = await _current_viewpoint_screenshot_helper(page=skyvern_page.frame, mode=mode)
|
||||
screenshots.append(screenshot)
|
||||
|
||||
if draw_boxes:
|
||||
@@ -144,28 +165,85 @@ class SkyvernFrame:
|
||||
return await SkyvernFrame.evaluate(frame=frame, expression="() => document.location.href")
|
||||
|
||||
@staticmethod
|
||||
async def take_screenshot(
|
||||
async def take_scrolling_screenshot(
|
||||
page: Page,
|
||||
full_page: bool = False,
|
||||
file_path: str | None = None,
|
||||
timeout: float = settings.BROWSER_SCREENSHOT_TIMEOUT_MS,
|
||||
mode: ScreenshotMode = ScreenshotMode.DETAILED,
|
||||
scrolling_number: int = settings.MAX_NUM_SCREENSHOTS,
|
||||
use_playwright_fullpage: bool = False, # TODO: THIS IS ONLY FOR EXPERIMENT. will be removed after experiment.
|
||||
) -> bytes:
|
||||
return await _current_viewpoint_screenshot_helper(
|
||||
page=page, file_path=file_path, full_page=full_page, timeout=timeout
|
||||
)
|
||||
if scrolling_number <= 0:
|
||||
return await _current_viewpoint_screenshot_helper(
|
||||
page=page, file_path=file_path, timeout=timeout, mode=mode
|
||||
)
|
||||
|
||||
if use_playwright_fullpage:
|
||||
return await _current_viewpoint_screenshot_helper(
|
||||
page=page, file_path=file_path, timeout=timeout, full_page=True
|
||||
)
|
||||
|
||||
if scrolling_number > settings.MAX_NUM_SCREENSHOTS:
|
||||
LOG.warning(
|
||||
"scrolling_number is greater than the max number of screenshots, setting it to the max number of screenshots",
|
||||
scrolling_number=scrolling_number,
|
||||
max_number=settings.MAX_NUM_SCREENSHOTS,
|
||||
)
|
||||
scrolling_number = settings.MAX_NUM_SCREENSHOTS
|
||||
|
||||
# use spilt screenshot with lite mode, isntead of fullpage screenshot from playwright
|
||||
LOG.debug("Page is fully loaded, agent is about to generate the full page screenshot")
|
||||
start_time = time.time()
|
||||
async with asyncio.timeout(timeout):
|
||||
screenshots = await _scrolling_screenshots_helper(page=page, mode=mode, max_number=scrolling_number)
|
||||
images = []
|
||||
|
||||
for screenshot in screenshots:
|
||||
with Image.open(BytesIO(screenshot)) as img:
|
||||
img.load()
|
||||
images.append(img)
|
||||
|
||||
total_height = sum(img.height for img in images)
|
||||
max_width = max(img.width for img in images)
|
||||
|
||||
merged_img = Image.new("RGB", (max_width, total_height), color=(255, 255, 255))
|
||||
|
||||
current_y = 0
|
||||
for img in images:
|
||||
merged_img.paste(img, (0, current_y))
|
||||
current_y += img.height
|
||||
|
||||
buffer = BytesIO()
|
||||
merged_img.save(buffer, format="PNG")
|
||||
buffer.seek(0)
|
||||
|
||||
img_data = buffer.read()
|
||||
if file_path is not None:
|
||||
with open(file_path, "wb") as f:
|
||||
f.write(img_data)
|
||||
|
||||
end_time = time.time()
|
||||
LOG.debug(
|
||||
"Full page screenshot taking time",
|
||||
screenshot_time=end_time - start_time,
|
||||
file_path=file_path,
|
||||
)
|
||||
return img_data
|
||||
|
||||
@staticmethod
|
||||
async def take_split_screenshots(
|
||||
page: Page,
|
||||
url: str,
|
||||
url: str | None = None,
|
||||
draw_boxes: bool = False,
|
||||
max_number: int = settings.MAX_NUM_SCREENSHOTS,
|
||||
scroll: bool = True,
|
||||
) -> list[bytes]:
|
||||
if not scroll:
|
||||
return [await _current_viewpoint_screenshot_helper(page=page)]
|
||||
return [await _current_viewpoint_screenshot_helper(page=page, mode=ScreenshotMode.DETAILED)]
|
||||
|
||||
return await _scrolling_screenshots_helper(page=page, url=url, max_number=max_number, draw_boxes=draw_boxes)
|
||||
return await _scrolling_screenshots_helper(
|
||||
page=page, url=url, max_number=max_number, draw_boxes=draw_boxes, mode=ScreenshotMode.DETAILED
|
||||
)
|
||||
|
||||
@classmethod
|
||||
async def create_instance(cls, frame: Page | Frame) -> SkyvernFrame:
|
||||
@@ -235,19 +313,21 @@ class SkyvernFrame:
|
||||
)
|
||||
return scroll_y_px
|
||||
|
||||
async def scroll_to_next_page(self, draw_boxes: bool, frame: str, frame_index: int) -> float:
|
||||
async def scroll_to_next_page(
|
||||
self, draw_boxes: bool, frame: str, frame_index: int, need_overlap: bool = True
|
||||
) -> float:
|
||||
"""
|
||||
Scroll to the next page and take a screenshot.
|
||||
:param drow_boxes: If True, draw bounding boxes around the elements.
|
||||
:param page: Page instance to take the screenshot from.
|
||||
:return: Screenshot of the page.
|
||||
"""
|
||||
js_script = "async ([draw_boxes, frame, frame_index]) => await scrollToNextPage(draw_boxes, frame, frame_index)"
|
||||
js_script = "async ([draw_boxes, frame, frame_index, need_overlap]) => await scrollToNextPage(draw_boxes, frame, frame_index, need_overlap)"
|
||||
scroll_y_px = await self.evaluate(
|
||||
frame=self.frame,
|
||||
expression=js_script,
|
||||
timeout_ms=BUILDING_ELEMENT_TREE_TIMEOUT_MS,
|
||||
arg=[draw_boxes, frame, frame_index],
|
||||
arg=[draw_boxes, frame, frame_index, need_overlap],
|
||||
)
|
||||
return scroll_y_px
|
||||
|
||||
|
||||
Reference in New Issue
Block a user