Make BitwardenService async (#1772)

This commit is contained in:
Shuchang Zheng
2025-02-15 05:32:35 +08:00
committed by GitHub
parent fe383da57e
commit 4ba32fa7ee

View File

@@ -2,7 +2,6 @@ import asyncio
import json
import os
import re
import subprocess
import urllib.parse
from enum import StrEnum
@@ -59,11 +58,17 @@ class BitwardenQueryResult(BaseModel):
uris: list[str]
class RunCommandResult(BaseModel):
stdout: str
stderr: str
returncode: int
class BitwardenService:
@staticmethod
def run_command(
async def run_command(
command: list[str], additional_env: dict[str, str] | None = None, timeout: int = 60
) -> subprocess.CompletedProcess:
) -> RunCommandResult:
"""
Run a CLI command with the specified additional environment variables and return the result.
"""
@@ -74,9 +79,21 @@ class BitwardenService:
env.update(additional_env) # Update with any additional environment variables
try:
return subprocess.run(command, capture_output=True, text=True, env=env, timeout=timeout)
except subprocess.TimeoutExpired as e:
LOG.error(f"Bitwarden command timed out after {timeout} seconds", stdout=e.stdout, stderr=e.stderr)
async with asyncio.timeout(timeout):
shell_subprocess = await asyncio.create_subprocess_shell(
" ".join(command),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
)
stdout, stderr = await shell_subprocess.communicate()
return RunCommandResult(
stdout=stdout.decode(),
stderr=stderr.decode(),
returncode=shell_subprocess.returncode,
)
except asyncio.TimeoutError as e:
LOG.error(f"Bitwarden command timed out after {timeout} seconds", exc_info=True)
raise e
@staticmethod
@@ -190,9 +207,9 @@ class BitwardenService:
Get the secret value from the Bitwarden CLI.
"""
try:
BitwardenService.login(client_id, client_secret)
BitwardenService.sync()
session_key = BitwardenService.unlock(master_password)
await BitwardenService.login(client_id, client_secret)
await BitwardenService.sync()
session_key = await BitwardenService.unlock(master_password)
# Extract the domain from the URL and search for items in Bitwarden with that domain
extract_url = tldextract.extract(url)
@@ -218,7 +235,7 @@ class BitwardenService:
else:
LOG.error("No collection ID or organization ID provided -- this is required")
raise BitwardenListItemsError("No collection ID or organization ID provided -- this is required")
items_result = BitwardenService.run_command(list_command, timeout=timeout)
items_result = await BitwardenService.run_command(list_command, timeout=timeout)
if items_result.stderr and "Event post failed" not in items_result.stderr:
raise BitwardenListItemsError(items_result.stderr)
@@ -281,7 +298,7 @@ class BitwardenService:
return bitwarden_result[0].credential
finally:
# Step 4: Log out
BitwardenService.logout()
await BitwardenService.logout()
@staticmethod
async def get_sensitive_information_from_identity(
@@ -354,9 +371,9 @@ class BitwardenService:
Get the sensitive information from the Bitwarden CLI.
"""
try:
BitwardenService.login(client_id, client_secret)
BitwardenService.sync()
session_key = BitwardenService.unlock(master_password)
await BitwardenService.login(client_id, client_secret)
await BitwardenService.sync()
session_key = await BitwardenService.unlock(master_password)
if not bw_organization_id and not collection_id:
raise BitwardenAccessDeniedError()
@@ -375,7 +392,7 @@ class BitwardenService:
]
if bw_organization_id:
list_command.extend(["--organizationid", bw_organization_id])
items_result = BitwardenService.run_command(list_command)
items_result = await BitwardenService.run_command(list_command)
# Parse the items and extract sensitive information
try:
@@ -416,10 +433,10 @@ class BitwardenService:
finally:
# Step 4: Log out
BitwardenService.logout()
await BitwardenService.logout()
@staticmethod
def login(client_id: str, client_secret: str) -> None:
async def login(client_id: str, client_secret: str) -> None:
"""
Log in to the Bitwarden CLI.
"""
@@ -428,7 +445,7 @@ class BitwardenService:
"BW_CLIENTSECRET": client_secret,
}
login_command = ["bw", "login", "--apikey"]
login_result = BitwardenService.run_command(login_command, env)
login_result = await BitwardenService.run_command(login_command, env)
# Validate the login result
if login_result.stdout and "You are logged in!" not in login_result.stdout:
@@ -440,7 +457,7 @@ class BitwardenService:
LOG.info("Bitwarden login successful")
@staticmethod
def unlock(master_password: str) -> str:
async def unlock(master_password: str) -> str:
"""
Unlock the Bitwarden CLI.
"""
@@ -448,7 +465,7 @@ class BitwardenService:
"BW_PASSWORD": master_password,
}
unlock_command = ["bw", "unlock", "--passwordenv", "BW_PASSWORD"]
unlock_result = BitwardenService.run_command(unlock_command, env)
unlock_result = await BitwardenService.run_command(unlock_command, env)
# Validate the unlock result
if unlock_result.stdout and "Your vault is now unlocked!" not in unlock_result.stdout:
@@ -468,24 +485,24 @@ class BitwardenService:
return session_key
@staticmethod
def sync() -> None:
async def sync() -> None:
"""
Sync the Bitwarden CLI.
"""
sync_command = ["bw", "sync"]
LOG.info("Bitwarden CLI sync started")
sync_result = BitwardenService.run_command(sync_command)
sync_result = await BitwardenService.run_command(sync_command)
LOG.info("Bitwarden CLI sync completed")
if sync_result.stderr:
raise BitwardenSyncError(sync_result.stderr)
@staticmethod
def logout() -> None:
async def logout() -> None:
"""
Log out of the Bitwarden CLI.
"""
logout_command = ["bw", "logout"]
logout_result = BitwardenService.run_command(logout_command)
logout_result = await BitwardenService.run_command(logout_command)
if logout_result.stderr and "You are not logged in." not in logout_result.stderr:
raise BitwardenLogoutError(logout_result.stderr)
@@ -503,9 +520,9 @@ class BitwardenService:
Get the credit card data from the Bitwarden CLI.
"""
try:
BitwardenService.login(client_id, client_secret)
BitwardenService.sync()
session_key = BitwardenService.unlock(master_password)
await BitwardenService.login(client_id, client_secret)
await BitwardenService.sync()
session_key = await BitwardenService.unlock(master_password)
# Step 3: Get the item
get_command = [
@@ -522,7 +539,7 @@ class BitwardenService:
LOG.error("No collection ID or organization ID provided -- this is required")
raise BitwardenAccessDeniedError()
item_result = BitwardenService.run_command(get_command)
item_result = await BitwardenService.run_command(get_command)
# Parse the item and extract credit card data
try:
@@ -563,7 +580,7 @@ class BitwardenService:
return mapped_credit_card_data
finally:
# Step 4: Log out
BitwardenService.logout()
await BitwardenService.logout()
@staticmethod
async def get_credit_card_data(