generate totp code right before input (#2608)

This commit is contained in:
Shuchang Zheng
2025-06-05 09:41:06 -07:00
committed by GitHub
parent 71bba769ef
commit 019e244cd8
2 changed files with 31 additions and 9 deletions

View File

@@ -96,11 +96,10 @@ from skyvern.webeye.scraper.scraper import (
json_to_html,
trim_element_tree,
)
from skyvern.webeye.utils.dom import DomUtil, InteractiveElement, SkyvernElement
from skyvern.webeye.utils.dom import COMMON_INPUT_TAGS, DomUtil, InteractiveElement, SkyvernElement
from skyvern.webeye.utils.page import SkyvernFrame
LOG = structlog.get_logger()
COMMON_INPUT_TAGS = {"input", "textarea", "select"}
class CustomSingleSelectResult:
@@ -822,6 +821,9 @@ async def handle_input_text_action(
if text is None:
return [ActionFailure(FailedToFetchSecret())]
is_totp_value = text == BitwardenConstants.TOTP
is_secret_value = text != action.text
# dynamically validate the attr, since it could change into enabled after the previous actions
if await skyvern_element.is_disabled(dynamic=True):
LOG.warning(
@@ -984,7 +986,7 @@ async def handle_input_text_action(
await skyvern_element.get_locator().focus(timeout=timeout)
# check the phone number format when type=tel and the text is not a secret value
if await skyvern_element.get_attr("type") == "tel" and text == action.text:
if not is_secret_value and await skyvern_element.get_attr("type") == "tel":
try:
text = await check_phone_number_format(
value=text,
@@ -1017,6 +1019,8 @@ async def handle_input_text_action(
if not class_name or "blinking-cursor" not in class_name:
return [ActionFailure(InvalidElementForTextInput(element_id=action.element_id, tag_name=tag_name))]
if is_totp_value:
text = generate_totp_value(task=task, parameter=action.text)
await skyvern_element.press_fill(text=text)
return [ActionSuccess()]
@@ -1039,6 +1043,12 @@ async def handle_input_text_action(
step_id=step.step_id,
)
if is_totp_value:
LOG.info("Skipping the auto completion logic since it's a TOTP input")
text = generate_totp_value(task=task, parameter=action.text)
await skyvern_element.input(text)
return [ActionSuccess()]
try:
# TODO: not sure if this case will trigger auto-completion
if tag_name not in COMMON_INPUT_TAGS:
@@ -1840,15 +1850,20 @@ async def get_actual_value_of_parameter_if_secret(task: Task, parameter: str) ->
workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(task.workflow_run_id)
secret_value = workflow_run_context.get_original_secret_value_or_none(parameter)
if secret_value == BitwardenConstants.TOTP:
totp_secret_key = workflow_run_context.totp_secret_value_key(parameter)
totp_secret = workflow_run_context.get_original_secret_value_or_none(totp_secret_key)
totp_secret_no_whitespace = "".join(totp_secret.split())
secret_value = pyotp.TOTP(totp_secret_no_whitespace).now()
return secret_value if secret_value is not None else parameter
def generate_totp_value(task: Task, parameter: str) -> str:
    """Generate the current TOTP code for a secret workflow parameter.

    The code is computed at call time, so callers should invoke this right
    before typing the value to minimize the chance of it expiring.

    Args:
        task: The task being executed. When it is not part of a workflow run
            (``workflow_run_id`` is ``None``), no secret context is available.
        parameter: The parameter whose TOTP secret should be looked up in the
            workflow run context.

    Returns:
        The TOTP code produced by ``pyotp`` for the stored secret, or
        ``parameter`` unchanged when the task has no workflow run.

    Raises:
        ValueError: If the workflow run context holds no TOTP secret for
            ``parameter``.
    """
    if task.workflow_run_id is None:
        return parameter

    workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(task.workflow_run_id)
    totp_secret_key = workflow_run_context.totp_secret_value_key(parameter)
    totp_secret = workflow_run_context.get_original_secret_value_or_none(totp_secret_key)
    if totp_secret is None:
        # The lookup may return None; fail with a clear message instead of an
        # opaque AttributeError on ``None.split()``. The message deliberately
        # avoids echoing any secret-related value.
        raise ValueError(
            f"No TOTP secret found in workflow run context (workflow_run_id={task.workflow_run_id})"
        )

    # Secrets are often stored in space-separated groups; drop all whitespace
    # before handing the key to pyotp.
    totp_secret_no_whitespace = "".join(totp_secret.split())
    return pyotp.TOTP(totp_secret_no_whitespace).now()
async def chain_click(
task: Task,
scraped_page: ScrapedPage,

View File

@@ -28,6 +28,7 @@ from skyvern.webeye.scraper.scraper import IncrementalScrapePage, ScrapedPage, j
from skyvern.webeye.utils.page import SkyvernFrame
LOG = structlog.get_logger()
# Tag names for which SkyvernElement.input types the text sequentially;
# elements with any other tag name are filled via input_fill instead.
COMMON_INPUT_TAGS = {"input", "textarea", "select"}
TEXT_INPUT_DELAY = 10 # 10ms between each character input
# NOTE(review): presumably a text-length threshold for the per-character
# input path; its usage is not visible here, so confirm before relying on it.
TEXT_PRESS_MAX_LENGTH = 20
@@ -590,6 +591,12 @@ class SkyvernElement:
async def press_fill(self, text: str, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS) -> None:
    """Type ``text`` into this element's locator one character at a time.

    Args:
        text: The text to type.
        timeout: Timeout forwarded to the locator call; defaults to
            ``settings.BROWSER_ACTION_TIMEOUT_MS``.
    """
    locator = self.get_locator()
    # TEXT_INPUT_DELAY is the pause between individual characters.
    await locator.press_sequentially(text, delay=TEXT_INPUT_DELAY, timeout=timeout)
async def input(self, text: str, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS) -> None:
    """Enter ``text`` into the element, choosing the strategy by tag name.

    Elements whose lower-cased tag name is in ``COMMON_INPUT_TAGS`` receive
    the text through ``input_sequentially``; every other element is filled
    in one shot through ``input_fill``.

    Args:
        text: The text to enter.
        timeout: Timeout passed on to the chosen input method; defaults to
            ``settings.BROWSER_ACTION_TIMEOUT_MS``.
    """
    tag = self.get_tag_name().lower()
    if tag in COMMON_INPUT_TAGS:
        await self.input_sequentially(text=text, default_timeout=timeout)
    else:
        await self.input_fill(text, timeout=timeout)
async def input_fill(self, text: str, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS) -> None:
await self.get_locator().fill(text, timeout=timeout)