This commit is contained in:
Iliyan Angelov
2025-12-09 17:07:38 +02:00
parent e43a95eafb
commit 9de9d9701e
14 changed files with 270 additions and 91 deletions

View File

@@ -21,7 +21,7 @@ router = APIRouter(prefix='/accountant/security', tags=['accountant-security'])
async def verify_step_up(
request: Request,
step_up_data: dict,
current_user: User = Depends(authorize_roles('admin', 'accountant')),
current_user: User = Depends(authorize_roles('accountant')),
db: Session = Depends(get_db)
):
"""Verify step-up authentication (MFA token or password re-entry)."""
@@ -36,7 +36,9 @@ async def verify_step_up(
# Try to get from header or cookie
session_token = request.headers.get('X-Session-Token') or request.cookies.get('session_token')
# If still no session token, try to find the most recent active session for this user
# If still no session token, try to find the most recent active session for this user.
# If none exists (e.g., password-only admin without MFA), create a fresh session so
# password-based step-up can proceed without forcing a full re-login.
if not session_token:
active_session = db.query(AccountantSession).filter(
AccountantSession.user_id == current_user.id,
@@ -47,7 +49,13 @@ async def verify_step_up(
if active_session:
session_token = active_session.session_token
else:
raise HTTPException(status_code=400, detail='No active session found. Please log in again.')
new_session = accountant_security_service.create_session(
db=db,
user_id=current_user.id,
ip_address=request.client.host if request.client else None,
user_agent=request.headers.get('User-Agent')
)
session_token = new_session.session_token
# Verify MFA if token provided
if mfa_token:
@@ -107,7 +115,7 @@ async def verify_step_up(
@router.get('/sessions')
async def get_active_sessions(
current_user: User = Depends(authorize_roles('admin', 'accountant')),
current_user: User = Depends(authorize_roles('accountant')),
db: Session = Depends(get_db)
):
"""Get active sessions for current user."""
@@ -143,7 +151,7 @@ async def get_active_sessions(
@router.post('/sessions/{session_id}/revoke')
async def revoke_session(
session_id: int,
current_user: User = Depends(authorize_roles('admin', 'accountant')),
current_user: User = Depends(authorize_roles('accountant')),
db: Session = Depends(get_db)
):
"""Revoke a specific session."""
@@ -172,7 +180,7 @@ async def revoke_session(
@router.post('/sessions/revoke-all')
async def revoke_all_sessions(
current_user: User = Depends(authorize_roles('admin', 'accountant')),
current_user: User = Depends(authorize_roles('accountant')),
db: Session = Depends(get_db)
):
"""Revoke all active sessions for current user."""
@@ -196,7 +204,7 @@ async def get_activity_logs(
limit: int = Query(50, ge=1, le=100),
risk_level: Optional[str] = Query(None),
is_unusual: Optional[bool] = Query(None),
current_user: User = Depends(authorize_roles('admin', 'accountant')),
current_user: User = Depends(authorize_roles('accountant')),
db: Session = Depends(get_db)
):
"""Get activity logs for current user or all users (admin only)."""

View File

@@ -6,7 +6,7 @@ from typing import Optional, Dict, Any, Tuple
from datetime import datetime, timedelta
from ...auth.models.user import User
from ..models.accountant_session import AccountantSession, AccountantActivityLog
from ...shared.utils.role_helpers import is_accountant, is_admin
from ...shared.utils.role_helpers import is_accountant
from ...shared.config.logging_config import get_logger
import secrets
import hashlib
@@ -27,10 +27,8 @@ class AccountantSecurityService:
@staticmethod
def requires_mfa(user: User, db: Session) -> bool:
"""Check if user role requires MFA."""
# Admin and all accountant roles require MFA
if is_admin(user, db) or is_accountant(user, db):
return True
return False
# Only accountant roles are handled by this service
return is_accountant(user, db)
@staticmethod
def is_mfa_enforced(user: User, db: Session) -> Tuple[bool, Optional[str]]:
@@ -40,7 +38,7 @@ class AccountantSecurityService:
"""
if AccountantSecurityService.requires_mfa(user, db):
if not user.mfa_enabled:
return False, "MFA is required for accountant/admin roles but not enabled"
return False, "MFA is required for accountant role but not enabled"
return True, None
return False, None
@@ -120,12 +118,18 @@ class AccountantSecurityService:
db: Session,
user_id: int,
session_token: Optional[str] = None,
action_description: str = "high-risk action"
action_description: str = "high-risk action",
enforce_role_check: bool = True,
) -> Tuple[bool, Optional[str]]:
"""
Check if step-up authentication is required.
Returns (requires_step_up: bool, reason: str | None)
"""
user = db.query(User).filter(User.id == user_id).first()
if enforce_role_check:
# Only enforce step-up for accountant roles when role check is enabled
if user and not is_accountant(user, db):
return False, None
# If no session token provided, try to find the most recent active session for this user
if not session_token:
active_session = db.query(AccountantSession).filter(