Add gated admin impersonation controls for MCP API-key auth (#4822)

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Marc Kelechava
2026-02-19 18:56:06 -08:00
committed by GitHub
parent 36e600eeb9
commit 71f2b7a201
5 changed files with 520 additions and 41 deletions

View File

@@ -3,6 +3,7 @@ from enum import StrEnum
class OrganizationAuthTokenType(StrEnum):
api = "api"
mcp_admin_impersonation = "mcp_admin_impersonation"
onepassword_service_account = "onepassword_service_account"
azure_client_secret_credential = "azure_client_secret_credential"
custom_credential_service = "custom_credential_service"

View File

@@ -1,6 +1,6 @@
import time
from dataclasses import dataclass
from typing import Annotated
from typing import Annotated, Sequence
import structlog
from asyncache import cached
@@ -176,6 +176,7 @@ async def _authenticate_user_helper(authorization: str) -> str:
async def resolve_org_from_api_key(
x_api_key: str,
db: AgentDB,
token_types: Sequence[OrganizationAuthTokenType] = (OrganizationAuthTokenType.api,),
) -> ApiKeyValidationResult:
"""Decode and validate the API key against the database."""
try:
@@ -202,12 +203,18 @@ async def resolve_org_from_api_key(
LOG.warning("Organization not found", organization_id=api_key_data.sub, **payload)
raise HTTPException(status_code=404, detail="Organization not found")
api_key_db_obj = await db.validate_org_auth_token(
organization_id=organization.organization_id,
token_type=OrganizationAuthTokenType.api,
token=x_api_key,
valid=None,
)
api_key_db_obj: OrganizationAuthToken | None = None
# Try token types in priority order and stop at the first valid match.
for token_type in token_types:
api_key_db_obj = await db.validate_org_auth_token(
organization_id=organization.organization_id,
token_type=token_type,
token=x_api_key,
valid=None,
)
if api_key_db_obj:
break
if not api_key_db_obj:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,