import asyncio
import json
import os
import re
import uuid
from typing import Any, Awaitable, Callable, List

import structlog
from deprecation import deprecated
from playwright.async_api import Locator, Page

from skyvern.constants import REPO_ROOT_DIR
from skyvern.exceptions import ImaginaryFileUrl, MissingElement, MissingFileUrl, MultipleElementsFound
from skyvern.forge import app
from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.api.files import download_file
from skyvern.forge.sdk.models import Step
from skyvern.forge.sdk.schemas.tasks import Task
from skyvern.forge.sdk.services.bitwarden import BitwardenConstants
from skyvern.forge.sdk.settings_manager import SettingsManager
from skyvern.webeye.actions import actions
from skyvern.webeye.actions.actions import (
    Action,
    ActionType,
    ClickAction,
    ScrapeResult,
    SelectOptionAction,
    UploadFileAction,
    WebAction,
)
from skyvern.webeye.actions.responses import ActionFailure, ActionResult, ActionSuccess
from skyvern.webeye.browser_factory import BrowserState
from skyvern.webeye.scraper.scraper import ScrapedPage

LOG = structlog.get_logger()


class ActionHandler:
    """Registry and dispatcher for per-``ActionType`` action handlers.

    Handlers are registered via the ``register_*`` classmethods;
    ``handle_action`` then dispatches an incoming ``Action`` to its handler,
    running the optional setup hook before and the optional teardown hook
    after the main handler.
    """

    # action_type -> coroutine executing the action and returning its results
    _handled_action_types: dict[
        ActionType, Callable[[Action, Page, ScrapedPage, Task, Step], Awaitable[list[ActionResult]]]
    ] = {}

    # action_type -> optional hook run before the main handler
    _setup_action_types: dict[
        ActionType, Callable[[Action, Page, ScrapedPage, Task, Step], Awaitable[list[ActionResult]]]
    ] = {}

    # action_type -> optional hook run after a successful main handler
    _teardown_action_types: dict[
        ActionType, Callable[[Action, Page, ScrapedPage, Task, Step], Awaitable[list[ActionResult]]]
    ] = {}

    @classmethod
    def register_action_type(
        cls,
        action_type: ActionType,
        handler: Callable[[Action, Page, ScrapedPage, Task, Step], Awaitable[list[ActionResult]]],
    ) -> None:
        """Register the main handler coroutine for ``action_type``."""
        cls._handled_action_types[action_type] = handler

    @classmethod
    def register_setup_for_action_type(
        cls,
        action_type: ActionType,
        handler: Callable[[Action, Page, ScrapedPage, Task, Step], Awaitable[list[ActionResult]]],
    ) -> None:
        """Register a setup hook that runs before the main handler for ``action_type``."""
        cls._setup_action_types[action_type] = handler

    @classmethod
    def register_teardown_for_action_type(
        cls,
        action_type: ActionType,
        handler: Callable[[Action, Page, ScrapedPage, Task, Step], Awaitable[list[ActionResult]]],
    ) -> None:
        """Register a teardown hook that runs after a successful main handler for ``action_type``."""
        cls._teardown_action_types[action_type] = handler

    @staticmethod
    async def handle_action(
        scraped_page: ScrapedPage,
        task: Task,
        step: Step,
        browser_state: BrowserState,
        action: Action,
    ) -> list[ActionResult]:
        """Dispatch ``action`` to its registered handler.

        Runs setup -> handler -> teardown, short-circuiting as soon as a
        stage's last result is not an ``ActionSuccess``. Known exceptions
        (``MissingElement``, ``MultipleElementsFound``) and any unexpected
        exception are converted into ``ActionFailure`` results instead of
        propagating to the caller.
        """
        LOG.info("Handling action", action=action)
        page = await browser_state.get_or_create_page()
        try:
            if action.action_type in ActionHandler._handled_action_types:
                actions_result: list[ActionResult] = []

                # do setup before action handler
                if setup := ActionHandler._setup_action_types.get(action.action_type):
                    results = await setup(action, page, scraped_page, task, step)
                    actions_result.extend(results)
                    # BUG FIX: the original compared an ActionResult *instance*
                    # against the ActionSuccess *class* (`results[-1] != ActionSuccess`),
                    # which is always True, so every setup hook aborted the action
                    # even when it succeeded. Check the instance type instead.
                    if results and not isinstance(results[-1], ActionSuccess):
                        return actions_result

                # do the handler
                handler = ActionHandler._handled_action_types[action.action_type]
                results = await handler(action, page, scraped_page, task, step)
                actions_result.extend(results)
                # isinstance is the idiomatic (and subclass-safe) form of the
                # original `type(actions_result[-1]) != ActionSuccess` check.
                if not results or not isinstance(actions_result[-1], ActionSuccess):
                    return actions_result

                # do the teardown
                teardown = ActionHandler._teardown_action_types.get(action.action_type)
                if not teardown:
                    return actions_result

                results = await teardown(action, page, scraped_page, task, step)
                actions_result.extend(results)
                return actions_result
            else:
                LOG.error("Unsupported action type in handler", action=action, type=type(action))
                return [ActionFailure(Exception(f"Unsupported action type: {type(action)}"))]
        except MissingElement as e:
            LOG.info("Known exceptions", action=action, exception_type=type(e), exception_message=str(e))
            return [ActionFailure(e)]
        except MultipleElementsFound as e:
            LOG.exception(
                "Cannot handle multiple elements with the same xpath in one action.",
                action=action,
            )
            return [ActionFailure(e)]
        except Exception as e:
            LOG.exception("Unhandled exception in action handler", action=action)
            return [ActionFailure(e)]


async def handle_solve_captcha_action(
    action: actions.SolveCaptchaAction, page: Page, scraped_page: ScrapedPage, task: Task, step: Step
) -> list[ActionResult]:
    """Pause for 30 seconds so a human can solve the captcha, then report success."""
    LOG.warning(
        "Please solve the captcha on the page, you have 30 seconds",
        action=action,
    )
    await asyncio.sleep(30)
    return [ActionSuccess()]
async def handle_click_action(
    action: actions.ClickAction, page: Page, scraped_page: ScrapedPage, task: Task, step: Step
) -> list[ActionResult]:
    """Click the element targeted by ``action`` after validating it exists in the DOM."""
    xpath = await validate_actions_in_dom(action, page, scraped_page)
    # brief pause before clicking to let the page settle
    await asyncio.sleep(0.3)
    return await chain_click(
        task, page, action, xpath, timeout=SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS
    )


async def handle_input_text_action(
    action: actions.InputTextAction, page: Page, scraped_page: ScrapedPage, task: Task, step: Step
) -> list[ActionResult]:
    """Fill the targeted input with ``action.text``, resolving secret parameters.

    Returns early without touching the field when it already contains the
    desired (resolved) value.
    """
    xpath = await validate_actions_in_dom(action, page, scraped_page)
    locator = page.locator(f"xpath={xpath}")
    # BUG FIX: resolve the secret BEFORE the no-op check. The original compared
    # the field's current value against the raw action.text (possibly a secret
    # placeholder), so a field already holding the resolved secret value was
    # needlessly cleared and re-filled.
    text = get_actual_value_of_parameter_if_secret(task, action.text)
    current_text = await locator.input_value()
    if current_text == text:
        return [ActionSuccess()]

    await locator.clear()
    await locator.fill(text, timeout=SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS)
    return [ActionSuccess()]


async def handle_upload_file_action(
    action: actions.UploadFileAction, page: Page, scraped_page: ScrapedPage, task: Task, step: Step
) -> list[ActionResult]:
    """Upload a file to the targeted element.

    If the element is a real ``<input type=file>``, download ``action.file_url``
    and set it as the input's file; otherwise fall back to clicking the element.
    Fails when the URL is missing or does not appear in the task's navigation
    payload (guards against hallucinated URLs).
    """
    if not action.file_url:
        LOG.warning("InputFileAction has no file_url", action=action)
        return [ActionFailure(MissingFileUrl())]
    # ************************************************************************************************************** #
    # After this point if the file_url is a secret, it will be replaced with the actual value
    # In order to make sure we don't log the secret value, we log the action with the original value action.file_url
    # ************************************************************************************************************** #
    file_url = get_actual_value_of_parameter_if_secret(task, action.file_url)
    if file_url not in str(task.navigation_payload):
        LOG.warning(
            "LLM might be imagining the file url, which is not in navigation payload",
            action=action,
            file_url=action.file_url,
        )
        return [ActionFailure(ImaginaryFileUrl(action.file_url))]

    xpath = await validate_actions_in_dom(action, page, scraped_page)

    file_path = await download_file(file_url)
    locator = page.locator(f"xpath={xpath}")
    is_file_input = await is_file_input_element(locator)

    if is_file_input:
        LOG.info("Taking UploadFileAction. Found file input tag", action=action)
        if file_path:
            await page.locator(f"xpath={xpath}").set_input_files(
                file_path, timeout=SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS
            )

            # Sleep for 10 seconds after uploading a file to let the page process it
            await asyncio.sleep(10)

            return [ActionSuccess()]
        else:
            return [ActionFailure(Exception(f"Failed to download file from {action.file_url}"))]
    else:
        LOG.info("Taking UploadFileAction. Found non file input tag", action=action)
        # treat it as a click action
        action.is_upload_file_tag = False
        return await chain_click(
            task, page, action, xpath, timeout=SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS
        )
Downloads are handled by the click action handler now.") async def handle_download_file_action( action: actions.DownloadFileAction, page: Page, scraped_page: ScrapedPage, task: Task, step: Step ) -> list[ActionResult]: xpath = await validate_actions_in_dom(action, page, scraped_page) file_name = f"{action.file_name or uuid.uuid4()}" full_file_path = f"{REPO_ROOT_DIR}/downloads/{task.workflow_run_id or task.task_id}/{file_name}" try: # Start waiting for the download async with page.expect_download() as download_info: await asyncio.sleep(0.3) await page.click( f"xpath={xpath}", timeout=SettingsManager.get_settings().BROWSER_ACTION_TIMEOUT_MS, modifiers=["Alt"] ) download = await download_info.value # Create download folders if they don't exist download_folder = f"{REPO_ROOT_DIR}/downloads/{task.workflow_run_id or task.task_id}" os.makedirs(download_folder, exist_ok=True) # Wait for the download process to complete and save the downloaded file await download.save_as(full_file_path) except Exception as e: LOG.exception( "DownloadFileAction: Failed to download file", action=action, full_file_path=full_file_path, ) return [ActionFailure(e)] return [ActionSuccess(data={"file_path": full_file_path})] async def handle_null_action( action: actions.NullAction, page: Page, scraped_page: ScrapedPage, task: Task, step: Step ) -> list[ActionResult]: return [ActionSuccess()] async def handle_select_option_action( action: actions.SelectOptionAction, page: Page, scraped_page: ScrapedPage, task: Task, step: Step ) -> list[ActionResult]: xpath = await validate_actions_in_dom(action, page, scraped_page) locator = page.locator(f"xpath={xpath}") tag_name = await get_tag_name_lowercase(locator) element_dict = scraped_page.id_to_element_dict[action.element_id] LOG.info("SelectOptionAction", action=action, tag_name=tag_name, element_dict=element_dict) # if element is not a select option, prioritize clicking the linked element if any if tag_name != "select" and "linked_element" in 
element_dict: LOG.info( "SelectOptionAction is not on a select tag and found a linked element", action=action, linked_element=element_dict["linked_element"], ) listbox_click_success = await click_listbox_option(scraped_page, page, action, element_dict["linked_element"]) if listbox_click_success: LOG.info( "Successfully clicked linked element", action=action, linked_element=element_dict["linked_element"], ) return [ActionSuccess()] LOG.warning("Failed to click linked element", action=action, linked_element=element_dict["linked_element"]) # check if the element is an a tag first. If yes, click it instead of selecting the option if tag_name == "label": # TODO: this is a hack to handle the case where the label is the only thing that's clickable # it's a label, look for the anchor tag child_anchor_xpath = get_anchor_to_click(scraped_page, action.element_id) if child_anchor_xpath: LOG.info( "SelectOptionAction is a label tag. Clicking the anchor tag instead of selecting the option", action=action, child_anchor_xpath=child_anchor_xpath, ) click_action = ClickAction(element_id=action.element_id) return await chain_click(task, page, click_action, child_anchor_xpath) # handler the select action on select_element_id = get_select_id_in_label_children(scraped_page, action.element_id) if select_element_id is not None: LOG.info( "SelectOptionAction is on