Implement BitwardenSensitiveInformationParameter (#589)

This commit is contained in:
Kerem Yilmaz
2024-07-11 09:48:14 -07:00
committed by GitHub
parent 87d6e71768
commit 6f88ae31a0
10 changed files with 422 additions and 41 deletions

View File

@@ -29,6 +29,7 @@ class BitwardenConstants(StrEnum):
MASTER_PASSWORD = "BW_MASTER_PASSWORD"
URL = "BW_URL"
BW_COLLECTION_ID = "BW_COLLECTION_ID"
IDENTITY_KEY = "BW_IDENTITY_KEY"
USERNAME = "BW_USERNAME"
PASSWORD = "BW_PASSWORD"
@@ -75,49 +76,10 @@ class BitwardenService:
"""
Get the secret value from the Bitwarden CLI.
"""
# Step 1: Set up environment variables and log in
try:
env = {
"BW_CLIENTID": client_id,
"BW_CLIENTSECRET": client_secret,
"BW_PASSWORD": master_password,
}
login_command = ["bw", "login", "--apikey"]
login_result = BitwardenService.run_command(login_command, env)
BitwardenService.login(client_id, client_secret)
session_key = BitwardenService.unlock(master_password)
# Validate the login result
if login_result.stdout and "You are logged in!" not in login_result.stdout:
raise BitwardenLoginError(
f"Failed to log in. stdout: {login_result.stdout} stderr: {login_result.stderr}"
)
if login_result.stderr and "You are already logged in as" not in login_result.stderr:
raise BitwardenLoginError(
f"Failed to log in. stdout: {login_result.stdout} stderr: {login_result.stderr}"
)
LOG.info("Bitwarden login successful")
# Step 2: Unlock the vault
unlock_command = ["bw", "unlock", "--passwordenv", "BW_PASSWORD"]
unlock_result = BitwardenService.run_command(unlock_command, env)
# Validate the unlock result
if unlock_result.stdout and "Your vault is now unlocked!" not in unlock_result.stdout:
raise BitwardenUnlockError(
f"Failed to unlock vault. stdout: {unlock_result.stdout} stderr: {unlock_result.stderr}"
)
# Extract session key
try:
session_key = BitwardenService._extract_session_key(unlock_result.stdout)
except Exception as e:
raise BitwardenUnlockError(f"Unable to extract session key: {str(e)}")
if not session_key:
raise BitwardenUnlockError("Session key is empty.")
# Step 3: Retrieve the items
# Extract the domain from the URL and search for items in Bitwarden with that domain
domain = tldextract.extract(url).domain
list_command = [
@@ -187,6 +149,123 @@ class BitwardenService:
# Step 4: Log out
BitwardenService.logout()
@staticmethod
def get_sensitive_information_from_identity(
client_id: str,
client_secret: str,
master_password: str,
collection_id: str,
identity_key: str,
identity_fields: list[str],
) -> dict[str, str]:
"""
Get the sensitive information from the Bitwarden CLI.
"""
try:
BitwardenService.login(client_id, client_secret)
session_key = BitwardenService.unlock(master_password)
# Step 3: Retrieve the items
list_command = [
"bw",
"list",
"items",
"--search",
identity_key,
"--session",
session_key,
"--collectionid",
collection_id,
]
items_result = BitwardenService.run_command(list_command)
# Parse the items and extract sensitive information
try:
items = json.loads(items_result.stdout)
except json.JSONDecodeError:
raise BitwardenListItemsError("Failed to parse items JSON. Output: " + items_result.stdout)
if not items:
raise BitwardenListItemsError(
f"No items found in Bitwarden for identity key: {identity_key} in collection with ID: {collection_id}"
)
# Filter the identity items
# https://bitwarden.com/help/cli/#create lists the type of the identity items as 4
identity_items = [item for item in items if item["type"] == 4]
if len(identity_items) != 1:
raise BitwardenListItemsError(
f"Expected exactly one identity item, but found {len(identity_items)} items for identity key: {identity_key} in collection with ID: {collection_id}"
)
identity_item = identity_items[0]
sensitive_information: dict[str, str] = {}
for field in identity_fields:
# The identity item may store sensitive information in custom fields or default fields
# Custom fields are prioritized over default fields
# TODO (kerem): Make this case insensitive?
if field in identity_item["fields"]:
sensitive_information[field] = identity_item["fields"][field]["value"]
elif field in identity_item["identity"]:
sensitive_information[field] = identity_item["identity"][field]
return sensitive_information
finally:
# Step 4: Log out
BitwardenService.logout()
@staticmethod
def login(client_id: str, client_secret: str) -> None:
"""
Log in to the Bitwarden CLI.
"""
env = {
"BW_CLIENTID": client_id,
"BW_CLIENTSECRET": client_secret,
}
login_command = ["bw", "login", "--apikey"]
login_result = BitwardenService.run_command(login_command, env)
# Validate the login result
if login_result.stdout and "You are logged in!" not in login_result.stdout:
raise BitwardenLoginError(f"Failed to log in. stdout: {login_result.stdout} stderr: {login_result.stderr}")
if login_result.stderr and "You are already logged in as" not in login_result.stderr:
raise BitwardenLoginError(f"Failed to log in. stdout: {login_result.stdout} stderr: {login_result.stderr}")
LOG.info("Bitwarden login successful")
@staticmethod
def unlock(master_password: str) -> str:
"""
Unlock the Bitwarden CLI.
"""
env = {
"BW_PASSWORD": master_password,
}
unlock_command = ["bw", "unlock", "--passwordenv", "BW_PASSWORD"]
unlock_result = BitwardenService.run_command(unlock_command, env)
# Validate the unlock result
if unlock_result.stdout and "Your vault is now unlocked!" not in unlock_result.stdout:
raise BitwardenUnlockError(
f"Failed to unlock vault. stdout: {unlock_result.stdout} stderr: {unlock_result.stderr}"
)
# Extract session key
try:
session_key = BitwardenService._extract_session_key(unlock_result.stdout)
except Exception as e:
raise BitwardenUnlockError(f"Unable to extract session key: {str(e)}")
if not session_key:
raise BitwardenUnlockError("Session key is empty.")
return session_key
@staticmethod
def logout() -> None:
"""