From 019e244cd800c61b4ca5834054f1725aee51907d Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Thu, 5 Jun 2025 09:41:06 -0700 Subject: [PATCH] generate totp code right before input (#2608) --- skyvern/webeye/actions/handler.py | 33 ++++++++++++++++++++++--------- skyvern/webeye/utils/dom.py | 7 +++++++ 2 files changed, 31 insertions(+), 9 deletions(-) diff --git a/skyvern/webeye/actions/handler.py b/skyvern/webeye/actions/handler.py index 0a4b4fc5..9ffa6bfa 100644 --- a/skyvern/webeye/actions/handler.py +++ b/skyvern/webeye/actions/handler.py @@ -96,11 +96,10 @@ from skyvern.webeye.scraper.scraper import ( json_to_html, trim_element_tree, ) -from skyvern.webeye.utils.dom import DomUtil, InteractiveElement, SkyvernElement +from skyvern.webeye.utils.dom import COMMON_INPUT_TAGS, DomUtil, InteractiveElement, SkyvernElement from skyvern.webeye.utils.page import SkyvernFrame LOG = structlog.get_logger() -COMMON_INPUT_TAGS = {"input", "textarea", "select"} class CustomSingleSelectResult: @@ -822,6 +821,9 @@ async def handle_input_text_action( if text is None: return [ActionFailure(FailedToFetchSecret())] + is_totp_value = text == BitwardenConstants.TOTP + is_secret_value = text != action.text + # dynamically validate the attr, since it could change into enabled after the previous actions if await skyvern_element.is_disabled(dynamic=True): LOG.warning( @@ -984,7 +986,7 @@ async def handle_input_text_action( await skyvern_element.get_locator().focus(timeout=timeout) # check the phone number format when type=tel and the text is not a secret value - if await skyvern_element.get_attr("type") == "tel" and text == action.text: + if not is_secret_value and await skyvern_element.get_attr("type") == "tel": try: text = await check_phone_number_format( value=text, @@ -1017,6 +1019,8 @@ async def handle_input_text_action( if not class_name or "blinking-cursor" not in class_name: return [ActionFailure(InvalidElementForTextInput(element_id=action.element_id, tag_name=tag_name))] + if is_totp_value: + text = generate_totp_value(task=task, parameter=action.text) await skyvern_element.press_fill(text=text) return [ActionSuccess()] @@ -1039,6 +1043,12 @@ async def handle_input_text_action( step_id=step.step_id, ) + if is_totp_value: + LOG.info("Skipping the auto completion logic since it's a TOTP input") + text = generate_totp_value(task=task, parameter=action.text) + await skyvern_element.input(text) + return [ActionSuccess()] + try: # TODO: not sure if this case will trigger auto-completion if tag_name not in COMMON_INPUT_TAGS: @@ -1840,15 +1850,20 @@ async def get_actual_value_of_parameter_if_secret(task: Task, parameter: str) -> workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(task.workflow_run_id) secret_value = workflow_run_context.get_original_secret_value_or_none(parameter) - - if secret_value == BitwardenConstants.TOTP: - totp_secret_key = workflow_run_context.totp_secret_value_key(parameter) - totp_secret = workflow_run_context.get_original_secret_value_or_none(totp_secret_key) - totp_secret_no_whitespace = "".join(totp_secret.split()) - secret_value = pyotp.TOTP(totp_secret_no_whitespace).now() return secret_value if secret_value is not None else parameter +def generate_totp_value(task: Task, parameter: str) -> str: + if task.workflow_run_id is None: + return parameter + + workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(task.workflow_run_id) + totp_secret_key = workflow_run_context.totp_secret_value_key(parameter) + totp_secret = workflow_run_context.get_original_secret_value_or_none(totp_secret_key) + totp_secret_no_whitespace = "".join(totp_secret.split()) + return pyotp.TOTP(totp_secret_no_whitespace).now() + + async def chain_click( task: Task, scraped_page: ScrapedPage, diff --git a/skyvern/webeye/utils/dom.py b/skyvern/webeye/utils/dom.py index c9615b50..73804f41 100644 --- a/skyvern/webeye/utils/dom.py +++ b/skyvern/webeye/utils/dom.py @@ -28,6 +28,7 @@ from skyvern.webeye.scraper.scraper import IncrementalScrapePage, ScrapedPage, j from skyvern.webeye.utils.page import SkyvernFrame LOG = structlog.get_logger() +COMMON_INPUT_TAGS = {"input", "textarea", "select"} TEXT_INPUT_DELAY = 10 # 10ms between each character input TEXT_PRESS_MAX_LENGTH = 20 @@ -590,6 +591,12 @@ class SkyvernElement: async def press_fill(self, text: str, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS) -> None: await self.get_locator().press_sequentially(text, delay=TEXT_INPUT_DELAY, timeout=timeout) + async def input(self, text: str, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS) -> None: + if self.get_tag_name().lower() not in COMMON_INPUT_TAGS: + await self.input_fill(text, timeout=timeout) + return + await self.input_sequentially(text=text, default_timeout=timeout) + async def input_fill(self, text: str, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS) -> None: await self.get_locator().fill(text, timeout=timeout)