""" Security API views for authentication, authorization, and audit """ import base64 from datetime import datetime, timedelta from django.contrib.auth import authenticate, login, logout from django.contrib.auth.decorators import login_required from django.utils.decorators import method_decorator from django.utils import timezone from django.db.models import Q from django.http import JsonResponse from django.views.decorators.csrf import csrf_exempt from django.views.decorators.http import require_http_methods from rest_framework import status, generics, permissions from rest_framework.decorators import api_view, permission_classes, action from rest_framework.response import Response from rest_framework.viewsets import ModelViewSet from rest_framework.authtoken.models import Token from rest_framework.permissions import IsAuthenticated, IsAdminUser from ..models import ( DataClassification, Role, User, MFADevice, AuditLog, SSOProvider, AccessPolicy ) from ..serializers.security import ( DataClassificationSerializer, RoleSerializer, UserSerializer, MFADeviceSerializer, MFASetupSerializer, MFAVerificationSerializer, AuditLogSerializer, SSOProviderSerializer, AccessPolicySerializer, LoginSerializer, PasswordChangeSerializer, UserProfileSerializer ) from ..mfa.totp import MFAProvider from ..authentication.sso import SSOAuthentication class DataClassificationViewSet(ModelViewSet): """ViewSet for data classification management""" queryset = DataClassification.objects.all() serializer_class = DataClassificationSerializer permission_classes = [IsAdminUser] def get_queryset(self): """Filter classifications based on user clearance""" user = self.request.user if user.is_superuser: return DataClassification.objects.all() # Filter based on user's clearance level if user.clearance_level: return DataClassification.objects.filter( level__lte=user.clearance_level.level ) return DataClassification.objects.filter(level__lte=1) class RoleViewSet(ModelViewSet): """ViewSet for role management""" queryset = Role.objects.all() serializer_class = RoleSerializer permission_classes = [IsAdminUser] def get_queryset(self): """Filter roles based on user permissions""" user = self.request.user if user.is_superuser: return Role.objects.all() # Users can only see roles they have permissions for return Role.objects.filter( permissions__in=user.get_effective_permissions() ).distinct() class UserViewSet(ModelViewSet): """ViewSet for user management""" queryset = User.objects.all() serializer_class = UserSerializer permission_classes = [IsAdminUser] def get_queryset(self): """Filter users based on data classification access""" user = self.request.user if user.is_superuser: return User.objects.all() # Filter based on clearance level if user.clearance_level: return User.objects.filter( clearance_level__level__lte=user.clearance_level.level ) return User.objects.filter(clearance_level__level__lte=1) @action(detail=True, methods=['post']) def lock_account(self, request, pk=None): """Lock user account""" user = self.get_object() duration = request.data.get('duration_minutes', 30) user.lock_account(duration) # Log the action AuditLog.objects.create( user=request.user, action_type='ACCOUNT_LOCKED', resource_type='User', resource_id=str(user.id), ip_address=self._get_client_ip(request), user_agent=request.META.get('HTTP_USER_AGENT', ''), details={'locked_user': user.username, 'duration_minutes': duration}, severity='HIGH' ) return Response({'message': f'Account locked for {duration} minutes'}) @action(detail=True, methods=['post']) def unlock_account(self, request, pk=None): """Unlock user account""" user = self.get_object() user.unlock_account() # Log the action AuditLog.objects.create( user=request.user, action_type='ACCOUNT_UNLOCKED', resource_type='User', resource_id=str(user.id), ip_address=self._get_client_ip(request), user_agent=request.META.get('HTTP_USER_AGENT', ''), details={'unlocked_user': user.username}, severity='MEDIUM' ) return Response({'message': 'Account unlocked'}) @action(detail=True, methods=['patch']) def update_roles(self, request, pk=None): """Update user roles""" user = self.get_object() role_ids = request.data.get('role_ids', []) # Validate role IDs roles = Role.objects.filter(id__in=role_ids) if len(roles) != len(role_ids): return Response( {'error': 'Invalid role IDs provided'}, status=status.HTTP_400_BAD_REQUEST ) # Update user roles user.roles.set(roles) # Log the action AuditLog.objects.create( user=request.user, action_type='ROLES_UPDATED', resource_type='User', resource_id=str(user.id), ip_address=self._get_client_ip(request), user_agent=request.META.get('HTTP_USER_AGENT', ''), details={'updated_user': user.username, 'new_roles': [role.name for role in roles]}, severity='MEDIUM' ) return Response({'message': 'User roles updated successfully'}) @action(detail=True, methods=['patch']) def update_clearance(self, request, pk=None): """Update user clearance level""" user = self.get_object() clearance_level_id = request.data.get('clearance_level_id') if not clearance_level_id: return Response( {'error': 'clearance_level_id is required'}, status=status.HTTP_400_BAD_REQUEST ) try: clearance_level = DataClassification.objects.get(id=clearance_level_id) except DataClassification.DoesNotExist: return Response( {'error': 'Invalid clearance level ID'}, status=status.HTTP_400_BAD_REQUEST ) old_clearance = user.clearance_level.name if user.clearance_level else 'None' user.clearance_level = clearance_level user.save() # Log the action AuditLog.objects.create( user=request.user, action_type='CLEARANCE_UPDATED', resource_type='User', resource_id=str(user.id), ip_address=self._get_client_ip(request), user_agent=request.META.get('HTTP_USER_AGENT', ''), details={ 'updated_user': user.username, 'old_clearance': old_clearance, 'new_clearance': clearance_level.name }, severity='HIGH' ) return Response({'message': 'User clearance level updated successfully'}) @action(detail=False, methods=['get']) def dashboard_permissions(self, request): """Get dashboard permission mapping for admin""" dashboard_components = { 'Dashboard (Overview)': {'permissions': [], 'clearance_level': None}, 'Incidents': {'permissions': ['view_incident'], 'clearance_level': None}, 'Monitoring': {'permissions': ['view_monitoringdashboard', 'view_alert', 'view_healthcheck'], 'clearance_level': None}, 'SLA & On-Call': {'permissions': ['view_sladefinition', 'view_slainstance', 'view_oncallassignment'], 'clearance_level': None}, 'Security': {'permissions': ['view_user', 'view_role', 'view_auditlog', 'view_securityevent'], 'clearance_level': 2}, 'Automation': {'permissions': ['view_runbook', 'view_autoremediation', 'view_integration'], 'clearance_level': None}, 'War Rooms': {'permissions': ['view_warroom', 'view_warroommessage', 'view_conferencebridge'], 'clearance_level': None}, 'Analytics': {'permissions': ['view_kpimetric', 'view_anomalydetection', 'view_dashboardconfiguration'], 'clearance_level': None}, 'Knowledge': {'permissions': ['view_knowledgebasearticle', 'view_postmortem', 'view_learningpattern'], 'clearance_level': None}, 'Compliance': {'permissions': ['view_compliancereport', 'view_regulatoryframework', 'view_legalhold'], 'clearance_level': 3}, } return Response(dashboard_components) def _get_client_ip(self, request): """Get client IP address""" x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR') if x_forwarded_for: ip = x_forwarded_for.split(',')[0] else: ip = request.META.get('REMOTE_ADDR') return ip class MFADeviceViewSet(ModelViewSet): """ViewSet for MFA device management""" serializer_class = MFADeviceSerializer permission_classes = [IsAuthenticated] def get_queryset(self): """Users can only manage their own MFA devices""" return MFADevice.objects.filter(user=self.request.user) def perform_create(self, serializer): """Create MFA device for current user""" serializer.save(user=self.request.user) @action(detail=False, methods=['post']) def setup_totp(self, request): """Setup TOTP device""" serializer = MFASetupSerializer(data=request.data, context={'request': request}) if serializer.is_valid(): device_name = serializer.validated_data['device_name'] # Create TOTP device device, qr_data = MFAProvider.create_totp_device(request.user, device_name) # Generate QR code image qr_image = MFAProvider.generate_qr_code_image(qr_data) qr_base64 = base64.b64encode(qr_image).decode('utf-8') return Response({ 'device_id': str(device.id), 'qr_code_data': qr_data, 'qr_code_image': f"data:image/png;base64,{qr_base64}", 'message': 'TOTP device created. Scan QR code with authenticator app.' }) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) @action(detail=True, methods=['post']) def verify_totp(self, request, pk=None): """Verify TOTP token""" device = self.get_object() serializer = MFAVerificationSerializer(data=request.data) if serializer.is_valid(): token = serializer.validated_data['token'] if MFAProvider.verify_totp_token(request.user, token, str(device.id)): return Response({'message': 'TOTP token verified successfully'}) else: return Response( {'error': 'Invalid TOTP token'}, status=status.HTTP_400_BAD_REQUEST ) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) @action(detail=False, methods=['post']) def enable_mfa(self, request): """Enable MFA for user""" if MFAProvider.enable_mfa_for_user(request.user): return Response({'message': 'MFA enabled successfully'}) else: return Response( {'error': 'No active MFA devices found'}, status=status.HTTP_400_BAD_REQUEST ) @action(detail=False, methods=['post']) def disable_mfa(self, request): """Disable MFA for user""" MFAProvider.disable_mfa_for_user(request.user) return Response({'message': 'MFA disabled successfully'}) class AuditLogViewSet(ModelViewSet): """ViewSet for audit log viewing""" queryset = AuditLog.objects.all() serializer_class = AuditLogSerializer permission_classes = [IsAdminUser] http_method_names = ['get', 'head', 'options'] # Read-only def get_queryset(self): """Filter audit logs based on user permissions""" queryset = AuditLog.objects.all() # Filter by date range start_date = self.request.query_params.get('start_date') end_date = self.request.query_params.get('end_date') if start_date: queryset = queryset.filter(timestamp__gte=start_date) if end_date: queryset = queryset.filter(timestamp__lte=end_date) # Filter by action type action_type = self.request.query_params.get('action_type') if action_type: queryset = queryset.filter(action_type=action_type) # Filter by severity severity = self.request.query_params.get('severity') if severity: queryset = queryset.filter(severity=severity) # Filter by user user_id = self.request.query_params.get('user_id') if user_id: queryset = queryset.filter(user_id=user_id) return queryset.order_by('-timestamp') class SSOProviderViewSet(ModelViewSet): """ViewSet for SSO provider management""" queryset = SSOProvider.objects.all() serializer_class = SSOProviderSerializer permission_classes = [IsAdminUser] class AccessPolicyViewSet(ModelViewSet): """ViewSet for access policy management""" queryset = AccessPolicy.objects.all() serializer_class = AccessPolicySerializer permission_classes = [IsAdminUser] @api_view(['POST']) @permission_classes([]) def login_view(request): """User login with MFA support""" serializer = LoginSerializer(data=request.data) if serializer.is_valid(): username = serializer.validated_data['username'] password = serializer.validated_data['password'] mfa_token = serializer.validated_data.get('mfa_token', '') remember_me = serializer.validated_data.get('remember_me', False) # Authenticate user user = authenticate(request, username=username, password=password) if user is None: # Log failed login attempt AuditLog.objects.create( action_type='LOGIN_FAILED', ip_address=_get_client_ip(request), user_agent=request.META.get('HTTP_USER_AGENT', ''), details={'username': username}, severity='MEDIUM' ) return Response( {'error': 'Invalid credentials'}, status=status.HTTP_401_UNAUTHORIZED ) # Check if account is locked if user.is_account_locked(): return Response( {'error': 'Account is locked'}, status=status.HTTP_423_LOCKED ) # Check MFA if enabled if user.mfa_enabled: if not mfa_token: return Response( {'error': 'MFA token required', 'mfa_required': True}, status=status.HTTP_400_BAD_REQUEST ) if not MFAProvider.verify_totp_token(user, mfa_token): return Response( {'error': 'Invalid MFA token'}, status=status.HTTP_401_UNAUTHORIZED ) # Login successful login(request, user) # Update user login info user.last_login_ip = _get_client_ip(request) user.failed_login_attempts = 0 user.save(update_fields=['last_login_ip', 'failed_login_attempts']) # Generate or get token token, created = Token.objects.get_or_create(user=user) # Set session expiry if not remember_me: request.session.set_expiry(3600) # 1 hour # Log successful login AuditLog.objects.create( user=user, action_type='LOGIN', ip_address=_get_client_ip(request), user_agent=request.META.get('HTTP_USER_AGENT', ''), details={'remember_me': remember_me, 'mfa_used': user.mfa_enabled} ) return Response({ 'token': token.key, 'user': UserSerializer(user).data, 'message': 'Login successful' }) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) @api_view(['POST']) @permission_classes([IsAuthenticated]) def logout_view(request): """User logout""" # Log logout AuditLog.objects.create( user=request.user, action_type='LOGOUT', ip_address=_get_client_ip(request), user_agent=request.META.get('HTTP_USER_AGENT', '') ) # Delete token try: request.user.auth_token.delete() except: pass logout(request) return Response({'message': 'Logout successful'}) @api_view(['GET']) @permission_classes([IsAuthenticated]) def user_profile(request): """Get current user profile""" serializer = UserProfileSerializer(request.user) return Response(serializer.data) @api_view(['POST']) @permission_classes([IsAuthenticated]) def change_password(request): """Change user password""" serializer = PasswordChangeSerializer(data=request.data) if serializer.is_valid(): user = request.user current_password = serializer.validated_data['current_password'] new_password = serializer.validated_data['new_password'] # Verify current password if not user.check_password(current_password): return Response( {'error': 'Current password is incorrect'}, status=status.HTTP_400_BAD_REQUEST ) # Set new password user.set_password(new_password) user.save() # Log password change AuditLog.objects.create( user=user, action_type='PASSWORD_CHANGE', ip_address=_get_client_ip(request), user_agent=request.META.get('HTTP_USER_AGENT', ''), severity='MEDIUM' ) return Response({'message': 'Password changed successfully'}) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) @api_view(['GET']) @permission_classes([IsAuthenticated]) def mfa_status(request): """Get MFA status for current user""" devices = MFAProvider.get_user_mfa_devices(request.user) return Response({ 'mfa_enabled': request.user.mfa_enabled, 'devices': devices, 'device_count': len(devices) }) def _get_client_ip(request): """Get client IP address from request""" x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR') if x_forwarded_for: ip = x_forwarded_for.split(',')[0] else: ip = request.META.get('REMOTE_ADDR') return ip