import asyncio
import json
import os
import re
import subprocess
import urllib.parse
from enum import StrEnum

import structlog
import tldextract
from pydantic import BaseModel

from skyvern.config import settings
from skyvern.exceptions import (
    BitwardenAccessDeniedError,
    BitwardenListItemsError,
    BitwardenLoginError,
    BitwardenLogoutError,
    BitwardenSyncError,
    BitwardenUnlockError,
)

LOG = structlog.get_logger()


def is_valid_email(email: str | None) -> bool:
    """Return True when *email* looks like ``local@domain.tld``; False for None/empty or non-matching input."""
    if not email:
        return False
    pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
    return re.match(pattern, email) is not None


class BitwardenConstants(StrEnum):
    """String keys used to label Bitwarden-related parameters and returned credential fields."""

    BW_ORGANIZATION_ID = "BW_ORGANIZATION_ID"
    BW_COLLECTION_IDS = "BW_COLLECTION_IDS"

    CLIENT_ID = "BW_CLIENT_ID"
    CLIENT_SECRET = "BW_CLIENT_SECRET"
    MASTER_PASSWORD = "BW_MASTER_PASSWORD"
    URL = "BW_URL"
    BW_COLLECTION_ID = "BW_COLLECTION_ID"
    IDENTITY_KEY = "BW_IDENTITY_KEY"
    ITEM_ID = "BW_ITEM_ID"

    USERNAME = "BW_USERNAME"
    PASSWORD = "BW_PASSWORD"
    TOTP = "BW_TOTP"

    CREDIT_CARD_HOLDER_NAME = "BW_CREDIT_CARD_HOLDER_NAME"
    CREDIT_CARD_NUMBER = "BW_CREDIT_CARD_NUMBER"
    CREDIT_CARD_EXPIRATION_MONTH = "BW_CREDIT_CARD_EXPIRATION_MONTH"
    CREDIT_CARD_EXPIRATION_YEAR = "BW_CREDIT_CARD_EXPIRATION_YEAR"
    CREDIT_CARD_CVV = "BW_CREDIT_CARD_CVV"
    CREDIT_CARD_BRAND = "BW_CREDIT_CARD_BRAND"


class BitwardenQueryResult(BaseModel):
    """One login item returned by ``bw list items``: its credential map and the URIs attached to it."""

    # Keyed by BitwardenConstants.USERNAME / PASSWORD / TOTP.
    credential: dict[str, str]
    # URIs stored on the login item; used to pick among multiple matches.
    uris: list[str]


class BitwardenService:
    """Static helpers that drive the ``bw`` command-line client through ``subprocess``."""

    @staticmethod
    def run_command(
        command: list[str], additional_env: dict[str, str] | None = None, timeout: int = 60
    ) -> subprocess.CompletedProcess:
        """
        Run a CLI command with the specified additional environment variables and return the result.

        Args:
            command: Argument vector passed to ``subprocess.run`` (no shell).
            additional_env: Extra environment variables layered on top of a copy of ``os.environ``.
            timeout: Seconds before ``subprocess.run`` aborts the command.

        Returns:
            The completed process with captured text stdout/stderr.

        Raises:
            subprocess.TimeoutExpired: If the command exceeds *timeout*; it is logged and re-raised.
        """
        env = os.environ.copy()  # Copy the current environment
        # Make sure node isn't returning warnings. Warnings are sent through stderr and we raise exceptions on stderr.
        env["NODE_NO_WARNINGS"] = "1"
        if additional_env:
            env.update(additional_env)  # Update with any additional environment variables

        try:
            return subprocess.run(command, capture_output=True, text=True, env=env, timeout=timeout)
        except subprocess.TimeoutExpired as e:
            LOG.error(f"Bitwarden command timed out after {timeout} seconds", stdout=e.stdout, stderr=e.stderr)
            raise

    @staticmethod
    def _extract_session_key(unlock_cmd_output: str) -> str | None:
        """Return the value between ``BW_SESSION="`` and the last ``"`` on the first line containing it, else None."""
        marker = 'BW_SESSION="'
        for line in unlock_cmd_output.split("\n"):
            if marker in line:
                # The key runs from just after the marker to the last closing quote on the line.
                start = line.find(marker) + len(marker)
                end = line.rfind('"', start)
                return line[start:end]
        return None

    @staticmethod
    async def get_secret_value_from_url(
        client_id: str,
        client_secret: str,
        master_password: str,
        bw_organization_id: str | None,
        bw_collection_ids: list[str] | None,
        url: str,
        collection_id: str | None = None,
        max_retries: int = settings.BITWARDEN_MAX_RETRIES,
        timeout: int = settings.BITWARDEN_TIMEOUT_SECONDS,
    ) -> dict[str, str]:
        """
        Get the secret value from the Bitwarden CLI, retrying up to *max_retries* times.

        Args:
            client_id: Bitwarden API client id used for ``bw login --apikey``.
            client_secret: Bitwarden API client secret.
            master_password: Master password used to unlock the vault.
            bw_organization_id: Optional organization to restrict the search to.
            bw_collection_ids: Optional allow-list of collection ids; only enforced when no
                organization id is given.
            url: URL whose domain is used as the search term.
            collection_id: Optional collection to restrict the search to.
            max_retries: Number of attempts before giving up.
            timeout: Base timeout in seconds; attempt ``n`` (1-based) uses ``n * timeout``.

        Returns:
            A credential dict keyed by BitwardenConstants.USERNAME / PASSWORD / TOTP, or ``{}``.

        Raises:
            BitwardenAccessDeniedError: If *collection_id* is not in the allow-list (never retried).
            BitwardenListItemsError: If every attempt failed; the message lists each failure.
        """
        fail_reasons: list[str] = []
        if not bw_organization_id and bw_collection_ids and collection_id not in bw_collection_ids:
            raise BitwardenAccessDeniedError()

        for i in range(max_retries):
            # FIXME: simple linear backoff (1x, 2x, 3x, ... the base timeout); maybe a better backoff policy
            # when needed. A separate variable is used so the base timeout is not compounded across attempts.
            attempt_timeout = (i + 1) * timeout
            try:
                # NOTE(review): the wrapped coroutine runs blocking subprocess calls without awaiting, so
                # asyncio.timeout likely cannot interrupt it — confirm; the per-command timeout still applies.
                async with asyncio.timeout(attempt_timeout):
                    return await BitwardenService._get_secret_value_from_url(
                        client_id=client_id,
                        client_secret=client_secret,
                        master_password=master_password,
                        bw_organization_id=bw_organization_id,
                        bw_collection_ids=bw_collection_ids,
                        url=url,
                        collection_id=collection_id,
                        timeout=attempt_timeout,
                    )
            except BitwardenAccessDeniedError:
                # Access denial is definitive; retrying cannot help.
                raise
            except Exception as e:
                LOG.info("Failed to get secret value from Bitwarden", tried_times=i + 1, exc_info=True)
                fail_reasons.append(f"{type(e).__name__}: {str(e)}")

        raise BitwardenListItemsError(f"Bitwarden CLI failed after all retry attempts. Fail reasons: {fail_reasons}")

    @staticmethod
    def extract_totp_secret(totp_value: str) -> str:
        """
        Extract the TOTP secret from either a raw secret or a TOTP URI.

        Args:
            totp_value: Raw TOTP secret or URI (otpauth://totp/...)

        Returns:
            The extracted TOTP secret

        Example:
            >>> BitwardenService.extract_totp_secret("AAAAAABBBBBBB")
            "AAAAAABBBBBBB"
            >>> BitwardenService.extract_totp_secret("otpauth://totp/user@domain.com?secret=AAAAAABBBBBBB")
            "AAAAAABBBBBBB"
        """
        if not totp_value:
            return ""

        # Handle TOTP URI format
        if totp_value.startswith("otpauth://"):
            try:
                # Parse the URI to extract the secret
                query = urllib.parse.urlparse(totp_value).query
                params = dict(urllib.parse.parse_qsl(query))
                return params.get("secret", "")
            except Exception:
                # The raw value is deliberately not logged: it contains the TOTP secret.
                LOG.error("Failed to parse TOTP URI", exc_info=True)
                return ""

        return totp_value

    @staticmethod
    async def _get_secret_value_from_url(
        client_id: str,
        client_secret: str,
        master_password: str,
        bw_organization_id: str | None,
        bw_collection_ids: list[str] | None,
        url: str,
        collection_id: str | None = None,
        timeout: int = 60,
    ) -> dict[str, str]:
        """
        Get the secret value from the Bitwarden CLI (single attempt).

        Logs in, syncs, unlocks, searches items by the domain of *url*, and picks one credential.
        When several login items match, the first whose username is a valid email and whose URI
        shares the registered domain of *url* wins; otherwise the first login item is returned.
        Always logs out in ``finally``.

        Returns:
            A credential dict keyed by BitwardenConstants.USERNAME / PASSWORD / TOTP, or ``{}`` when
            the matching items contain no login entries.

        Raises:
            BitwardenListItemsError: Missing organization/collection id, CLI stderr output,
                unparseable JSON, or no items found.
        """
        try:
            BitwardenService.login(client_id, client_secret)
            BitwardenService.sync()
            session_key = BitwardenService.unlock(master_password)

            # Extract the domain from the URL and search for items in Bitwarden with that domain
            extract_url = tldextract.extract(url)
            domain = extract_url.domain
            list_command = [
                "bw",
                "list",
                "items",
                "--search",
                domain,
                "--session",
                session_key,
            ]
            if bw_organization_id:
                LOG.info(
                    "Organization ID is provided, filtering items by organization ID",
                    bw_organization_id=bw_organization_id,
                )
                list_command.extend(["--organizationid", bw_organization_id])
            elif collection_id:
                LOG.info("Collection ID is provided, filtering items by collection ID", collection_id=collection_id)
                list_command.extend(["--collectionid", collection_id])
            else:
                LOG.error("No collection ID or organization ID provided -- this is required")
                raise BitwardenListItemsError("No collection ID or organization ID provided -- this is required")

            items_result = BitwardenService.run_command(list_command, timeout=timeout)

            # "Event post failed" on stderr is tolerated; any other stderr output is an error.
            if items_result.stderr and "Event post failed" not in items_result.stderr:
                raise BitwardenListItemsError(items_result.stderr)

            # Parse the items and extract credentials
            try:
                items = json.loads(items_result.stdout)
            except json.JSONDecodeError:
                raise BitwardenListItemsError("Failed to parse items JSON. Output: " + items_result.stdout)

            # Since Bitwarden can't AND multiple filters, we only use organization id in the list command
            # but we still need to filter the items by collection id here
            if bw_organization_id and collection_id:
                items = [
                    item for item in items if "collectionIds" in item and collection_id in item["collectionIds"]
                ]

            if not items:
                collection_id_str = f" in collection with ID: {collection_id}" if collection_id else ""
                raise BitwardenListItemsError(f"No items found in Bitwarden for URL: {url}{collection_id_str}")

            bitwarden_result: list[BitwardenQueryResult] = []
            for item in items:
                if "login" not in item:
                    continue

                login = item["login"]
                totp = BitwardenService.extract_totp_secret(login.get("totp", ""))

                # Keys may be present with a null value; coerce to "" / skip so the str-typed model validates.
                bitwarden_result.append(
                    BitwardenQueryResult(
                        credential={
                            BitwardenConstants.USERNAME: login.get("username") or "",
                            BitwardenConstants.PASSWORD: login.get("password") or "",
                            BitwardenConstants.TOTP: totp,
                        },
                        uris=[uri["uri"] for uri in (login.get("uris") or []) if uri.get("uri") is not None],
                    )
                )

            if len(bitwarden_result) == 0:
                return {}

            if len(bitwarden_result) == 1:
                return bitwarden_result[0].credential

            # Choose multiple credentials according to the defined rule,
            # if no cred matches the rule, return the first one.
            # TODO: For now hard code to choose the first matched result
            for single_result in bitwarden_result:
                # check the username is a valid email
                if is_valid_email(single_result.credential.get(BitwardenConstants.USERNAME)):
                    for uri in single_result.uris:
                        # check if the register_domain is the same
                        if extract_url.registered_domain == tldextract.extract(uri).registered_domain:
                            return single_result.credential
            LOG.warning("No credential in Bitwarden matches the rule, returning the first match")
            return bitwarden_result[0].credential
        finally:
            # Always log out, whether the lookup succeeded or failed
            BitwardenService.logout()

    @staticmethod
    async def get_sensitive_information_from_identity(
        client_id: str,
        client_secret: str,
        master_password: str,
        bw_organization_id: str | None,
        bw_collection_ids: list[str] | None,
        collection_id: str,
        identity_key: str,
        identity_fields: list[str],
        remaining_retries: int = settings.BITWARDEN_MAX_RETRIES,
        timeout: int = settings.BITWARDEN_TIMEOUT_SECONDS,
        fail_reasons: list[str] | None = None,
    ) -> dict[str, str]:
        """
        Get identity fields from the Bitwarden CLI, retrying recursively on failure.

        Args:
            client_id: Bitwarden API client id.
            client_secret: Bitwarden API client secret.
            master_password: Master password used to unlock the vault.
            bw_organization_id: Optional organization to restrict the search to.
            bw_collection_ids: Optional allow-list of collection ids; only enforced when no
                organization id is given.
            collection_id: Collection to search in.
            identity_key: Search term used to find the identity item.
            identity_fields: Names of the fields to extract from the item.
            remaining_retries: Retries left after this attempt.
            timeout: Timeout in seconds for this attempt; doubled for each retry.
            fail_reasons: Failure descriptions accumulated by previous attempts.

        Returns:
            Mapping of each found field name to its value.

        Raises:
            BitwardenAccessDeniedError: If *collection_id* is not in the allow-list (never retried).
            BitwardenListItemsError: If all attempts failed; the message lists every failure.
        """
        reasons: list[str] = [] if fail_reasons is None else fail_reasons
        if not bw_organization_id and bw_collection_ids and collection_id not in bw_collection_ids:
            raise BitwardenAccessDeniedError()
        try:
            # NOTE(review): the wrapped coroutine runs blocking subprocess calls without awaiting, so
            # asyncio.timeout likely cannot interrupt it — confirm.
            async with asyncio.timeout(timeout):
                return await BitwardenService._get_sensitive_information_from_identity(
                    client_id=client_id,
                    client_secret=client_secret,
                    master_password=master_password,
                    bw_organization_id=bw_organization_id,
                    bw_collection_ids=bw_collection_ids,
                    collection_id=collection_id,
                    identity_key=identity_key,
                    identity_fields=identity_fields,
                )
        except BitwardenAccessDeniedError:
            raise
        except Exception as e:
            # Record this attempt's failure first so the final error message includes it too.
            reasons = reasons + [f"{type(e).__name__}: {str(e)}"]
            if remaining_retries <= 0:
                raise BitwardenListItemsError(
                    f"Bitwarden CLI failed after all retry attempts. Fail reasons: {reasons}"
                ) from e

            remaining_retries -= 1
            LOG.info("Retrying to get sensitive information from Bitwarden", remaining_retries=remaining_retries)
            return await BitwardenService.get_sensitive_information_from_identity(
                client_id=client_id,
                client_secret=client_secret,
                master_password=master_password,
                bw_organization_id=bw_organization_id,
                bw_collection_ids=bw_collection_ids,
                collection_id=collection_id,
                identity_key=identity_key,
                identity_fields=identity_fields,
                remaining_retries=remaining_retries,
                # Double the timeout for the next retry
                timeout=timeout * 2,
                fail_reasons=reasons,
            )

    @staticmethod
    async def _get_sensitive_information_from_identity(
        client_id: str,
        client_secret: str,
        master_password: str,
        collection_id: str,
        identity_key: str,
        identity_fields: list[str],
        bw_organization_id: str | None,
        bw_collection_ids: list[str] | None,
    ) -> dict[str, str]:
        """
        Get the sensitive information from the Bitwarden CLI (single attempt).

        Logs in, syncs, unlocks, searches items by *identity_key* within *collection_id*, and
        reads the requested fields from the first item returned. Always logs out in ``finally``.

        Returns:
            Mapping of each requested field found on the item to its value. Custom fields take
            priority over the item's default ``identity`` fields.

        Raises:
            BitwardenAccessDeniedError: If neither an organization id nor a collection id is given.
            BitwardenListItemsError: Unparseable JSON or no items found.
        """
        try:
            BitwardenService.login(client_id, client_secret)
            BitwardenService.sync()
            session_key = BitwardenService.unlock(master_password)

            if not bw_organization_id and not collection_id:
                raise BitwardenAccessDeniedError()

            # Retrieve the items
            list_command = [
                "bw",
                "list",
                "items",
                "--search",
                identity_key,
                "--session",
                session_key,
                "--collectionid",
                collection_id,
            ]
            if bw_organization_id:
                list_command.extend(["--organizationid", bw_organization_id])

            items_result = BitwardenService.run_command(list_command)

            # Parse the items and extract sensitive information
            try:
                items = json.loads(items_result.stdout)
            except json.JSONDecodeError:
                raise BitwardenListItemsError("Failed to parse items JSON. Output: " + items_result.stdout)

            if not items:
                raise BitwardenListItemsError(
                    f"No items found in Bitwarden for identity key: {identity_key} in collection with ID: {collection_id}"
                )

            # Filter the identity items
            # https://bitwarden.com/help/cli/#create lists the type of the identity items as 4
            # We may want to filter it by type in the future, but for now we just take the first item and check its identity fields
            # identity_items = [item for item in items if item["type"] == 4]
            identity_item = items[0]

            # An item without custom fields may omit "fields" or carry null; treat both as empty
            # so the default identity fields below are still consulted.
            custom_fields = identity_item.get("fields") or []

            sensitive_information: dict[str, str] = {}
            for field in identity_fields:
                # The identity item may store sensitive information in custom fields or default fields
                # Custom fields are prioritized over default fields
                # TODO (kerem): Make this case insensitive?
                for custom_field in custom_fields:
                    if custom_field.get("name") == field:
                        sensitive_information[field] = custom_field["value"]
                        break

                if (
                    "identity" in identity_item
                    and field in identity_item["identity"]
                    and field not in sensitive_information
                ):
                    sensitive_information[field] = identity_item["identity"][field]

            return sensitive_information

        finally:
            # Always log out, whether the lookup succeeded or failed
            BitwardenService.logout()

    @staticmethod
    def login(client_id: str, client_secret: str) -> None:
        """
        Log in to the Bitwarden CLI.

        Credentials are passed to ``bw login --apikey`` through the BW_CLIENTID / BW_CLIENTSECRET
        environment variables.

        Raises:
            BitwardenLoginError: If stdout is non-empty without "You are logged in!", or stderr is
                non-empty without "You are already logged in as".
        """
        env = {
            "BW_CLIENTID": client_id,
            "BW_CLIENTSECRET": client_secret,
        }
        login_command = ["bw", "login", "--apikey"]
        login_result = BitwardenService.run_command(login_command, env)

        # Validate the login result
        if login_result.stdout and "You are logged in!" not in login_result.stdout:
            raise BitwardenLoginError(f"Failed to log in. stdout: {login_result.stdout} stderr: {login_result.stderr}")

        if login_result.stderr and "You are already logged in as" not in login_result.stderr:
            raise BitwardenLoginError(f"Failed to log in. stdout: {login_result.stdout} stderr: {login_result.stderr}")

        LOG.info("Bitwarden login successful")

    @staticmethod
    def unlock(master_password: str) -> str:
        """
        Unlock the Bitwarden CLI.

        The master password is passed through the BW_PASSWORD environment variable
        (``bw unlock --passwordenv BW_PASSWORD``).

        Returns:
            The session key parsed from the command output.

        Raises:
            BitwardenUnlockError: If stdout is non-empty without the success message, or no
                session key can be extracted.
        """
        env = {
            "BW_PASSWORD": master_password,
        }
        unlock_command = ["bw", "unlock", "--passwordenv", "BW_PASSWORD"]
        unlock_result = BitwardenService.run_command(unlock_command, env)

        # Validate the unlock result
        if unlock_result.stdout and "Your vault is now unlocked!" not in unlock_result.stdout:
            raise BitwardenUnlockError(
                f"Failed to unlock vault. stdout: {unlock_result.stdout} stderr: {unlock_result.stderr}"
            )

        # Extract session key
        try:
            session_key = BitwardenService._extract_session_key(unlock_result.stdout)
        except Exception as e:
            raise BitwardenUnlockError(f"Unable to extract session key: {str(e)}")

        if not session_key:
            raise BitwardenUnlockError("Session key is empty.")

        return session_key

    @staticmethod
    def sync() -> None:
        """
        Sync the Bitwarden CLI.

        Raises:
            BitwardenSyncError: If ``bw sync`` writes anything to stderr.
        """
        sync_command = ["bw", "sync"]
        LOG.info("Bitwarden CLI sync started")
        sync_result = BitwardenService.run_command(sync_command)
        LOG.info("Bitwarden CLI sync completed")
        if sync_result.stderr:
            raise BitwardenSyncError(sync_result.stderr)

    @staticmethod
    def logout() -> None:
        """
        Log out of the Bitwarden CLI.

        Raises:
            BitwardenLogoutError: If stderr is non-empty and is not the "You are not logged in." notice.
        """
        logout_command = ["bw", "logout"]
        logout_result = BitwardenService.run_command(logout_command)
        if logout_result.stderr and "You are not logged in." not in logout_result.stderr:
            raise BitwardenLogoutError(logout_result.stderr)

    @staticmethod
    async def _get_credit_card_data(
        client_id: str,
        client_secret: str,
        master_password: str,
        bw_organization_id: str | None,
        bw_collection_ids: list[str] | None,
        collection_id: str,
        item_id: str,
    ) -> dict[str, str]:
        """
        Get the credit card data from the Bitwarden CLI (single attempt).

        Logs in, syncs, unlocks, fetches the item by id, checks organization/collection access,
        and maps the item's ``card`` object onto BitwardenConstants.CREDIT_CARD_* keys.
        Always logs out in ``finally``.

        Raises:
            BitwardenAccessDeniedError: Missing organization and collection id, organization
                mismatch, or failed collection check.
            BitwardenListItemsError: Unparseable JSON, empty item, or item type is not 3.
        """
        try:
            BitwardenService.login(client_id, client_secret)
            BitwardenService.sync()
            session_key = BitwardenService.unlock(master_password)

            # Get the item
            get_command = [
                "bw",
                "get",
                "item",
                item_id,
                "--session",
                session_key,
            ]

            # Bitwarden CLI doesn't support filtering by organization ID or collection ID for credit card data so we just raise an error if no collection ID or organization ID is provided
            if not bw_organization_id and not collection_id:
                LOG.error("No collection ID or organization ID provided -- this is required")
                raise BitwardenAccessDeniedError()

            item_result = BitwardenService.run_command(get_command)

            # Parse the item and extract credit card data
            try:
                item = json.loads(item_result.stdout)
            except json.JSONDecodeError:
                raise BitwardenListItemsError(f"Failed to parse item JSON for item ID: {item_id}")

            if not item:
                raise BitwardenListItemsError(f"No item found in Bitwarden for item ID: {item_id}")

            # Check if the bw_organization_id matches
            if bw_organization_id:
                item_organization_id = item.get("organizationId")
                if item_organization_id != bw_organization_id:
                    raise BitwardenAccessDeniedError()

            if bw_collection_ids:
                item_collection_ids = item.get("collectionIds")
                # NOTE(review): this compares the requested collection_id against the allow-list and only
                # uses the item's own collectionIds as a truthiness gate — confirm that is the intended check.
                if item_collection_ids and collection_id not in bw_collection_ids:
                    raise BitwardenAccessDeniedError()

            # Check if the item is a credit card
            # https://bitwarden.com/help/cli/#create lists the type of the credit card items as 3
            if item["type"] != 3:
                raise BitwardenListItemsError(f"Item with ID: {item_id} is not a credit card type")

            credit_card_data = item["card"]

            mapped_credit_card_data: dict[str, str] = {
                BitwardenConstants.CREDIT_CARD_HOLDER_NAME: credit_card_data["cardholderName"],
                BitwardenConstants.CREDIT_CARD_NUMBER: credit_card_data["number"],
                BitwardenConstants.CREDIT_CARD_EXPIRATION_MONTH: credit_card_data["expMonth"],
                BitwardenConstants.CREDIT_CARD_EXPIRATION_YEAR: credit_card_data["expYear"],
                BitwardenConstants.CREDIT_CARD_CVV: credit_card_data["code"],
                BitwardenConstants.CREDIT_CARD_BRAND: credit_card_data["brand"],
            }

            return mapped_credit_card_data
        finally:
            # Always log out, whether the lookup succeeded or failed
            BitwardenService.logout()

    @staticmethod
    async def get_credit_card_data(
        client_id: str,
        client_secret: str,
        master_password: str,
        bw_organization_id: str | None,
        bw_collection_ids: list[str] | None,
        collection_id: str,
        item_id: str,
        remaining_retries: int = settings.BITWARDEN_MAX_RETRIES,
        fail_reasons: list[str] | None = None,
    ) -> dict[str, str]:
        """
        Get the credit card data from the Bitwarden CLI, retrying recursively on failure.

        Args:
            client_id: Bitwarden API client id.
            client_secret: Bitwarden API client secret.
            master_password: Master password used to unlock the vault.
            bw_organization_id: Optional organization the item must belong to.
            bw_collection_ids: Optional allow-list of collection ids.
            collection_id: Collection the caller is requesting access through.
            item_id: Id of the Bitwarden item to fetch.
            remaining_retries: Retries left after this attempt.
            fail_reasons: Failure descriptions accumulated by previous attempts.

        Returns:
            Credit card fields keyed by BitwardenConstants.CREDIT_CARD_*.

        Raises:
            BitwardenAccessDeniedError: Propagated from the single-attempt helper (never retried).
            BitwardenListItemsError: If all attempts failed; the message lists every failure.
        """
        reasons: list[str] = [] if fail_reasons is None else fail_reasons
        try:
            # NOTE(review): the wrapped coroutine runs blocking subprocess calls without awaiting, so
            # asyncio.timeout likely cannot interrupt it — confirm.
            async with asyncio.timeout(settings.BITWARDEN_TIMEOUT_SECONDS):
                return await BitwardenService._get_credit_card_data(
                    client_id=client_id,
                    client_secret=client_secret,
                    master_password=master_password,
                    bw_organization_id=bw_organization_id,
                    bw_collection_ids=bw_collection_ids,
                    collection_id=collection_id,
                    item_id=item_id,
                )
        except BitwardenAccessDeniedError:
            raise
        except Exception as e:
            # Record this attempt's failure first so the final error message includes it too.
            reasons = reasons + [f"{type(e).__name__}: {str(e)}"]
            if remaining_retries <= 0:
                raise BitwardenListItemsError(
                    f"Bitwarden CLI failed after all retry attempts. Fail reasons: {reasons}"
                ) from e

            remaining_retries -= 1
            LOG.info("Retrying to get credit card data from Bitwarden", remaining_retries=remaining_retries)
            return await BitwardenService.get_credit_card_data(
                client_id=client_id,
                client_secret=client_secret,
                master_password=master_password,
                bw_organization_id=bw_organization_id,
                bw_collection_ids=bw_collection_ids,
                collection_id=collection_id,
                item_id=item_id,
                remaining_retries=remaining_retries,
                fail_reasons=reasons,
            )