From ea2e73f8ccef8ff836b076b562f78a7c01549560 Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Wed, 15 Oct 2025 19:50:44 -0700 Subject: [PATCH] SkyvernPage actions: Support selector and xpath at the same time. Migrate generate script to use click(selector=f"xpath={xpath}") (#3729) --- .../script_generations/generate_script.py | 32 +- .../core/script_generations/skyvern_page.py | 385 +++++++++++++----- skyvern/services/script_service.py | 7 +- 3 files changed, 318 insertions(+), 106 deletions(-) diff --git a/skyvern/core/script_generations/generate_script.py b/skyvern/core/script_generations/generate_script.py index b984834b..7b357576 100644 --- a/skyvern/core/script_generations/generate_script.py +++ b/skyvern/core/script_generations/generate_script.py @@ -27,6 +27,7 @@ from skyvern.schemas.workflows import FileStorageType from skyvern.webeye.actions.action_types import ActionType LOG = structlog.get_logger(__name__) +GENERATE_CODE_AI_MODE = "proactive" # --------------------------------------------------------------------- # @@ -227,7 +228,7 @@ def _action_to_stmt(act: dict[str, Any], task: dict[str, Any], assign_to_output: """ Turn one Action dict into: - await page.(xpath=..., intention=..., data=context.parameters) + await page.(selector=..., intention=..., data=context.parameters) Or if assign_to_output is True for extract actions: @@ -239,8 +240,8 @@ def _action_to_stmt(act: dict[str, Any], task: dict[str, Any], assign_to_output: if method in ACTIONS_WITH_XPATH: args.append( cst.Arg( - keyword=cst.Name("xpath"), - value=_value(act["xpath"]), + keyword=cst.Name("selector"), + value=_value(f"xpath={act['xpath']}"), whitespace_after_arg=cst.ParenthesizedWhitespace( indent=True, last_line=cst.SimpleWhitespace(INDENT), @@ -248,7 +249,18 @@ def _action_to_stmt(act: dict[str, Any], task: dict[str, Any], assign_to_output: ) ) - if method in ["type", "fill"]: + if method == "click": + args.append( + cst.Arg( + keyword=cst.Name("ai"), + value=_value(GENERATE_CODE_AI_MODE), + whitespace_after_arg=cst.ParenthesizedWhitespace( + indent=True, + last_line=cst.SimpleWhitespace(INDENT), + ), + ) + ) + elif method in ["type", "fill"]: # Use context.parameters if field_name is available, otherwise fallback to direct value if act.get("field_name"): text_value = cst.Subscript( @@ -273,8 +285,8 @@ def _action_to_stmt(act: dict[str, Any], task: dict[str, Any], assign_to_output: ) args.append( cst.Arg( - keyword=cst.Name("ai_infer"), - value=cst.Name("True"), + keyword=cst.Name("ai"), + value=_value(GENERATE_CODE_AI_MODE), whitespace_after_arg=cst.ParenthesizedWhitespace( indent=True, last_line=cst.SimpleWhitespace(INDENT), @@ -330,8 +342,8 @@ def _action_to_stmt(act: dict[str, Any], task: dict[str, Any], assign_to_output: ) args.append( cst.Arg( - keyword=cst.Name("ai_infer"), - value=cst.Name("True"), + keyword=cst.Name("ai"), + value=_value(GENERATE_CODE_AI_MODE), whitespace_after_arg=cst.ParenthesizedWhitespace( indent=True, last_line=cst.SimpleWhitespace(INDENT), @@ -361,8 +373,8 @@ def _action_to_stmt(act: dict[str, Any], task: dict[str, Any], assign_to_output: ) args.append( cst.Arg( - keyword=cst.Name("ai_infer"), - value=cst.Name("True"), + keyword=cst.Name("ai"), + value=_value(GENERATE_CODE_AI_MODE), whitespace_after_arg=cst.ParenthesizedWhitespace( indent=True, last_line=cst.SimpleWhitespace(INDENT), diff --git a/skyvern/core/script_generations/skyvern_page.py b/skyvern/core/script_generations/skyvern_page.py index 4728078a..c0b36801 100644 --- a/skyvern/core/script_generations/skyvern_page.py +++ b/skyvern/core/script_generations/skyvern_page.py @@ -36,6 +36,7 @@ from skyvern.webeye.actions.actions import ( ) from skyvern.webeye.actions.handler import ( ActionHandler, + handle_click_action, handle_complete_action, handle_input_text_action, handle_select_option_action, @@ -72,8 +73,8 @@ class ActionCall: error: Exception | None = None # populated if failed -async def _get_element_id_by_xpath(xpath: str, page: Page) -> str | None: - locator = page.locator(f"xpath={xpath}") +async def _get_element_id_by_selector(selector: str, page: Page) -> str | None: + locator = page.locator(selector) element_id = await locator.get_attribute("unique_id") return element_id @@ -353,9 +354,12 @@ class SkyvernPage: # Create action record. TODO: store more action fields kwargs = kwargs or {} # we're using "value" instead of "text" for input text actions interface - xpath = kwargs.get("xpath") + xpath = None if action_type == ActionType.CLICK: - xpath = call_result or xpath + if isinstance(call_result, str) and "xpath=" in call_result: + xpath_split_list = call_result.split("xpath=") + if len(xpath_split_list) > 1: + xpath = xpath_split_list[1] text = None select_option = None response: str | None = kwargs.get("response") @@ -470,59 +474,120 @@ class SkyvernPage: # If screenshot creation fails, don't block execution pass + async def _ai_click( + self, + selector: str, + intention: str, + data: str | dict[str, Any] | None = None, + timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS, + ) -> str: + try: + # Build the element tree of the current page for the prompt + context = skyvern_context.ensure_context() + payload_str = _get_context_data(data) + refreshed_page = await self.scraped_page.generate_scraped_page_without_screenshots() + element_tree = refreshed_page.build_element_tree() + single_click_prompt = prompt_engine.load_prompt( + template="single-click-action", + navigation_goal=intention, + navigation_payload_str=payload_str, + current_url=self.page.url, + elements=element_tree, + local_datetime=datetime.now(context.tz_info or datetime.now().astimezone().tzinfo).isoformat(), + # user_context=getattr(context, "prompt", None), + ) + json_response = await app.SINGLE_CLICK_AGENT_LLM_API_HANDLER( + prompt=single_click_prompt, + prompt_name="single-click-action", + organization_id=context.organization_id, + ) + actions_json = json_response.get("actions", []) + if actions_json: + organization_id = context.organization_id if context else None + task_id = context.task_id if context else None + step_id = context.step_id if context else None + task = await app.DATABASE.get_task(task_id, organization_id) if task_id and organization_id else None + step = await app.DATABASE.get_step(step_id, organization_id) if step_id and organization_id else None + if organization_id and task and step: + actions = parse_actions( + task, step.step_id, step.order, self.scraped_page, json_response.get("actions", []) + ) + action = actions[0] + result = await handle_click_action(action, self.page, self.scraped_page, task, step) + if result and result[-1].success is False: + raise Exception(result[-1].exception_message) + xpath = action.get_xpath() + selector = f"xpath={xpath}" if xpath else selector + return selector + except Exception: + LOG.exception( + f"Failed to do ai click. Falling back to original selector={selector}, intention={intention}, data={data}" + ) + + locator = self.page.locator(selector) + await locator.click(timeout=timeout) + return selector + ######### Public Interfaces ######### @action_wrap(ActionType.CLICK) - async def click(self, xpath: str, intention: str | None = None, data: str | dict[str, Any] | None = None) -> str: - """Click an element identified by ``xpath``. + async def click( + self, + selector: str, + intention: str | None = None, + ai: str | None = "fallback", + data: str | dict[str, Any] | None = None, + timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS, + ) -> str: + """Click an element identified by ``selector``. When ``intention`` and ``data`` are provided a new click action is generated via the ``single-click-action`` prompt. The model returns a - fresh xpath based on the current DOM and the updated data for this run. - The browser then clicks the element using this newly generated xpath. + fresh "xpath=..." selector based on the current DOM and the updated data for this run. + The browser then clicks the element using this newly generated xpath selector. If the prompt generation or parsing fails for any reason we fall back to - clicking the originally supplied ``xpath``. + clicking the originally supplied ``selector``. """ - new_xpath = xpath - - if intention: + if ai == "fallback": + # try to click the element with the original selector first + error_to_raise = None try: - # Build the element tree of the current page for the prompt - context = skyvern_context.ensure_context() - payload_str = _get_context_data(data) - refreshed_page = await self.scraped_page.generate_scraped_page_without_screenshots() - element_tree = refreshed_page.build_element_tree() - single_click_prompt = prompt_engine.load_prompt( - template="single-click-action", - navigation_goal=intention, - navigation_payload_str=payload_str, - current_url=self.page.url, - elements=element_tree, - local_datetime=datetime.now(context.tz_info or datetime.now().astimezone().tzinfo).isoformat(), - # user_context=getattr(context, "prompt", None), - ) - json_response = await app.SINGLE_CLICK_AGENT_LLM_API_HANDLER( - prompt=single_click_prompt, - prompt_name="single-click-action", - organization_id=context.organization_id, - ) - actions = json_response.get("actions", []) - if actions: - new_xpath = actions[0].get("xpath", xpath) or xpath - except Exception: - # If anything goes wrong, fall back to the original xpath - new_xpath = xpath + locator = self.page.locator(selector) + await locator.click(timeout=timeout) + return selector + except Exception as e: + error_to_raise = e - locator = self.page.locator(f"xpath={new_xpath}") - await locator.click(timeout=5000) - return new_xpath + # if the original selector doesn't work, try to click the element with the ai generated selector + if intention: + return await self._ai_click( + selector=selector, + intention=intention, + data=data, + timeout=timeout, + ) + if error_to_raise: + raise error_to_raise + else: + return selector + elif ai == "proactive": + if intention: + return await self._ai_click( + selector=selector, + intention=intention, + data=data, + timeout=timeout, + ) + locator = self.page.locator(selector) + await locator.click(timeout=timeout) + return selector @action_wrap(ActionType.INPUT_TEXT) async def fill( self, - xpath: str, + selector: str, value: str, - ai_infer: bool = False, + ai: str | None = "fallback", intention: str | None = None, data: str | dict[str, Any] | None = None, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS, @@ -530,9 +595,9 @@ class SkyvernPage: totp_url: str | None = None, ) -> str: return await self._input_text( - xpath=xpath, + selector=selector, value=value, - ai_infer=ai_infer, + ai=ai, intention=intention, data=data, timeout=timeout, @@ -543,9 +608,9 @@ class SkyvernPage: @action_wrap(ActionType.INPUT_TEXT) async def type( self, - xpath: str, + selector: str, value: str, - ai_infer: bool = False, + ai: str | None = "fallback", intention: str | None = None, data: str | dict[str, Any] | None = None, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS, @@ -553,9 +618,9 @@ class SkyvernPage: totp_url: str | None = None, ) -> str: return await self._input_text( - xpath=xpath, + selector=selector, value=value, - ai_infer=ai_infer, + ai=ai, intention=intention, data=data, timeout=timeout, @@ -563,28 +628,16 @@ class SkyvernPage: totp_url=totp_url, ) - async def _input_text( + async def _ai_input_text( self, - xpath: str, + selector: str, value: str, - ai_infer: bool = False, - intention: str | None = None, + intention: str, data: str | dict[str, Any] | None = None, - timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS, totp_identifier: str | None = None, totp_url: str | None = None, + timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS, ) -> str: - """Input text into an element identified by ``xpath``. - - When ``intention`` and ``data`` are provided a new input text action is - generated via the `script-generation-input-text-generatiion` prompt. The model returns a - fresh text based on the current DOM and the updated data for this run. - The browser then inputs the text using this newly generated text. - - If the prompt generation or parsing fails for any reason we fall back to - inputting the originally supplied ``text``. - """ - # format the text with the actual value of the parameter if it's a secret when running a workflow context = skyvern_context.current() value = value or "" transformed_value = value @@ -595,7 +648,7 @@ class SkyvernPage: workflow_run_id = context.workflow_run_id if context else None task = await app.DATABASE.get_task(task_id, organization_id) if task_id and organization_id else None step = await app.DATABASE.get_step(step_id, organization_id) if step_id and organization_id else None - if ai_infer and intention: + if intention: try: prompt = context.prompt if context else None data = data or {} @@ -624,8 +677,8 @@ class SkyvernPage: refreshed_page = await self.scraped_page.generate_scraped_page_without_screenshots() self.scraped_page = refreshed_page - # get the element_id by the xpath - element_id = await _get_element_id_by_xpath(xpath, self.page) + # get the element_id by the selector + element_id = await _get_element_id_by_selector(selector, self.page) script_generation_input_text_prompt = prompt_engine.load_prompt( template="script-generation-input-text-generatiion", intention=intention, @@ -639,10 +692,10 @@ class SkyvernPage: ) value = json_response.get("answer", value) except Exception: - LOG.exception(f"Failed to adapt value for input text action on xpath={xpath}, value={value}") + LOG.exception(f"Failed to adapt value for input text action on selector={selector}, value={value}") if context and context.workflow_run_id: - transformed_value = await _get_actual_value_of_parameter_if_secret(context.workflow_run_id, value) + transformed_value = await _get_actual_value_of_parameter_if_secret(context.workflow_run_id, str(value)) if element_id and organization_id and task and step: action = InputTextAction( @@ -661,20 +714,78 @@ class SkyvernPage: if result and result[-1].success is False: raise Exception(result[-1].exception_message) else: - locator = self.page.locator(f"xpath={xpath}") + locator = self.page.locator(selector) await handler_utils.input_sequentially(locator, transformed_value, timeout=timeout) return value - @action_wrap(ActionType.UPLOAD_FILE) - async def upload_file( + async def _input_text( self, - xpath: str, - files: str, - ai_infer: bool = False, + selector: str, + value: str, + ai: str | None = "fallback", intention: str | None = None, data: str | dict[str, Any] | None = None, + totp_identifier: str | None = None, + totp_url: str | None = None, + timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS, ) -> str: - if ai_infer and intention: + """Input text into an element identified by ``selector``. + + When ``intention`` and ``data`` are provided a new input text action is + generated via the `script-generation-input-text-generation` prompt. The model returns a + fresh text based on the current DOM and the updated data for this run. + The browser then inputs the text using this newly generated text. + + If the prompt generation or parsing fails for any reason we fall back to + inputting the originally supplied ``text``. + """ + # format the text with the actual value of the parameter if it's a secret when running a workflow + if ai == "fallback": + error_to_raise = None + try: + locator = self.page.locator(selector) + await handler_utils.input_sequentially(locator, value, timeout=timeout) + except Exception as e: + error_to_raise = e + + if intention: + return await self._ai_input_text( + selector=selector, + value=value, + intention=intention, + data=data, + totp_identifier=totp_identifier, + totp_url=totp_url, + timeout=timeout, + ) + if error_to_raise: + raise error_to_raise + else: + return value + elif ai == "proactive" and intention: + return await self._ai_input_text( + selector=selector, + value=value, + intention=intention, + data=data, + totp_identifier=totp_identifier, + totp_url=totp_url, + timeout=timeout, + ) + locator = self.page.locator(selector) + await handler_utils.input_sequentially(locator, value, timeout=timeout) + return value + + async def _ai_upload_file( + self, + selector: str, + files: str, + file_path: str, + intention: str, + data: str | dict[str, Any] | None = None, + timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS, + ) -> str: + if intention: try: context = skyvern_context.current() prompt = context.prompt if context else None @@ -692,28 +803,71 @@ class SkyvernPage: ) files = json_response.get("answer", files) except Exception: - LOG.exception(f"Failed to adapt value for input text action on xpath={xpath}, file={files}") - file_path = await download_file(files) - locator = self.page.locator(f"xpath={xpath}") - await locator.set_input_files(file_path) + LOG.exception(f"Failed to adapt value for input text action on selector={selector}, file={files}") + if not files: + raise ValueError("file url must be provided") + locator = self.page.locator(selector) + await locator.set_input_files(file_path, timeout=timeout) return files - @action_wrap(ActionType.SELECT_OPTION) - async def select_option( + @action_wrap(ActionType.UPLOAD_FILE) + async def upload_file( self, - xpath: str, - value: str, - ai_infer: bool = False, + selector: str, + files: str, + ai: str | None = "fallback", intention: str | None = None, data: str | dict[str, Any] | None = None, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS, + ) -> str: + file_path = await download_file(files) + if ai == "fallback": + error_to_raise = None + try: + locator = self.page.locator(selector) + await locator.set_input_files(file_path) + except Exception as e: + error_to_raise = e + if intention: + return await self._ai_upload_file( + selector=selector, + files=files, + file_path=file_path, + intention=intention, + data=data, + timeout=timeout, + ) + if error_to_raise: + raise error_to_raise + else: + return files + elif ai == "proactive" and intention: + return await self._ai_upload_file( + selector=selector, + files=files, + file_path=file_path, + intention=intention, + data=data, + timeout=timeout, + ) + locator = self.page.locator(selector) + await locator.set_input_files(file_path, timeout=timeout) + return files + + async def _ai_select_option( + self, + selector: str, + value: str, + intention: str, + data: str | dict[str, Any] | None = None, + timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS, ) -> str: option_value = value or "" context = skyvern_context.current() if context and context.task_id and context.step_id and context.organization_id: task = await app.DATABASE.get_task(context.task_id, organization_id=context.organization_id) step = await app.DATABASE.get_step(context.step_id, organization_id=context.organization_id) - if ai_infer and intention and task and step: + if intention and task and step: try: prompt = context.prompt if context else None # data = _get_context_data(data) @@ -735,7 +889,9 @@ class SkyvernPage: prompt_name="single-select-action", organization_id=context.organization_id if context else None, ) - actions = parse_actions(task, step.step_id, step.order, self.scraped_page, json_response["actions"]) + actions = parse_actions( + task, step.step_id, step.order, self.scraped_page, json_response.get("actions", []) + ) if actions: action = actions[0] if not action.option: @@ -750,15 +906,60 @@ class SkyvernPage: ) else: LOG.exception( - f"Failed to parse actions for select option action on xpath={xpath}, value={value}" + f"Failed to parse actions for select option action on selector={selector}, value={value}" ) except Exception: - LOG.exception(f"Failed to adapt value for select option action on xpath={xpath}, value={value}") + LOG.exception( + f"Failed to adapt value for select option action on selector={selector}, value={value}" + ) else: - locator = self.page.locator(f"xpath={xpath}") + locator = self.page.locator(selector) await locator.select_option(option_value, timeout=timeout) return option_value + @action_wrap(ActionType.SELECT_OPTION) + async def select_option( + self, + selector: str, + value: str | None = None, + label: str | None = None, + ai: str | None = "fallback", + intention: str | None = None, + data: str | dict[str, Any] | None = None, + timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS, + ) -> str: + value = value or "" + if ai == "fallback": + error_to_raise = None + try: + locator = self.page.locator(selector) + await locator.select_option(value, timeout=timeout) + except Exception as e: + error_to_raise = e + if intention: + return await self._ai_select_option( + selector=selector, + value=value, + intention=intention, + data=data, + timeout=timeout, + ) + if error_to_raise: + raise error_to_raise + else: + return value + elif ai == "proactive" and intention: + return await self._ai_select_option( + selector=selector, + value=value, + intention=intention, + data=data, + timeout=timeout, + ) + locator = self.page.locator(selector) + await locator.select_option(value, timeout=timeout) + return value + @action_wrap(ActionType.WAIT) async def wait( self, seconds: float, intention: str | None = None, data: str | dict[str, Any] | None = None @@ -873,9 +1074,7 @@ class SkyvernPage: return result @action_wrap(ActionType.VERIFICATION_CODE) - async def verification_code( - self, xpath: str, intention: str | None = None, data: str | dict[str, Any] | None = None - ) -> None: + async def verification_code(self, intention: str | None = None, data: str | dict[str, Any] | None = None) -> None: return @action_wrap(ActionType.SCROLL) diff --git a/skyvern/services/script_service.py b/skyvern/services/script_service.py index 860e5061..919cc1f8 100644 --- a/skyvern/services/script_service.py +++ b/skyvern/services/script_service.py @@ -37,6 +37,7 @@ from skyvern.forge.sdk.workflow.models.block import ( ForLoopBlock, HttpRequestBlock, LoginBlock, + NavigationBlock, PDFParserBlock, SendEmailBlock, TaskBlock, @@ -1176,7 +1177,7 @@ async def run_task( if cache_key and cached_fn: # Auto-create workflow block run and task if workflow_run_id is available workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task( - block_type=BlockType.TASK, + block_type=BlockType.NAVIGATION, prompt=prompt, url=url, label=cache_key, @@ -1201,7 +1202,7 @@ async def run_task( except Exception as e: LOG.exception("Failed to run task block. Falling back to AI run.") await _fallback_to_ai_run( - block_type=BlockType.TASK, + block_type=BlockType.NAVIGATION, cache_key=cache_key, prompt=prompt, url=url, @@ -1216,7 +1217,7 @@ async def run_task( context.prompt = None else: block_validation_output = await _validate_and_get_output_parameter(label) - task_block = TaskBlock( + task_block = NavigationBlock( label=block_validation_output.label, output_parameter=block_validation_output.output_parameter, url=url,