script generation: regenerate click xpath from intention (#3169)
This commit is contained in:
@@ -1,24 +1,9 @@
|
||||
from typing import Any
|
||||
|
||||
from playwright.async_api import async_playwright
|
||||
|
||||
from skyvern.core.script_generations.skyvern_page import RunContext, SkyvernPage
|
||||
from skyvern.forge.sdk.core import skyvern_context
|
||||
from skyvern.webeye.browser_factory import BrowserContextFactory
|
||||
|
||||
|
||||
# TODO: find a better name for this function
|
||||
async def setup(parameters: dict[str, Any], generate_response: bool = False) -> tuple[SkyvernPage, RunContext]:
    """Create the page and run context that a generated script executes against.

    Skyvern context initialisation, browser start-up and the initial scrape
    are delegated to ``SkyvernPage.create()``; this function only wires the
    resulting page into a ``RunContext``.

    Args:
        parameters: Values handed to the script; stored on the returned
            ``RunContext``.
        generate_response: Accepted for backward compatibility. It is not
            read by this function.

    Returns:
        A ``(skyvern_page, run_context)`` tuple, where ``run_context`` wraps
        ``parameters`` and the same ``skyvern_page``.
    """
    skyvern_page = await SkyvernPage.create()
    run_context = RunContext(parameters=parameters, page=skyvern_page)
    return skyvern_page, run_context
|
||||
|
||||
@@ -1,16 +1,22 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from enum import StrEnum
|
||||
from typing import Any, Callable, Literal
|
||||
|
||||
from playwright.async_api import Page
|
||||
|
||||
from skyvern.config import settings
|
||||
from skyvern.forge import app
|
||||
from skyvern.forge.prompts import prompt_engine
|
||||
from skyvern.forge.sdk.api.files import download_file
|
||||
from skyvern.forge.sdk.core import skyvern_context
|
||||
from skyvern.webeye.actions import handler_utils
|
||||
from skyvern.webeye.actions.action_types import ActionType
|
||||
from skyvern.webeye.scraper.scraper import ScrapedPage, scrape_website
|
||||
|
||||
|
||||
class Driver(StrEnum):
|
||||
@@ -45,16 +51,37 @@ class SkyvernPage:
|
||||
|
||||
def __init__(
    self,
    scraped_page: ScrapedPage,
    page: Page,
    driver: Driver = Driver.PLAYWRIGHT,
    *,
    recorder: Callable[[ActionCall], None] | None = None,
):
    """Bind the wrapper to a scrape result and the page actions run on.

    Args:
        scraped_page: Scrape result kept on the instance as
            ``self.scraped_page``.
        page: Page object stored as ``self.page`` (annotated as a Playwright
            ``Page``).
        driver: Driver identifier stored as ``self.driver``; defaults to
            ``Driver.PLAYWRIGHT``.
        recorder: Optional callback receiving an ``ActionCall``. When it is
            omitted (or falsy) a callback that does nothing is used instead.
    """

    def _discard(ac: ActionCall) -> None:
        """Fallback recorder: accept the action call and ignore it."""
        return None

    self._record = recorder or _discard
    self.scraped_page = scraped_page
    self.page = page
    self.driver = driver
|
||||
|
||||
@classmethod
async def create(cls) -> SkyvernPage:
    """Build an instance on top of the browser state used for scripts.

    Installs a fresh ``SkyvernContext`` when none is current, obtains a
    browser state from ``app.BROWSER_MANAGER.get_or_create_for_script()``,
    scrapes it, and wraps the scrape result together with its working page.

    Returns:
        A new instance of ``cls`` holding the scrape result and the page.
    """
    # Only install a context when the caller has not already provided one.
    if not skyvern_context.current():
        skyvern_context.set(skyvern_context.SkyvernContext())

    script_browser = await app.BROWSER_MANAGER.get_or_create_for_script()

    # Keyword order mirrors the scrape_website call so argument expressions
    # are evaluated in the same sequence.
    scrape_options = {
        # NOTE(review): an empty URL is passed; presumably the scraper works
        # on whatever page the browser state currently has - confirm.
        "url": "",
        "cleanup_element_tree": app.AGENT_FUNCTION.cleanup_element_tree_factory(),
        "scrape_exclude": app.scrape_exclude,
        "max_screenshot_number": settings.MAX_NUM_SCREENSHOTS,
        "draw_boxes": True,
        "scroll": True,
    }
    snapshot = await scrape_website(browser_state=script_browser, **scrape_options)

    # NOTE(review): the page is taken from the scrape result's private
    # ``_browser_state``; presumably this is the same object as
    # ``script_browser`` - verify before simplifying.
    working_page = await snapshot._browser_state.must_get_working_page()
    return cls(scraped_page=snapshot, page=working_page)
|
||||
|
||||
@staticmethod
|
||||
def action_wrap(
|
||||
action: ActionType,
|
||||
@@ -97,32 +124,47 @@ class SkyvernPage:
|
||||
######### Public Interfaces #########
|
||||
@action_wrap(ActionType.CLICK)
async def click(self, xpath: str, intention: str | None = None, data: str | dict[str, Any] | None = None) -> None:
    """Click an element identified by ``xpath``.

    When both ``intention`` and ``data`` are provided, the page is re-scraped
    (without screenshots), the ``single-click-action`` prompt is rendered
    from the resulting element tree, and the xpath of the first action in the
    LLM response is clicked instead of ``xpath``.

    If regeneration fails for any reason, or the response carries no usable
    xpath, the originally supplied ``xpath`` is clicked. The failure is
    logged rather than raised.

    Args:
        xpath: XPath of the element to click; also the fallback target.
        intention: Goal of the click, passed to the prompt as
            ``navigation_goal``.
        data: Payload for this run, passed to the prompt as
            ``navigation_payload_str``.

    Raises:
        Whatever ``locator.click`` raises when the final click fails; only
        the xpath regeneration step is best-effort.
    """
    import logging

    target_xpath = xpath

    if intention and data:
        try:
            context = skyvern_context.ensure_context()
            # dict/list payloads are JSON-encoded; any other (truthy) value
            # is handed to the prompt unchanged.
            payload_str = json.dumps(data) if isinstance(data, (dict, list)) else data
            # Build the element tree of the current page for the prompt.
            refreshed_page = await self.scraped_page.generate_scraped_page_without_screenshots()
            element_tree = refreshed_page.build_element_tree()
            # Use the context's timezone when set, else the local timezone.
            prompt_tz = context.tz_info or datetime.now().astimezone().tzinfo
            single_click_prompt = prompt_engine.load_prompt(
                template="single-click-action",
                navigation_goal=intention,
                navigation_payload_str=payload_str,
                current_url=self.page.url,
                elements=element_tree,
                local_datetime=datetime.now(prompt_tz).isoformat(),
                user_context=getattr(context, "prompt", None),
            )
            json_response = await app.SINGLE_CLICK_AGENT_LLM_API_HANDLER(
                prompt=single_click_prompt,
                prompt_name="single-click-action",
            )
            actions = json_response.get("actions", [])
            if actions:
                # A missing or empty xpath in the response keeps the original.
                target_xpath = actions[0].get("xpath", xpath) or xpath
        except Exception:
            # Deliberate best-effort: keep clicking the supplied xpath, but
            # leave a trace so regeneration failures are diagnosable.
            logging.getLogger(__name__).warning(
                "Failed to regenerate click xpath; falling back to the original xpath",
                exc_info=True,
            )
            target_xpath = xpath

    locator = self.page.locator(f"xpath={target_xpath}")
    await locator.click(timeout=5000)
|
||||
|
||||
@action_wrap(ActionType.INPUT_TEXT)
|
||||
|
||||
@@ -26,6 +26,8 @@ class SkyvernContext:
|
||||
frame_index_map: dict[Frame, int] = field(default_factory=dict)
|
||||
dropped_css_svg_element_map: dict[str, bool] = field(default_factory=dict)
|
||||
max_screenshot_scrolls: int | None = None
|
||||
script_id: str | None = None
|
||||
script_revision_id: str | None = None
|
||||
|
||||
def __repr__(self) -> str:
    """Return a one-line summary built from the identifying fields.

    Only the fields listed below are rendered, in this order, each as
    ``name=value`` separated by ``", "``.
    """
    shown_fields = (
        "request_id",
        "organization_id",
        "task_id",
        "workflow_id",
        "workflow_run_id",
        "task_v2_id",
        "max_steps_override",
        "run_id",
    )
    rendered = ", ".join(f"{name}={getattr(self, name)}" for name in shown_fields)
    return f"SkyvernContext({rendered})"
|
||||
|
||||
@@ -626,6 +626,7 @@ class BrowserState:
|
||||
proxy_location: ProxyLocation | None = None,
|
||||
task_id: str | None = None,
|
||||
workflow_run_id: str | None = None,
|
||||
script_id: str | None = None,
|
||||
organization_id: str | None = None,
|
||||
extra_http_headers: dict[str, str] | None = None,
|
||||
) -> None:
|
||||
@@ -772,6 +773,7 @@ class BrowserState:
|
||||
proxy_location: ProxyLocation | None = None,
|
||||
task_id: str | None = None,
|
||||
workflow_run_id: str | None = None,
|
||||
script_id: str | None = None,
|
||||
organization_id: str | None = None,
|
||||
extra_http_headers: dict[str, str] | None = None,
|
||||
) -> Page:
|
||||
@@ -785,6 +787,7 @@ class BrowserState:
|
||||
proxy_location=proxy_location,
|
||||
task_id=task_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
script_id=script_id,
|
||||
organization_id=organization_id,
|
||||
extra_http_headers=extra_http_headers,
|
||||
)
|
||||
@@ -800,6 +803,7 @@ class BrowserState:
|
||||
proxy_location=proxy_location,
|
||||
task_id=task_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
script_id=script_id,
|
||||
organization_id=organization_id,
|
||||
extra_http_headers=extra_http_headers,
|
||||
)
|
||||
@@ -814,6 +818,7 @@ class BrowserState:
|
||||
proxy_location=proxy_location,
|
||||
task_id=task_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
script_id=script_id,
|
||||
organization_id=organization_id,
|
||||
extra_http_headers=extra_http_headers,
|
||||
)
|
||||
|
||||
@@ -30,6 +30,7 @@ class BrowserManager:
|
||||
url: str | None = None,
|
||||
task_id: str | None = None,
|
||||
workflow_run_id: str | None = None,
|
||||
script_id: str | None = None,
|
||||
organization_id: str | None = None,
|
||||
extra_http_headers: dict[str, str] | None = None,
|
||||
) -> BrowserState:
|
||||
@@ -44,6 +45,7 @@ class BrowserManager:
|
||||
url=url,
|
||||
task_id=task_id,
|
||||
workflow_run_id=workflow_run_id,
|
||||
script_id=script_id,
|
||||
organization_id=organization_id,
|
||||
extra_http_headers=extra_http_headers,
|
||||
)
|
||||
@@ -377,3 +379,50 @@ class BrowserManager:
|
||||
)
|
||||
|
||||
return browser_state_to_close
|
||||
|
||||
async def get_or_create_for_script(
    self,
    script_id: str | None = None,
    browser_session_id: str | None = None,
) -> BrowserState:
    """Return a browser state for a script run, creating one when needed.

    Lookup order:
      1. the state already stored in ``self.pages`` under ``script_id``;
      2. the persistent session identified by ``browser_session_id``;
      3. a newly created browser state.

    For cases 2 and 3 the state is stored in ``self.pages`` under
    ``script_id`` (when given) and ``get_or_create_page`` is called on it
    before returning.

    Args:
        script_id: Key used for the ``self.pages`` lookup and store.
        browser_session_id: Optional persistent session to reuse.

    Returns:
        The cached, reused or newly created browser state.
    """
    cached_state = await self.get_for_script(script_id=script_id)
    if cached_state:
        return cached_state

    state = None
    if browser_session_id:
        LOG.info(
            "Getting browser state for script",
            browser_session_id=browser_session_id,
        )
        # NOTE(review): ``script_id`` is passed as ``organization_id`` here;
        # this looks unintended - confirm against the sessions manager API.
        state = await app.PERSISTENT_SESSIONS_MANAGER.get_browser_state(
            browser_session_id, organization_id=script_id
        )
        if state is None:
            LOG.warning(
                "Browser state not found in persistent sessions manager",
                browser_session_id=browser_session_id,
            )
        elif not await state.get_working_page():
            LOG.warning("Browser state has no page to run the script", script_id=script_id)

    proxy_location = ProxyLocation.RESIDENTIAL
    if not state:
        state = await self._create_browser_state(
            proxy_location=proxy_location,
            script_id=script_id,
        )

    if script_id:
        self.pages[script_id] = state
    await state.get_or_create_page(
        proxy_location=proxy_location,
        script_id=script_id,
    )

    return state
|
||||
|
||||
async def get_for_script(self, script_id: str | None = None) -> BrowserState | None:
    """Look up the browser state stored in ``self.pages`` for ``script_id``.

    Args:
        script_id: Key to look up; a missing or empty id never matches.

    Returns:
        The stored browser state, or ``None`` when there is no entry.
    """
    if not script_id:
        return None
    return self.pages.get(script_id)
|
||||
|
||||
Reference in New Issue
Block a user