From 401d304dd087687c086fe4951921d763695dbacb Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Thu, 2 Jan 2025 08:07:36 -0800 Subject: [PATCH] Handle 2FA codes that have the otpauth URIs (#1465) --- skyvern/forge/sdk/services/bitwarden.py | 66 ++++++++++++++++++++----- 1 file changed, 55 insertions(+), 11 deletions(-) diff --git a/skyvern/forge/sdk/services/bitwarden.py b/skyvern/forge/sdk/services/bitwarden.py index 0ac7fce2..bf4fd4e6 100644 --- a/skyvern/forge/sdk/services/bitwarden.py +++ b/skyvern/forge/sdk/services/bitwarden.py @@ -3,6 +3,7 @@ import json import os import re import subprocess +import urllib.parse from enum import StrEnum import structlog @@ -137,6 +138,43 @@ class BitwardenService: f"Bitwarden CLI failed after all retry attempts. Fail reasons: {fail_reasons}" ) + @staticmethod + def extract_totp_secret(totp_value: str) -> str: + """ + Extract the TOTP secret from either a raw secret or a TOTP URI. + + Args: + totp_value: Raw TOTP secret or URI (otpauth://totp/...) + + Returns: + The extracted TOTP secret + + Example: + >>> BitwardenService.extract_totp_secret("AAAAAABBBBBBB") + "AAAAAABBBBBBB" + >>> BitwardenService.extract_totp_secret("otpauth://totp/user@domain.com?secret=AAAAAABBBBBBB") + "AAAAAABBBBBBB" + """ + if not totp_value: + return "" + + # Handle TOTP URI format + if totp_value.startswith("otpauth://"): + try: + # Parse the URI to extract the secret + query = urllib.parse.urlparse(totp_value).query + params = dict(urllib.parse.parse_qsl(query)) + return params.get("secret", "") + except Exception: + LOG.error( + "Failed to parse TOTP URI", + totp_value=totp_value, + exc_info=True, + ) + return "" + + return totp_value + @staticmethod async def _get_secret_value_from_url( client_id: str, @@ -204,18 +242,24 @@ class BitwardenService: collection_id_str = f" in collection with ID: {collection_id}" if collection_id else "" raise BitwardenListItemsError(f"No items found in Bitwarden for URL: {url}{collection_id_str}") - bitwarden_result: list[BitwardenQueryResult] = [ - BitwardenQueryResult( - credential={ - BitwardenConstants.USERNAME: item.get("login", {}).get("username", ""), - BitwardenConstants.PASSWORD: item.get("login", {}).get("password", ""), - BitwardenConstants.TOTP: item.get("login", {}).get("totp", "") or "", - }, - uris=[uri.get("uri") for uri in item.get("login", {}).get("uris", []) if "uri" in uri], + bitwarden_result: list[BitwardenQueryResult] = [] + for item in items: + if "login" not in item: + continue + + login = item["login"] + totp = BitwardenService.extract_totp_secret(login.get("totp", "")) + + bitwarden_result.append( + BitwardenQueryResult( + credential={ + BitwardenConstants.USERNAME: login.get("username", ""), + BitwardenConstants.PASSWORD: login.get("password", ""), + BitwardenConstants.TOTP: totp, + }, + uris=[uri.get("uri") for uri in login.get("uris", []) if "uri" in uri], + ) ) - for item in items - if "login" in item - ] if len(bitwarden_result) == 0: return {}