support uri format otp secrete (#3126)
This commit is contained in:
@@ -2,7 +2,6 @@ import asyncio
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import urllib.parse
|
||||
from enum import IntEnum, StrEnum
|
||||
from typing import Tuple
|
||||
|
||||
@@ -32,6 +31,7 @@ from skyvern.forge.sdk.schemas.credentials import (
|
||||
CreditCardCredential,
|
||||
PasswordCredential,
|
||||
)
|
||||
from skyvern.forge.sdk.services.credentials import parse_totp_secret
|
||||
|
||||
LOG = structlog.get_logger()
|
||||
BITWARDEN_SERVER_BASE_URL = f"{settings.BITWARDEN_SERVER}:{settings.BITWARDEN_SERVER_PORT or 8002}"
|
||||
@@ -245,25 +245,7 @@ class BitwardenService:
|
||||
>>> BitwardenService.extract_totp_secret("otpauth://totp/user@domain.com?secret=AAAAAABBBBBBB")
|
||||
"AAAAAABBBBBBB"
|
||||
"""
|
||||
if not totp_value:
|
||||
return ""
|
||||
|
||||
# Handle TOTP URI format
|
||||
if totp_value.startswith("otpauth://"):
|
||||
try:
|
||||
# Parse the URI to extract the secret
|
||||
query = urllib.parse.urlparse(totp_value).query
|
||||
params = dict(urllib.parse.parse_qsl(query))
|
||||
return params.get("secret", "")
|
||||
except Exception:
|
||||
LOG.error(
|
||||
"Failed to parse TOTP URI",
|
||||
totp_value=totp_value,
|
||||
exc_info=True,
|
||||
)
|
||||
return ""
|
||||
|
||||
return totp_value
|
||||
return parse_totp_secret(totp_value)
|
||||
|
||||
@staticmethod
|
||||
async def _get_secret_value_from_url(
|
||||
|
||||
@@ -1,10 +1,39 @@
|
||||
import logging
|
||||
import re
|
||||
from enum import StrEnum
|
||||
from urllib.parse import unquote
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
import pyotp
|
||||
import structlog
|
||||
|
||||
from skyvern.exceptions import NoTOTPSecretFound
|
||||
|
||||
LOG = structlog.get_logger()
|
||||
|
||||
|
||||
class OnePasswordConstants(StrEnum):
|
||||
"""Constants for 1Password integration."""
|
||||
|
||||
TOTP = "OP_TOTP" # Special value to indicate a TOTP code
|
||||
|
||||
|
||||
def parse_totp_secret(totp_secret: str) -> str:
|
||||
if not totp_secret:
|
||||
return ""
|
||||
|
||||
totp_secret_no_whitespace = "".join(totp_secret.split())
|
||||
if len(totp_secret_no_whitespace) == 32:
|
||||
return totp_secret_no_whitespace
|
||||
|
||||
LOG.info("TOTP secret key is not 32 characters, try to parse it from URI format")
|
||||
try:
|
||||
totp_secret = pyotp.parse_uri(totp_secret_no_whitespace).secret
|
||||
totp_secret_no_whitespace = "".join(totp_secret.split())
|
||||
return totp_secret_no_whitespace
|
||||
except Exception:
|
||||
LOG.warning("Failed to parse TOTP secret key from URI format, going to extract secret by regex", exc_info=True)
|
||||
m = re.search(r"(?i)(?:^|[?&])secret=([^&#]+)", unquote(totp_secret_no_whitespace))
|
||||
if m is None:
|
||||
raise NoTOTPSecretFound()
|
||||
totp_secret = m.group(1)
|
||||
totp_secret_no_whitespace = "".join(totp_secret.split())
|
||||
return totp_secret_no_whitespace
|
||||
|
||||
Reference in New Issue
Block a user