Credit Card Parameter (#903)
This commit is contained in:
@@ -39,11 +39,19 @@ class BitwardenConstants(StrEnum):
|
||||
URL = "BW_URL"
|
||||
BW_COLLECTION_ID = "BW_COLLECTION_ID"
|
||||
IDENTITY_KEY = "BW_IDENTITY_KEY"
|
||||
ITEM_ID = "BW_ITEM_ID"
|
||||
|
||||
USERNAME = "BW_USERNAME"
|
||||
PASSWORD = "BW_PASSWORD"
|
||||
TOTP = "BW_TOTP"
|
||||
|
||||
CREDIT_CARD_HOLDER_NAME = "BW_CREDIT_CARD_HOLDER_NAME"
|
||||
CREDIT_CARD_NUMBER = "BW_CREDIT_CARD_NUMBER"
|
||||
CREDIT_CARD_EXPIRATION_MONTH = "BW_CREDIT_CARD_EXPIRATION_MONTH"
|
||||
CREDIT_CARD_EXPIRATION_YEAR = "BW_CREDIT_CARD_EXPIRATION_YEAR"
|
||||
CREDIT_CARD_CVV = "BW_CREDIT_CARD_CVV"
|
||||
CREDIT_CARD_BRAND = "BW_CREDIT_CARD_BRAND"
|
||||
|
||||
|
||||
class BitwardenService:
|
||||
@staticmethod
|
||||
@@ -94,6 +102,8 @@ class BitwardenService:
|
||||
"""
|
||||
Get the secret value from the Bitwarden CLI.
|
||||
"""
|
||||
if not bw_organization_id and bw_collection_ids and collection_id not in bw_collection_ids:
|
||||
raise BitwardenAccessDeniedError()
|
||||
try:
|
||||
async with asyncio.timeout(timeout):
|
||||
return await BitwardenService._get_secret_value_from_url(
|
||||
@@ -105,6 +115,8 @@ class BitwardenService:
|
||||
url=url,
|
||||
collection_id=collection_id,
|
||||
)
|
||||
except BitwardenAccessDeniedError as e:
|
||||
raise e
|
||||
except Exception as e:
|
||||
if remaining_retries <= 0:
|
||||
raise BitwardenListItemsError(
|
||||
@@ -140,8 +152,6 @@ class BitwardenService:
|
||||
"""
|
||||
Get the secret value from the Bitwarden CLI.
|
||||
"""
|
||||
if not bw_organization_id and bw_collection_ids and collection_id not in bw_collection_ids:
|
||||
raise BitwardenAccessDeniedError()
|
||||
try:
|
||||
BitwardenService.login(client_id, client_secret)
|
||||
BitwardenService.sync()
|
||||
@@ -255,6 +265,8 @@ class BitwardenService:
|
||||
"""
|
||||
Get the secret value from the Bitwarden CLI.
|
||||
"""
|
||||
if not bw_organization_id and bw_collection_ids and collection_id not in bw_collection_ids:
|
||||
raise BitwardenAccessDeniedError()
|
||||
try:
|
||||
async with asyncio.timeout(timeout):
|
||||
return await BitwardenService._get_sensitive_information_from_identity(
|
||||
@@ -267,6 +279,8 @@ class BitwardenService:
|
||||
identity_key=identity_key,
|
||||
identity_fields=identity_fields,
|
||||
)
|
||||
except BitwardenAccessDeniedError as e:
|
||||
raise e
|
||||
except Exception as e:
|
||||
if remaining_retries <= 0:
|
||||
raise BitwardenListItemsError(
|
||||
@@ -304,8 +318,6 @@ class BitwardenService:
|
||||
"""
|
||||
Get the sensitive information from the Bitwarden CLI.
|
||||
"""
|
||||
if not bw_organization_id and bw_collection_ids and collection_id not in bw_collection_ids:
|
||||
raise BitwardenAccessDeniedError()
|
||||
try:
|
||||
BitwardenService.login(client_id, client_secret)
|
||||
BitwardenService.sync()
|
||||
@@ -438,3 +450,123 @@ class BitwardenService:
|
||||
logout_result = BitwardenService.run_command(logout_command)
|
||||
if logout_result.stderr and "You are not logged in." not in logout_result.stderr:
|
||||
raise BitwardenLogoutError(logout_result.stderr)
|
||||
|
||||
@staticmethod
|
||||
async def _get_credit_card_data(
|
||||
client_id: str,
|
||||
client_secret: str,
|
||||
master_password: str,
|
||||
bw_organization_id: str | None,
|
||||
bw_collection_ids: list[str] | None,
|
||||
collection_id: str,
|
||||
item_id: str,
|
||||
) -> dict[str, str]:
|
||||
"""
|
||||
Get the credit card data from the Bitwarden CLI.
|
||||
"""
|
||||
try:
|
||||
BitwardenService.login(client_id, client_secret)
|
||||
BitwardenService.sync()
|
||||
session_key = BitwardenService.unlock(master_password)
|
||||
|
||||
# Step 3: Get the item
|
||||
get_command = [
|
||||
"bw",
|
||||
"get",
|
||||
"item",
|
||||
item_id,
|
||||
"--session",
|
||||
session_key,
|
||||
]
|
||||
item_result = BitwardenService.run_command(get_command)
|
||||
|
||||
# Parse the item and extract credit card data
|
||||
try:
|
||||
item = json.loads(item_result.stdout)
|
||||
except json.JSONDecodeError:
|
||||
raise BitwardenListItemsError(f"Failed to parse item JSON for item ID: {item_id}")
|
||||
|
||||
if not item:
|
||||
raise BitwardenListItemsError(f"No item found in Bitwarden for item ID: {item_id}")
|
||||
|
||||
# Check if the bw_organization_id matches
|
||||
if bw_organization_id:
|
||||
item_organization_id = item.get("organizationId")
|
||||
if item_organization_id != bw_organization_id:
|
||||
raise BitwardenAccessDeniedError()
|
||||
|
||||
if bw_collection_ids:
|
||||
item_collection_ids = item.get("collectionIds")
|
||||
if item_collection_ids and collection_id not in bw_collection_ids:
|
||||
raise BitwardenAccessDeniedError()
|
||||
|
||||
# Check if the item is a credit card
|
||||
# https://bitwarden.com/help/cli/#create lists the type of the credit card items as 3
|
||||
if item["type"] != 3:
|
||||
raise BitwardenListItemsError(f"Item with ID: {item_id} is not a credit card type")
|
||||
|
||||
credit_card_data = item["card"]
|
||||
|
||||
mapped_credit_card_data: dict[str, str] = {
|
||||
BitwardenConstants.CREDIT_CARD_HOLDER_NAME: credit_card_data["cardholderName"],
|
||||
BitwardenConstants.CREDIT_CARD_NUMBER: credit_card_data["number"],
|
||||
BitwardenConstants.CREDIT_CARD_EXPIRATION_MONTH: credit_card_data["expMonth"],
|
||||
BitwardenConstants.CREDIT_CARD_EXPIRATION_YEAR: credit_card_data["expYear"],
|
||||
BitwardenConstants.CREDIT_CARD_CVV: credit_card_data["code"],
|
||||
BitwardenConstants.CREDIT_CARD_BRAND: credit_card_data["brand"],
|
||||
}
|
||||
|
||||
return mapped_credit_card_data
|
||||
finally:
|
||||
# Step 4: Log out
|
||||
BitwardenService.logout()
|
||||
|
||||
@staticmethod
|
||||
async def get_credit_card_data(
|
||||
client_id: str,
|
||||
client_secret: str,
|
||||
master_password: str,
|
||||
bw_organization_id: str | None,
|
||||
bw_collection_ids: list[str] | None,
|
||||
collection_id: str,
|
||||
item_id: str,
|
||||
remaining_retries: int = settings.BITWARDEN_MAX_RETRIES,
|
||||
fail_reasons: list[str] = [],
|
||||
) -> dict[str, str]:
|
||||
"""
|
||||
Get the credit card data from the Bitwarden CLI.
|
||||
"""
|
||||
if not bw_organization_id and not bw_collection_ids:
|
||||
raise BitwardenAccessDeniedError()
|
||||
try:
|
||||
async with asyncio.timeout(settings.BITWARDEN_TIMEOUT_SECONDS):
|
||||
return await BitwardenService._get_credit_card_data(
|
||||
client_id=client_id,
|
||||
client_secret=client_secret,
|
||||
master_password=master_password,
|
||||
bw_organization_id=bw_organization_id,
|
||||
bw_collection_ids=bw_collection_ids,
|
||||
collection_id=collection_id,
|
||||
item_id=item_id,
|
||||
)
|
||||
except BitwardenAccessDeniedError as e:
|
||||
raise e
|
||||
except Exception as e:
|
||||
if remaining_retries <= 0:
|
||||
raise BitwardenListItemsError(
|
||||
f"Bitwarden CLI failed after all retry attempts. Fail reasons: {fail_reasons}"
|
||||
)
|
||||
|
||||
remaining_retries -= 1
|
||||
LOG.info("Retrying to get credit card data from Bitwarden", remaining_retries=remaining_retries)
|
||||
return await BitwardenService.get_credit_card_data(
|
||||
client_id=client_id,
|
||||
client_secret=client_secret,
|
||||
master_password=master_password,
|
||||
bw_organization_id=bw_organization_id,
|
||||
bw_collection_ids=bw_collection_ids,
|
||||
collection_id=collection_id,
|
||||
item_id=item_id,
|
||||
remaining_retries=remaining_retries,
|
||||
fail_reasons=fail_reasons + [f"{type(e).__name__}: {str(e)}"],
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user