534 lines
19 KiB
Python
534 lines
19 KiB
Python
"""
|
|
Security API views for authentication, authorization, and audit
|
|
"""
|
|
import base64
|
|
from datetime import datetime, timedelta
|
|
|
|
from django.contrib.auth import authenticate, login, logout
|
|
from django.contrib.auth.decorators import login_required
|
|
from django.utils.decorators import method_decorator
|
|
from django.utils import timezone
|
|
from django.db.models import Q
|
|
from django.http import JsonResponse
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
from django.views.decorators.http import require_http_methods
|
|
|
|
from rest_framework import status, generics, permissions
|
|
from rest_framework.decorators import api_view, permission_classes, action
|
|
from rest_framework.response import Response
|
|
from rest_framework.viewsets import ModelViewSet
|
|
from rest_framework.authtoken.models import Token
|
|
from rest_framework.permissions import IsAuthenticated, IsAdminUser
|
|
|
|
from ..models import (
|
|
DataClassification, Role, User, MFADevice,
|
|
AuditLog, SSOProvider, AccessPolicy
|
|
)
|
|
from ..serializers.security import (
|
|
DataClassificationSerializer, RoleSerializer, UserSerializer,
|
|
MFADeviceSerializer, MFASetupSerializer, MFAVerificationSerializer,
|
|
AuditLogSerializer, SSOProviderSerializer, AccessPolicySerializer,
|
|
LoginSerializer, PasswordChangeSerializer, UserProfileSerializer
|
|
)
|
|
from ..mfa.totp import MFAProvider
|
|
from ..authentication.sso import SSOAuthentication
|
|
|
|
|
|
class DataClassificationViewSet(ModelViewSet):
    """Admin-only endpoints for data classification records.

    The queryset is scoped by the requesting user: superusers get every
    classification; other users get only classifications whose ``level``
    is at or below their clearance level, or at or below 1 when they
    have no clearance assigned.
    """

    permission_classes = [IsAdminUser]
    serializer_class = DataClassificationSerializer
    queryset = DataClassification.objects.all()

    def get_queryset(self):
        """Return the classifications the requesting user may see."""
        requester = self.request.user
        if requester.is_superuser:
            return DataClassification.objects.all()

        # Users without an assigned clearance are capped at level 1.
        clearance = requester.clearance_level
        max_level = clearance.level if clearance else 1
        return DataClassification.objects.filter(level__lte=max_level)
|
|
|
|
|
|
class RoleViewSet(ModelViewSet):
    """Admin-only endpoints for role records.

    Superusers get every role. Other users get only the roles that share
    at least one permission with the result of
    ``user.get_effective_permissions()``.
    """

    permission_classes = [IsAdminUser]
    serializer_class = RoleSerializer
    queryset = Role.objects.all()

    def get_queryset(self):
        """Return the roles the requesting user may see."""
        requester = self.request.user
        if not requester.is_superuser:
            granted = requester.get_effective_permissions()
            matching = Role.objects.filter(permissions__in=granted)
            # The join on permissions can yield the same role repeatedly.
            return matching.distinct()
        return Role.objects.all()
|
|
|
|
|
|
class UserViewSet(ModelViewSet):
    """Admin-only endpoints for user management.

    Besides the standard CRUD routes this exposes actions to lock and
    unlock an account, replace a user's roles, change a user's clearance
    level, and fetch the static dashboard permission map. Every mutating
    action writes an ``AuditLog`` row attributed to the requesting user.
    """

    queryset = User.objects.all()
    serializer_class = UserSerializer
    permission_classes = [IsAdminUser]

    def get_queryset(self):
        """Return the users visible to the requesting user.

        Superusers see everyone. Other users see only users whose
        clearance level is at or below their own, or at or below 1 when
        they have no clearance assigned.
        """
        user = self.request.user
        if user.is_superuser:
            return User.objects.all()

        max_level = user.clearance_level.level if user.clearance_level else 1
        return User.objects.filter(clearance_level__level__lte=max_level)

    @action(detail=True, methods=['post'])
    def lock_account(self, request, pk=None):
        """Lock the target account for ``duration_minutes`` (default 30).

        Returns 400 when ``duration_minutes`` is not a positive integer
        (numeric strings are accepted and converted).
        """
        user = self.get_object()
        duration = self._parse_duration_minutes(
            request.data.get('duration_minutes', 30)
        )
        if duration is None:
            return self._bad_request('duration_minutes must be a positive integer')

        user.lock_account(duration)
        self._log_user_action(
            request, user, 'ACCOUNT_LOCKED',
            {'locked_user': user.username, 'duration_minutes': duration},
            'HIGH',
        )
        return Response({'message': f'Account locked for {duration} minutes'})

    @action(detail=True, methods=['post'])
    def unlock_account(self, request, pk=None):
        """Unlock the target account."""
        user = self.get_object()
        user.unlock_account()
        self._log_user_action(
            request, user, 'ACCOUNT_UNLOCKED',
            {'unlocked_user': user.username},
            'MEDIUM',
        )
        return Response({'message': 'Account unlocked'})

    @action(detail=True, methods=['patch'])
    def update_roles(self, request, pk=None):
        """Replace the target user's roles with those in ``role_ids``.

        Returns 400 when any supplied ID is malformed or does not match
        an existing role. Duplicate IDs in the request are tolerated.
        """
        from django.core.exceptions import ValidationError

        user = self.get_object()
        role_ids = request.data.get('role_ids', [])

        try:
            # De-duplicate first: the query returns each role once, so
            # comparing against the raw list would reject repeated IDs.
            unique_ids = set(role_ids)
            roles = list(Role.objects.filter(id__in=unique_ids))
        except (TypeError, ValueError, ValidationError):
            # Non-iterable payload, unhashable items, or values the
            # primary-key field cannot parse.
            return self._bad_request('Invalid role IDs provided')

        if len(roles) != len(unique_ids):
            return self._bad_request('Invalid role IDs provided')

        user.roles.set(roles)
        self._log_user_action(
            request, user, 'ROLES_UPDATED',
            {'updated_user': user.username, 'new_roles': [role.name for role in roles]},
            'MEDIUM',
        )
        return Response({'message': 'User roles updated successfully'})

    @action(detail=True, methods=['patch'])
    def update_clearance(self, request, pk=None):
        """Set the target user's clearance to ``clearance_level_id``.

        Returns 400 when the ID is missing, malformed, or does not match
        an existing ``DataClassification``.
        """
        from django.core.exceptions import ValidationError

        user = self.get_object()
        clearance_level_id = request.data.get('clearance_level_id')
        if not clearance_level_id:
            return self._bad_request('clearance_level_id is required')

        try:
            clearance_level = DataClassification.objects.get(id=clearance_level_id)
        except (DataClassification.DoesNotExist, TypeError, ValueError, ValidationError):
            # A value the primary-key field cannot parse is reported the
            # same way as an unknown ID instead of surfacing as a 500.
            return self._bad_request('Invalid clearance level ID')

        # Capture the previous value before overwriting it for the audit trail.
        old_clearance = user.clearance_level.name if user.clearance_level else 'None'
        user.clearance_level = clearance_level
        user.save()

        self._log_user_action(
            request, user, 'CLEARANCE_UPDATED',
            {
                'updated_user': user.username,
                'old_clearance': old_clearance,
                'new_clearance': clearance_level.name,
            },
            'HIGH',
        )
        return Response({'message': 'User clearance level updated successfully'})

    @action(detail=False, methods=['get'])
    def dashboard_permissions(self, request):
        """Return the static map of dashboard sections to requirements.

        Each entry lists the permission codenames and the clearance
        level (or ``None``) associated with that dashboard section.
        """
        dashboard_components = {
            'Dashboard (Overview)': {'permissions': [], 'clearance_level': None},
            'Incidents': {'permissions': ['view_incident'], 'clearance_level': None},
            'Monitoring': {'permissions': ['view_monitoringdashboard', 'view_alert', 'view_healthcheck'], 'clearance_level': None},
            'SLA & On-Call': {'permissions': ['view_sladefinition', 'view_slainstance', 'view_oncallassignment'], 'clearance_level': None},
            'Security': {'permissions': ['view_user', 'view_role', 'view_auditlog', 'view_securityevent'], 'clearance_level': 2},
            'Automation': {'permissions': ['view_runbook', 'view_autoremediation', 'view_integration'], 'clearance_level': None},
            'War Rooms': {'permissions': ['view_warroom', 'view_warroommessage', 'view_conferencebridge'], 'clearance_level': None},
            'Analytics': {'permissions': ['view_kpimetric', 'view_anomalydetection', 'view_dashboardconfiguration'], 'clearance_level': None},
            'Knowledge': {'permissions': ['view_knowledgebasearticle', 'view_postmortem', 'view_learningpattern'], 'clearance_level': None},
            'Compliance': {'permissions': ['view_compliancereport', 'view_regulatoryframework', 'view_legalhold'], 'clearance_level': 3},
        }
        return Response(dashboard_components)

    @staticmethod
    def _parse_duration_minutes(raw):
        """Return ``raw`` as a positive int, or ``None`` when it is not one."""
        if isinstance(raw, bool):
            # bool is an int subclass; True/False is never a sensible duration.
            return None
        try:
            minutes = int(raw)
        except (TypeError, ValueError):
            return None
        return minutes if minutes > 0 else None

    @staticmethod
    def _bad_request(message):
        """Build a 400 response carrying ``message`` under the ``error`` key."""
        return Response({'error': message}, status=status.HTTP_400_BAD_REQUEST)

    def _log_user_action(self, request, target, action_type, details, severity):
        """Write an audit row for an admin action performed on ``target``."""
        AuditLog.objects.create(
            user=request.user,
            action_type=action_type,
            resource_type='User',
            resource_id=str(target.id),
            ip_address=self._get_client_ip(request),
            user_agent=request.META.get('HTTP_USER_AGENT', ''),
            details=details,
            severity=severity,
        )

    def _get_client_ip(self, request):
        """Get client IP address via the module-level helper of the same name."""
        # Inside a method body the bare name resolves to the module-level
        # function, not to this method, so there is no recursion.
        return _get_client_ip(request)
|
|
|
|
|
|
class MFADeviceViewSet(ModelViewSet):
    """Endpoints that let an authenticated user manage their MFA devices.

    The queryset is restricted to devices owned by the requesting user,
    and newly created devices are always assigned to that user.
    """

    permission_classes = [IsAuthenticated]
    serializer_class = MFADeviceSerializer

    def get_queryset(self):
        """Return only the requesting user's MFA devices."""
        owner = self.request.user
        return MFADevice.objects.filter(user=owner)

    def perform_create(self, serializer):
        """Save the new device with the requesting user as its owner."""
        owner = self.request.user
        serializer.save(user=owner)

    @action(detail=False, methods=['post'])
    def setup_totp(self, request):
        """Create a TOTP device and return its QR code as a PNG data URI."""
        setup = MFASetupSerializer(data=request.data, context={'request': request})
        if not setup.is_valid():
            return Response(setup.errors, status=status.HTTP_400_BAD_REQUEST)

        device, qr_data = MFAProvider.create_totp_device(
            request.user, setup.validated_data['device_name']
        )

        # Base64-encode the image bytes so they can be embedded in JSON.
        png_bytes = MFAProvider.generate_qr_code_image(qr_data)
        encoded_png = base64.b64encode(png_bytes).decode('utf-8')

        payload = {
            'device_id': str(device.id),
            'qr_code_data': qr_data,
            'qr_code_image': f"data:image/png;base64,{encoded_png}",
            'message': 'TOTP device created. Scan QR code with authenticator app.',
        }
        return Response(payload)

    @action(detail=True, methods=['post'])
    def verify_totp(self, request, pk=None):
        """Check a TOTP token against the device identified by ``pk``."""
        # Resolve the device first so an unknown pk fails before the
        # request body is validated.
        device = self.get_object()

        verification = MFAVerificationSerializer(data=request.data)
        if not verification.is_valid():
            return Response(verification.errors, status=status.HTTP_400_BAD_REQUEST)

        submitted = verification.validated_data['token']
        accepted = MFAProvider.verify_totp_token(request.user, submitted, str(device.id))
        if not accepted:
            return Response(
                {'error': 'Invalid TOTP token'},
                status=status.HTTP_400_BAD_REQUEST,
            )
        return Response({'message': 'TOTP token verified successfully'})

    @action(detail=False, methods=['post'])
    def enable_mfa(self, request):
        """Turn MFA on for the requesting user; 400 if the provider refuses."""
        enabled = MFAProvider.enable_mfa_for_user(request.user)
        if not enabled:
            return Response(
                {'error': 'No active MFA devices found'},
                status=status.HTTP_400_BAD_REQUEST,
            )
        return Response({'message': 'MFA enabled successfully'})

    @action(detail=False, methods=['post'])
    def disable_mfa(self, request):
        """Turn MFA off for the requesting user."""
        MFAProvider.disable_mfa_for_user(request.user)
        confirmation = {'message': 'MFA disabled successfully'}
        return Response(confirmation)
|
|
|
|
|
|
class AuditLogViewSet(ModelViewSet):
    """Admin-only, read-only endpoints for browsing audit log entries.

    Only GET, HEAD and OPTIONS are allowed. Results can be narrowed with
    the ``start_date``, ``end_date``, ``action_type``, ``severity`` and
    ``user_id`` query parameters and are returned newest first.
    """

    queryset = AuditLog.objects.all()
    serializer_class = AuditLogSerializer
    permission_classes = [IsAdminUser]
    http_method_names = ['get', 'head', 'options']  # Read-only

    def get_queryset(self):
        """Return audit logs filtered by the supplied query parameters."""
        # (query parameter, ORM lookup) pairs; a parameter that is
        # absent or empty contributes no filter.
        supported_filters = (
            ('start_date', 'timestamp__gte'),
            ('end_date', 'timestamp__lte'),
            ('action_type', 'action_type'),
            ('severity', 'severity'),
            ('user_id', 'user_id'),
        )

        params = self.request.query_params
        logs = AuditLog.objects.all()
        for param_name, lookup in supported_filters:
            value = params.get(param_name)
            if value:
                logs = logs.filter(**{lookup: value})

        return logs.order_by('-timestamp')
|
|
|
|
|
|
class SSOProviderViewSet(ModelViewSet):
    """Admin-only CRUD endpoints over all ``SSOProvider`` records."""

    permission_classes = [IsAdminUser]
    serializer_class = SSOProviderSerializer
    queryset = SSOProvider.objects.all()
|
|
|
|
|
|
class AccessPolicyViewSet(ModelViewSet):
    """Admin-only CRUD endpoints over all ``AccessPolicy`` records."""

    permission_classes = [IsAdminUser]
    serializer_class = AccessPolicySerializer
    queryset = AccessPolicy.objects.all()
|
|
|
|
|
|
@api_view(['POST'])
@permission_classes([])
def login_view(request):
    """Authenticate a user by password and, when enabled, a TOTP token.

    Responses:
        200 -- login succeeded; body carries the API token, the
               serialized user and a message.
        400 -- request body invalid, or MFA is enabled and no
               ``mfa_token`` was supplied (``mfa_required`` is set).
        401 -- wrong credentials or wrong MFA token.
        423 -- the account is locked.

    Every rejected attempt (bad password, locked account, bad MFA token)
    and every successful login is written to the audit log. A missing
    MFA token is not logged as a failure.
    """
    serializer = LoginSerializer(data=request.data)
    if not serializer.is_valid():
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    credentials = serializer.validated_data
    username = credentials['username']
    password = credentials['password']
    mfa_token = credentials.get('mfa_token', '')
    remember_me = credentials.get('remember_me', False)

    # Request metadata shared by every audit entry written below.
    client_ip = _get_client_ip(request)
    user_agent = request.META.get('HTTP_USER_AGENT', '')

    user = authenticate(request, username=username, password=password)
    if user is None:
        AuditLog.objects.create(
            action_type='LOGIN_FAILED',
            ip_address=client_ip,
            user_agent=user_agent,
            details={'username': username},
            severity='MEDIUM',
        )
        return Response(
            {'error': 'Invalid credentials'},
            status=status.HTTP_401_UNAUTHORIZED,
        )

    if user.is_account_locked():
        # Attempts against a locked account are audited like any other
        # failed login so they remain visible to reviewers.
        AuditLog.objects.create(
            user=user,
            action_type='LOGIN_FAILED',
            ip_address=client_ip,
            user_agent=user_agent,
            details={'username': username, 'reason': 'account_locked'},
            severity='MEDIUM',
        )
        return Response(
            {'error': 'Account is locked'},
            status=status.HTTP_423_LOCKED,
        )

    if user.mfa_enabled:
        if not mfa_token:
            return Response(
                {'error': 'MFA token required', 'mfa_required': True},
                status=status.HTTP_400_BAD_REQUEST,
            )

        if not MFAProvider.verify_totp_token(user, mfa_token):
            # The password was correct but the second factor was not.
            AuditLog.objects.create(
                user=user,
                action_type='LOGIN_FAILED',
                ip_address=client_ip,
                user_agent=user_agent,
                details={'username': username, 'reason': 'invalid_mfa_token'},
                severity='HIGH',
            )
            return Response(
                {'error': 'Invalid MFA token'},
                status=status.HTTP_401_UNAUTHORIZED,
            )

    login(request, user)

    user.last_login_ip = client_ip
    user.failed_login_attempts = 0
    user.save(update_fields=['last_login_ip', 'failed_login_attempts'])

    token, _created = Token.objects.get_or_create(user=user)

    # Without "remember me" the session is limited to one hour;
    # otherwise the session expiry is left unchanged.
    if not remember_me:
        request.session.set_expiry(3600)

    AuditLog.objects.create(
        user=user,
        action_type='LOGIN',
        ip_address=client_ip,
        user_agent=user_agent,
        details={'remember_me': remember_me, 'mfa_used': user.mfa_enabled},
    )

    return Response({
        'token': token.key,
        'user': UserSerializer(user).data,
        'message': 'Login successful',
    })
|
|
|
|
|
|
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def logout_view(request):
    """Log the current user out and revoke their API token.

    Writes a ``LOGOUT`` audit entry, deletes the user's token if one
    exists, then ends the session.
    """
    # Audit first: after logout() request.user is no longer the
    # authenticated user.
    AuditLog.objects.create(
        user=request.user,
        action_type='LOGOUT',
        ip_address=_get_client_ip(request),
        user_agent=request.META.get('HTTP_USER_AGENT', '')
    )

    # A filtered delete is a no-op when the user has no token, so there
    # is no exception to swallow (replaces a bare ``except: pass``).
    Token.objects.filter(user=request.user).delete()

    logout(request)

    return Response({'message': 'Logout successful'})
|
|
|
|
|
|
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def user_profile(request):
    """Return the authenticated user's profile as serialized data."""
    profile = UserProfileSerializer(request.user).data
    return Response(profile)
|
|
|
|
|
|
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def change_password(request):
    """Change the authenticated user's password.

    Returns 400 when the request body is invalid or ``current_password``
    does not match. On success the new password is stored, the caller's
    session is kept valid, and a ``PASSWORD_CHANGE`` audit entry is
    written.
    """
    from django.contrib.auth import update_session_auth_hash

    serializer = PasswordChangeSerializer(data=request.data)
    if not serializer.is_valid():
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    user = request.user
    current_password = serializer.validated_data['current_password']
    new_password = serializer.validated_data['new_password']

    if not user.check_password(current_password):
        return Response(
            {'error': 'Current password is incorrect'},
            status=status.HTTP_400_BAD_REQUEST,
        )

    user.set_password(new_password)
    user.save()

    # Changing the password invalidates sessions tied to the old hash;
    # refresh this session so the caller is not logged out by their own
    # request.
    update_session_auth_hash(request, user)

    # NOTE(review): the user's API token is not rotated here, so a token
    # issued before the change stays valid -- confirm that is intended.

    AuditLog.objects.create(
        user=user,
        action_type='PASSWORD_CHANGE',
        ip_address=_get_client_ip(request),
        user_agent=request.META.get('HTTP_USER_AGENT', ''),
        severity='MEDIUM',
    )

    return Response({'message': 'Password changed successfully'})
|
|
|
|
|
|
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def mfa_status(request):
    """Report whether MFA is enabled for the caller and list their devices."""
    current_user = request.user
    devices = MFAProvider.get_user_mfa_devices(current_user)

    summary = {
        'mfa_enabled': current_user.mfa_enabled,
        'devices': devices,
        'device_count': len(devices),
    }
    return Response(summary)
|
|
|
|
|
|
def _get_client_ip(request):
    """Get client IP address from request.

    Uses the first entry of the ``X-Forwarded-For`` header when it is
    present and non-empty, with surrounding whitespace removed;
    otherwise falls back to ``REMOTE_ADDR``.

    Args:
        request: Object exposing a ``META`` mapping of request headers.

    Returns:
        The client IP as a string, or ``None`` when neither source is set.
    """
    # NOTE(review): X-Forwarded-For is supplied by the client unless a
    # trusted proxy overwrites it, so this value can be spoofed.
    forwarded = request.META.get('HTTP_X_FORWARDED_FOR')
    if forwarded:
        # Entries are comma-separated and may be padded with spaces.
        first_hop = forwarded.split(',')[0].strip()
        if first_hop:
            return first_hop
    return request.META.get('REMOTE_ADDR')
|