Pedro/support_multi_field_6_digit_totp (#3622)
This commit is contained in:
@@ -1202,14 +1202,47 @@ class ForgeAgent:
|
||||
# Do not verify the complete action when complete_verification is False
|
||||
# set verified to True will skip the completion verification
|
||||
action.verified = True
|
||||
|
||||
# Pass TOTP secret to handler for multi-field TOTP sequences
|
||||
# Handler will generate TOTP at execution time
|
||||
if (
|
||||
action.action_type == ActionType.INPUT_TEXT
|
||||
and self._is_multi_field_totp_sequence(actions)
|
||||
and (totp_secret := skyvern_context.ensure_context().totp_codes.get(f"{task.task_id}_secret"))
|
||||
):
|
||||
# Pass TOTP secret to handler for execution-time generation
|
||||
action.totp_timing_info = {
|
||||
"is_totp_sequence": True,
|
||||
"action_index": action_idx,
|
||||
"totp_secret": totp_secret,
|
||||
"is_retry": step.retry_index > 0,
|
||||
}
|
||||
|
||||
results = await ActionHandler.handle_action(scraped_page, task, step, current_page, action)
|
||||
await app.AGENT_FUNCTION.post_action_execution()
|
||||
detailed_agent_step_output.actions_and_results[action_idx] = (
|
||||
action,
|
||||
results,
|
||||
)
|
||||
# wait random time between actions to avoid detection
|
||||
await asyncio.sleep(random.uniform(0.5, 1.0))
|
||||
|
||||
# Determine wait time between actions
|
||||
wait_time = random.uniform(0.5, 1.0)
|
||||
|
||||
# For multi-field TOTP sequences, use zero delay between all digits for fast execution
|
||||
if action.action_type == ActionType.INPUT_TEXT and self._is_multi_field_totp_sequence(actions):
|
||||
current_text = action.text if hasattr(action, "text") else None
|
||||
|
||||
if current_text and len(current_text) == 1 and current_text.isdigit():
|
||||
# Zero delay between all TOTP digits for fast execution
|
||||
wait_time = 0.0
|
||||
LOG.debug(
|
||||
"TOTP: zero delay for digit",
|
||||
task_id=task.task_id,
|
||||
action_idx=action_idx,
|
||||
digit=current_text,
|
||||
)
|
||||
|
||||
await asyncio.sleep(wait_time)
|
||||
await self.record_artifacts_after_action(task, step, browser_state, engine)
|
||||
for result in results:
|
||||
result.step_retry_number = step.retry_index
|
||||
@@ -1306,6 +1339,21 @@ class ForgeAgent:
|
||||
action_results=action_results,
|
||||
)
|
||||
|
||||
# Clean up TOTP cache after multi-field TOTP sequence completion
|
||||
if self._is_multi_field_totp_sequence(actions):
|
||||
context = skyvern_context.ensure_context()
|
||||
cache_key = f"{task.task_id}_totp_cache"
|
||||
if cache_key in context.totp_codes:
|
||||
context.totp_codes.pop(cache_key)
|
||||
LOG.debug(
|
||||
"Cleaned up TOTP cache after multi-field sequence completion",
|
||||
task_id=task.task_id,
|
||||
)
|
||||
|
||||
secret_key = f"{task.task_id}_secret"
|
||||
if secret_key in context.totp_codes:
|
||||
context.totp_codes.pop(secret_key)
|
||||
|
||||
# Check if Skyvern already returned a complete action, if so, don't run user goal check
|
||||
has_decisive_action = False
|
||||
if detailed_agent_step_output and detailed_agent_step_output.actions_and_results:
|
||||
@@ -2065,7 +2113,7 @@ class ForgeAgent:
|
||||
await SkyvernFrame.evaluate(frame=page, expression="() => document.location.href") if page else starting_url
|
||||
)
|
||||
final_navigation_payload = self._build_navigation_payload(
|
||||
task, expire_verification_code=expire_verification_code
|
||||
task, expire_verification_code=expire_verification_code, step=step, scraped_page=scraped_page
|
||||
)
|
||||
|
||||
task_type = task.task_type if task.task_type else TaskType.general
|
||||
@@ -2167,12 +2215,124 @@ class ForgeAgent:
|
||||
|
||||
return full_prompt, use_caching
|
||||
|
||||
def _should_process_totp(self, scraped_page: ScrapedPage | None) -> bool:
|
||||
"""Detect TOTP pages by checking for multiple input fields or verification keywords."""
|
||||
if not scraped_page:
|
||||
return False
|
||||
|
||||
try:
|
||||
# Count input fields that could be for TOTP (more flexible than maxlength="1")
|
||||
input_fields = [
|
||||
element
|
||||
for element in scraped_page.elements
|
||||
if element.get("tagName", "").lower() == "input"
|
||||
and element.get("attributes", {}).get("type", "text").lower() in ["text", "number", "tel"]
|
||||
]
|
||||
|
||||
# Check for multiple input fields (potential multi-field TOTP)
|
||||
if len(input_fields) >= 4:
|
||||
# Additional check: look for patterns that suggest multi-field TOTP
|
||||
# Check if inputs are close together or have similar attributes
|
||||
has_maxlength_1 = any(elem.get("attributes", {}).get("maxlength") == "1" for elem in input_fields)
|
||||
|
||||
# Check for input fields with numeric patterns (type="number", pattern for digits)
|
||||
has_numeric_patterns = any(
|
||||
elem.get("attributes", {}).get("type") == "number"
|
||||
or elem.get("attributes", {}).get("pattern", "").isdigit()
|
||||
or "digit" in elem.get("attributes", {}).get("pattern", "").lower()
|
||||
for elem in input_fields
|
||||
)
|
||||
|
||||
if has_maxlength_1 or has_numeric_patterns:
|
||||
return True
|
||||
|
||||
# Check for TOTP-related keywords in page content
|
||||
page_text = scraped_page.html.lower() if scraped_page.html else ""
|
||||
totp_keywords = [
|
||||
"verification code",
|
||||
"authentication code",
|
||||
"security code",
|
||||
"2fa",
|
||||
"two-factor",
|
||||
"totp",
|
||||
"authenticator",
|
||||
"verification",
|
||||
"enter code",
|
||||
"verification number",
|
||||
"security number",
|
||||
]
|
||||
|
||||
keyword_matches = sum(1 for keyword in totp_keywords if keyword in page_text)
|
||||
|
||||
# If we have multiple TOTP keywords and multiple input fields, likely TOTP
|
||||
if keyword_matches >= 2 and len(input_fields) >= 6:
|
||||
return True
|
||||
|
||||
# Strong single keyword match with multiple inputs
|
||||
strong_keywords = ["verification code", "authentication code", "2fa", "two-factor"]
|
||||
if any(keyword in page_text for keyword in strong_keywords) and len(input_fields) >= 3:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def _is_multi_field_totp_sequence(self, actions: list) -> bool:
|
||||
"""
|
||||
Check if the action sequence represents a multi-field TOTP input (6 single-digit fields).
|
||||
|
||||
Args:
|
||||
actions: List of actions to analyze
|
||||
|
||||
Returns:
|
||||
bool: True if this is a multi-field TOTP sequence
|
||||
"""
|
||||
# Must have at least 4 actions (minimum for TOTP)
|
||||
if len(actions) < 4:
|
||||
return False
|
||||
|
||||
# Check if we have multiple consecutive single-digit INPUT_TEXT actions
|
||||
consecutive_single_digits = 0
|
||||
max_consecutive = 0
|
||||
|
||||
for action in actions:
|
||||
if (
|
||||
action.action_type == ActionType.INPUT_TEXT
|
||||
and hasattr(action, "text")
|
||||
and action.text
|
||||
and len(action.text) == 1
|
||||
and action.text.isdigit()
|
||||
):
|
||||
consecutive_single_digits += 1
|
||||
max_consecutive = max(max_consecutive, consecutive_single_digits)
|
||||
else:
|
||||
# If we hit a non-single-digit action, reset consecutive counter
|
||||
consecutive_single_digits = 0
|
||||
|
||||
# Consider it a multi-field TOTP if we have 4+ consecutive single-digit inputs
|
||||
# This is more reliable than just counting total single digits
|
||||
# We use 4+ as the threshold to avoid false positives with single TOTP fields
|
||||
is_multi_field_totp = max_consecutive >= 4
|
||||
|
||||
if is_multi_field_totp:
|
||||
LOG.debug(
|
||||
"Detected multi-field TOTP sequence",
|
||||
max_consecutive=max_consecutive,
|
||||
total_actions=len(actions),
|
||||
)
|
||||
|
||||
return is_multi_field_totp
|
||||
|
||||
def _build_navigation_payload(
|
||||
self,
|
||||
task: Task,
|
||||
expire_verification_code: bool = False,
|
||||
step: Step | None = None,
|
||||
scraped_page: ScrapedPage | None = None,
|
||||
) -> dict[str, Any] | list | str | None:
|
||||
final_navigation_payload = task.navigation_payload
|
||||
|
||||
current_context = skyvern_context.ensure_context()
|
||||
verification_code = current_context.totp_codes.get(task.task_id)
|
||||
if (task.totp_verification_url or task.totp_identifier) and verification_code:
|
||||
@@ -2190,6 +2350,32 @@ class ForgeAgent:
|
||||
)
|
||||
if expire_verification_code:
|
||||
current_context.totp_codes.pop(task.task_id)
|
||||
|
||||
# Store TOTP secrets and provide placeholder TOTP for LLM to see format
|
||||
# Only when on a TOTP page to avoid premature processing
|
||||
if (
|
||||
task.workflow_run_id
|
||||
and step
|
||||
and isinstance(final_navigation_payload, dict)
|
||||
and self._should_process_totp(scraped_page)
|
||||
):
|
||||
workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(task.workflow_run_id)
|
||||
|
||||
for key, value in list(final_navigation_payload.items()):
|
||||
if isinstance(value, dict) and "totp" in value:
|
||||
totp_placeholder = value.get("totp")
|
||||
if totp_placeholder and isinstance(totp_placeholder, str):
|
||||
totp_secret_key = workflow_run_context.totp_secret_value_key(totp_placeholder)
|
||||
totp_secret = workflow_run_context.get_original_secret_value_or_none(totp_secret_key)
|
||||
|
||||
if totp_secret:
|
||||
# Store TOTP secret for handler to use during execution
|
||||
current_context = skyvern_context.ensure_context()
|
||||
current_context.totp_codes[f"{task.task_id}_secret"] = totp_secret
|
||||
|
||||
# Send a placeholder TOTP for the LLM to see the format
|
||||
final_navigation_payload[key]["totp"] = "123456"
|
||||
|
||||
return final_navigation_payload
|
||||
|
||||
async def _get_action_results(self, task: Task, current_step: Step | None = None) -> str:
|
||||
|
||||
Reference in New Issue
Block a user