Files
Hotel-Booking/Backend/src/security/middleware/auth.py
Iliyan Angelov 9f1aeb32da updates
2025-12-04 15:10:07 +02:00

240 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import Depends, HTTPException, status, Request, Cookie
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from sqlalchemy.orm import Session, joinedload
from typing import Optional
from datetime import datetime
import os
from ...shared.config.database import get_db
from ...shared.config.settings import settings
from ...auth.models.user import User
from ...auth.models.role import Role
security = HTTPBearer(auto_error=False)
def get_jwt_secret() -> str:
    """
    Return the secret used to sign and verify JWTs.

    The secret is read from ``settings.JWT_SECRET`` first and from the
    ``JWT_SECRET`` environment variable second. There is never a hardcoded
    fallback secret.

    If no secret is configured:
      * in production (``settings.is_production``) a ``ValueError`` is raised;
      * otherwise a random secret is generated once and reused for the rest
        of the process lifetime, so tokens issued by one call can still be
        verified by a later call. The generated value is never logged.

    A secret shorter than 32 characters, or one in which fewer than 30% of
    the characters are distinct, raises ``ValueError`` in production and is
    only logged (error + warning) elsewhere.

    Returns:
        The JWT secret string.

    Raises:
        ValueError: In production, when the secret is missing, too short,
            or looks low-entropy.
    """
    import logging
    import secrets

    logger = logging.getLogger(__name__)

    def _fail_or_warn(message: str) -> None:
        # Weak secrets are fatal in production and only a warning elsewhere.
        logger.error(message)
        if settings.is_production:
            raise ValueError(message)
        logger.warning(message)

    jwt_secret = getattr(settings, 'JWT_SECRET', None) or os.getenv('JWT_SECRET', None)

    # Fail fast if secret is not configured
    if not jwt_secret:
        error_msg = (
            'CRITICAL: JWT_SECRET is not configured. '
            'Please set JWT_SECRET environment variable to a secure random string (minimum 32 characters).'
        )
        logger.error(error_msg)
        if settings.is_production:
            raise ValueError(error_msg)

        # Development only: generate the secret once and cache it on the
        # function object. Generating a fresh secret on every call would make
        # every previously issued token fail verification.
        # NOTE: the cache is per process; multiple worker processes will
        # still disagree, so set JWT_SECRET explicitly in that case.
        jwt_secret = getattr(get_jwt_secret, '_dev_secret', None)
        if jwt_secret is None:
            jwt_secret = secrets.token_urlsafe(64)
            get_jwt_secret._dev_secret = jwt_secret
            # SECURITY: do not include the secret itself in the log message.
            logger.warning(
                'JWT_SECRET not configured. Auto-generated secret for development. '
                'Set JWT_SECRET environment variable for production.'
            )

    # Validate JWT secret strength
    if len(jwt_secret) < 32:
        _fail_or_warn('JWT_SECRET must be at least 32 characters long for security.')

    # SECURITY: crude entropy heuristic - fewer than 30% distinct characters
    # suggests a simple repeated pattern rather than a random secret.
    if len(set(jwt_secret)) < len(jwt_secret) * 0.3:
        _fail_or_warn('JWT_SECRET appears to have low entropy. Please use a randomly generated secret.')

    return jwt_secret
def get_current_user(
    request: Request,
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
    access_token: Optional[str] = Cookie(None, alias='accessToken'),
    db: Session = Depends(get_db)
) -> User:
    """
    Get current user from either Authorization header or httpOnly cookie.
    Prefers Authorization header for backward compatibility, falls back to cookie.

    The token is decoded as an HS256 JWT and its ``userId`` claim is used to
    load the user (with the ``role`` relationship eager-loaded).

    For accountant/admin users an MFA check is performed on a best-effort
    basis: when MFA is not enforced and a reason is given, a warning is
    logged and ``_mfa_setup_required`` / ``_mfa_setup_reason`` are set on the
    returned user so individual routes can enforce MFA. Any failure of that
    check other than an ``HTTPException`` is logged and ignored.

    Returns:
        The authenticated, active, unlocked ``User``.

    Raises:
        HTTPException: 401 when the token is missing or invalid, has no
            ``userId`` claim, or the user does not exist; 403 when the
            account is disabled or temporarily locked; 500 when the JWT
            secret is misconfigured.
    """
    import logging
    import math

    # Try to get token from Authorization header first
    token = None
    if credentials:
        token = credentials.credentials
    # Fall back to cookie if no header token
    if not token and access_token:
        token = access_token

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail='Could not validate credentials',
        headers={'WWW-Authenticate': 'Bearer'}
    )
    if not token:
        raise credentials_exception

    try:
        jwt_secret = get_jwt_secret()
        payload = jwt.decode(token, jwt_secret, algorithms=['HS256'])
        user_id: int = payload.get('userId')
        if user_id is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    except ValueError:
        # JWT secret configuration error - should not happen in production
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail='Server configuration error')

    # PERFORMANCE: Eager load role relationship to avoid N+1 queries in authorize_roles
    user = db.query(User).options(joinedload(User.role)).filter(User.id == user_id).first()
    if user is None:
        raise credentials_exception

    # Check if user account is active
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail='Account is disabled. Please contact support.'
        )

    # Check if account is locked. Read the clock once so the comparison and
    # the remaining-time calculation agree.
    # NOTE(review): assumes locked_until is stored as a naive UTC datetime - confirm.
    now = datetime.utcnow()
    if user.locked_until and user.locked_until > now:
        # Round up so a lock with e.g. 30 seconds left reports 1 minute, not 0.
        remaining_minutes = math.ceil((user.locked_until - now).total_seconds() / 60)
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f'Account is temporarily locked due to multiple failed login attempts. Please try again in {remaining_minutes} minute(s).'
        )

    # SECURITY: Check MFA for accountant/admin roles (warn but allow access for MFA setup)
    # Start with a stdlib logger so the generic handler below always has one,
    # even when the lazy project imports inside the try block fail.
    logger = logging.getLogger(__name__)
    try:
        from ...payments.services.accountant_security_service import accountant_security_service
        from ...shared.utils.role_helpers import is_accountant, is_admin
        from ...shared.config.logging_config import get_logger
        logger = get_logger(__name__)
        if is_accountant(user, db) or is_admin(user, db):
            is_enforced, reason = accountant_security_service.is_mfa_enforced(user, db)
            if not is_enforced and reason:
                # Log warning but allow access so user can set up MFA
                # Individual routes can enforce MFA for sensitive operations
                logger.warning(
                    f'User {user.id} ({user.email}) accessed system without MFA enabled. '
                    f'MFA is required for {user.role.name if user.role else "unknown"} role. '
                    f'Reason: {reason}'
                )
                # Store MFA requirement in user object for route-level checks
                user._mfa_setup_required = True
                user._mfa_setup_reason = reason
    except HTTPException:
        raise
    except Exception as e:
        # Don't block authentication if MFA check fails (log and continue)
        logger.warning(f'Error checking MFA enforcement: {str(e)}')

    return user
def authorize_roles(*allowed_roles: str):
    """
    Build a FastAPI dependency that only admits users with an allowed role.

    Args:
        *allowed_roles: Role names permitted to access the route.

    Returns:
        A ``role_checker`` dependency. It resolves the current user through
        ``get_current_user`` and returns that user when the role check passes.

    Raises (from ``role_checker``):
        HTTPException: 403 when the user's role cannot be found or is not
            one of ``allowed_roles``.
    """
    def role_checker(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)) -> User:
        # PERFORMANCE: Use the eager-loaded relationship if available, otherwise query.
        # get_current_user eager loads the role, so the query is only a fallback.
        role = getattr(current_user, 'role', None)
        if role is None:
            # Fallback: query role if relationship wasn't loaded (shouldn't happen, but safe)
            role = db.query(Role).filter(Role.id == current_user.role_id).first()
            if not role:
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='User role not found')
        user_role_name = role.name

        if user_role_name in allowed_roles:
            return current_user

        # Backwards-compatible support for accountant sub-roles:
        # any role starting with "accountant_" is treated as "accountant"
        # when a route allows plain "accountant".
        if user_role_name.startswith("accountant_") and "accountant" in allowed_roles:
            return current_user

        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail='You do not have permission to access this resource'
        )
    return role_checker
def get_current_user_optional(
    request: Request,
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(HTTPBearer(auto_error=False)),
    access_token: Optional[str] = Cookie(None, alias='accessToken'),
    db: Session = Depends(get_db)
) -> Optional[User]:
    """
    Resolve the current user when a usable token is present, otherwise None.

    The token is taken from the Authorization header when available and from
    the ``accessToken`` cookie otherwise. This function never raises for
    authentication problems: a missing or undecodable token, a missing
    ``userId`` claim, an unknown user, an inactive account or a currently
    locked account all yield ``None``, so such users are never treated as
    authenticated even for optional features.
    """
    # Header token wins; the cookie is only a fallback.
    bearer_token = credentials.credentials if credentials else None
    token = bearer_token or access_token
    if not token:
        return None

    try:
        claims = jwt.decode(token, get_jwt_secret(), algorithms=['HS256'])
        user_id = claims.get('userId')
    except (JWTError, ValueError):
        # Invalid token or JWT secret configuration error: treat as anonymous.
        return None
    if user_id is None:
        return None

    # PERFORMANCE: eager load the role relationship, as get_current_user does.
    user = (
        db.query(User)
        .options(joinedload(User.role))
        .filter(User.id == user_id)
        .first()
    )

    # Unknown or inactive users are treated as anonymous.
    if user is None or not user.is_active:
        return None

    # Locked accounts are treated as anonymous as well.
    is_locked = bool(user.locked_until) and user.locked_until > datetime.utcnow()
    return None if is_locked else user
def verify_token(token: str) -> dict:
    """
    Decode *token* as an HS256 JWT using the configured secret and return its payload.

    Errors raised by ``get_jwt_secret`` or ``jwt.decode`` are not caught here
    and propagate to the caller.
    """
    return jwt.decode(token, get_jwt_secret(), algorithms=['HS256'])