This commit is contained in:
Iliyan Angelov
2025-11-30 22:43:09 +02:00
parent 24b40450dd
commit 39fcfff811
1610 changed files with 5442 additions and 1383 deletions

View File

@@ -0,0 +1,133 @@
from fastapi import Depends, HTTPException, status, Request, Cookie
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from sqlalchemy.orm import Session
from typing import Optional
import os
from ...shared.config.database import get_db
from ...shared.config.settings import settings
from ...auth.models.user import User
from ...auth.models.role import Role
security = HTTPBearer(auto_error=False)
def get_jwt_secret() -> str:
"""
Get JWT secret securely, fail if not configured.
Never use hardcoded fallback secrets.
"""
default_secret = 'dev-secret-key-change-in-production-12345'
jwt_secret = getattr(settings, 'JWT_SECRET', None) or os.getenv('JWT_SECRET', None)
# Fail fast if secret is not configured or using default value
if not jwt_secret or jwt_secret == default_secret:
if settings.is_production:
raise ValueError(
'CRITICAL: JWT_SECRET is not properly configured in production. '
'Please set JWT_SECRET environment variable to a secure random string.'
)
# In development, warn but allow (startup validation should catch this)
import warnings
warnings.warn(
f'JWT_SECRET not configured. Using settings value but this is insecure. '
f'Set JWT_SECRET environment variable.',
UserWarning
)
jwt_secret = getattr(settings, 'JWT_SECRET', None)
if not jwt_secret:
raise ValueError('JWT_SECRET must be configured')
return jwt_secret
def get_current_user(
request: Request,
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
access_token: Optional[str] = Cookie(None, alias='accessToken'),
db: Session = Depends(get_db)
) -> User:
"""
Get current user from either Authorization header or httpOnly cookie.
Prefers Authorization header for backward compatibility, falls back to cookie.
"""
# Try to get token from Authorization header first
token = None
if credentials:
token = credentials.credentials
# Fall back to cookie if no header token
if not token and access_token:
token = access_token
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='Could not validate credentials',
headers={'WWW-Authenticate': 'Bearer'}
)
if not token:
raise credentials_exception
try:
jwt_secret = get_jwt_secret()
payload = jwt.decode(token, jwt_secret, algorithms=['HS256'])
user_id: int = payload.get('userId')
if user_id is None:
raise credentials_exception
except JWTError:
raise credentials_exception
except ValueError as e:
# JWT secret configuration error - should not happen in production
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail='Server configuration error')
user = db.query(User).filter(User.id == user_id).first()
if user is None:
raise credentials_exception
return user
def authorize_roles(*allowed_roles: str):
def role_checker(current_user: User=Depends(get_current_user), db: Session=Depends(get_db)) -> User:
role = db.query(Role).filter(Role.id == current_user.role_id).first()
if not role:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='User role not found')
user_role_name = role.name
if user_role_name not in allowed_roles:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='You do not have permission to access this resource')
return current_user
return role_checker
def get_current_user_optional(
request: Request,
credentials: Optional[HTTPAuthorizationCredentials] = Depends(HTTPBearer(auto_error=False)),
access_token: Optional[str] = Cookie(None, alias='accessToken'),
db: Session = Depends(get_db)
) -> Optional[User]:
"""
Get current user optionally from either Authorization header or httpOnly cookie.
Returns None if no valid token is found.
"""
# Try to get token from Authorization header first
token = None
if credentials:
token = credentials.credentials
# Fall back to cookie if no header token
if not token and access_token:
token = access_token
if not token:
return None
try:
jwt_secret = get_jwt_secret()
payload = jwt.decode(token, jwt_secret, algorithms=['HS256'])
user_id: int = payload.get('userId')
if user_id is None:
return None
except (JWTError, ValueError):
return None
user = db.query(User).filter(User.id == user_id).first()
return user
def verify_token(token: str) -> dict:
jwt_secret = get_jwt_secret()
payload = jwt.decode(token, jwt_secret, algorithms=['HS256'])
return payload