From 18cebb21e172dcde94f59689ce5fc12c7b9d38f9 Mon Sep 17 00:00:00 2001 From: LawyZheng Date: Wed, 9 Oct 2024 02:38:34 +0800 Subject: [PATCH] fix bitwarden credential search (#932) --- skyvern/forge/sdk/services/bitwarden.py | 52 ++++++++++++++++--------- 1 file changed, 33 insertions(+), 19 deletions(-) diff --git a/skyvern/forge/sdk/services/bitwarden.py b/skyvern/forge/sdk/services/bitwarden.py index 444c3745..93f8c60f 100644 --- a/skyvern/forge/sdk/services/bitwarden.py +++ b/skyvern/forge/sdk/services/bitwarden.py @@ -7,6 +7,7 @@ from enum import StrEnum import structlog import tldextract +from pydantic import BaseModel from skyvern.config import settings from skyvern.exceptions import ( @@ -53,6 +54,11 @@ class BitwardenConstants(StrEnum): CREDIT_CARD_BRAND = "BW_CREDIT_CARD_BRAND" +class BitwardenQueryResult(BaseModel): + credential: dict[str, str] + uris: list[str] + + class BitwardenService: @staticmethod def run_command(command: list[str], additional_env: dict[str, str] | None = None) -> subprocess.CompletedProcess: @@ -157,8 +163,9 @@ class BitwardenService: BitwardenService.sync() session_key = BitwardenService.unlock(master_password) - # Extract the domain(with suffix) from the URL and search for items in Bitwarden with that domain - domain = tldextract.extract(url).registered_domain + # Extract the domain from the URL and search for items in Bitwarden with that domain + extract_url = tldextract.extract(url) + domain = extract_url.domain list_command = [ "bw", "list", @@ -217,33 +224,40 @@ class BitwardenService: e=BitwardenTOTPError(totp_result.stderr), ) totp_code = totp_result.stdout - - credentials: list[dict[str, str]] = [ - { - BitwardenConstants.USERNAME: item["login"]["username"], - BitwardenConstants.PASSWORD: item["login"]["password"], - } + bitwarden_result: list[BitwardenQueryResult] = [ + BitwardenQueryResult( + credential={ + BitwardenConstants.USERNAME: item.get("login", {}).get("username", ""), + BitwardenConstants.PASSWORD: item.get("login", {}).get("password", ""), + }, + uris=[uri.get("uri") for uri in item.get("login", {}).get("uris", []) if "uri" in uri], + ) for item in items if "login" in item ] - if totp_code: - for credential in credentials: - credential[BitwardenConstants.TOTP] = totp_code - if len(credentials) == 0: + if totp_code: + for single_result in bitwarden_result: + single_result.credential[BitwardenConstants.TOTP] = totp_code + + if len(bitwarden_result) == 0: return {} - if len(credentials) == 1: - return credentials[0] + if len(bitwarden_result) == 1: + return bitwarden_result[0].credential # Choose multiple credentials according to the defined rule, # if no cred matches the rule, return the first one. - # TODO: For now hard code to choose the first valid email username - for cred in credentials: - if is_valid_email(cred.get(BitwardenConstants.USERNAME)): - return cred + # TODO: For now hard code to choose the first matched result + for single_result in bitwarden_result: + # check the username is a valid email + if is_valid_email(single_result.credential.get(BitwardenConstants.USERNAME)): + for uri in single_result.uris: + # check if the register_domain is the same + if extract_url.registered_domain == tldextract.extract(uri).registered_domain: + return single_result.credential LOG.warning("No credential in Bitwarden matches the rule, returning the first match") - return credentials[0] + return bitwarden_result[0].credential finally: # Step 4: Log out BitwardenService.logout()