""" API key service for third-party access management. """ from sqlalchemy.orm import Session from typing import List, Optional from datetime import datetime import secrets import hashlib from ..models.api_key import APIKey from ...shared.config.logging_config import get_logger logger = get_logger(__name__) class APIKeyService: """Service for managing API keys.""" @staticmethod def generate_api_key() -> tuple[str, str]: """Generate a new API key and return (key, hash).""" # Generate key in format: hb_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx key = f"hb_{secrets.token_urlsafe(32)}" key_hash = hashlib.sha256(key.encode('utf-8')).hexdigest() key_prefix = key[:11] # "hb_" + 8 chars return key, key_hash, key_prefix @staticmethod def create_api_key( db: Session, name: str, scopes: List[str], created_by: int, description: Optional[str] = None, rate_limit: int = 100, expires_at: Optional[datetime] = None ) -> tuple[APIKey, str]: """Create a new API key and return (api_key_model, plain_key).""" key, key_hash, key_prefix = APIKeyService.generate_api_key() api_key = APIKey( name=name, key_hash=key_hash, key_prefix=key_prefix, scopes=scopes, rate_limit=rate_limit, created_by=created_by, description=description, expires_at=expires_at ) db.add(api_key) db.commit() db.refresh(api_key) logger.info(f'API key created: {api_key.id} - {name}') return api_key, key # Return plain key only once @staticmethod def verify_api_key(db: Session, api_key: str) -> Optional[APIKey]: """Verify an API key and return the APIKey model if valid.""" key_hash = hashlib.sha256(api_key.encode('utf-8')).hexdigest() api_key_model = db.query(APIKey).filter( APIKey.key_hash == key_hash, APIKey.is_active == True ).first() if api_key_model and api_key_model.is_valid: # Update last used timestamp api_key_model.last_used_at = datetime.utcnow() db.commit() return api_key_model return None @staticmethod def revoke_api_key(db: Session, key_id: int) -> bool: """Revoke an API key.""" api_key = db.query(APIKey).filter(APIKey.id == key_id).first() if api_key: api_key.is_active = False db.commit() logger.info(f'API key {key_id} revoked') return True return False @staticmethod def get_user_api_keys( db: Session, created_by: int, active_only: bool = True ) -> List[APIKey]: """Get all API keys created by a user.""" query = db.query(APIKey).filter(APIKey.created_by == created_by) if active_only: query = query.filter(APIKey.is_active == True) return query.order_by(APIKey.created_at.desc()).all() @staticmethod def update_api_key( db: Session, key_id: int, name: Optional[str] = None, scopes: Optional[List[str]] = None, description: Optional[str] = None, rate_limit: Optional[int] = None, expires_at: Optional[datetime] = None ) -> Optional[APIKey]: """Update an API key.""" api_key = db.query(APIKey).filter(APIKey.id == key_id).first() if not api_key: return None if name is not None: api_key.name = name if scopes is not None: api_key.scopes = scopes if description is not None: api_key.description = description if rate_limit is not None: api_key.rate_limit = rate_limit if expires_at is not None: api_key.expires_at = expires_at db.commit() db.refresh(api_key) logger.info(f'API key updated: {api_key.id} - {api_key.name}') return api_key api_key_service = APIKeyService()