375 lines
14 KiB
Python
375 lines
14 KiB
Python
"""
Zero Trust API Views

Provides endpoints for device registration, risk assessment, and Zero Trust management.
"""
|
|
import logging
|
|
from rest_framework import status, generics, permissions
|
|
from rest_framework.decorators import api_view, permission_classes, action
|
|
from rest_framework.response import Response
|
|
from rest_framework.viewsets import ModelViewSet
|
|
from rest_framework.permissions import IsAuthenticated, IsAdminUser
|
|
from django.utils import timezone
|
|
from django.db import models
|
|
|
|
from ..models import (
|
|
DevicePosture, GeolocationRule, RiskAssessment,
|
|
AdaptiveAuthentication, UserBehaviorProfile
|
|
)
|
|
from ..serializers.zero_trust import (
|
|
DevicePostureSerializer, GeolocationRuleSerializer,
|
|
RiskAssessmentSerializer, AdaptiveAuthenticationSerializer,
|
|
UserBehaviorProfileSerializer, DeviceRegistrationSerializer,
|
|
RiskAssessmentRequestSerializer
|
|
)
|
|
from ..services.zero_trust import zero_trust_service
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class DevicePostureViewSet(ModelViewSet):
    """ViewSet for device posture management.

    The standard ModelViewSet actions are scoped to the requesting user's own
    devices (see ``get_queryset``). Two extra POST actions are exposed:
    ``update_posture`` on a single device and ``register_device`` on the
    collection.
    """

    queryset = DevicePosture.objects.all()
    serializer_class = DevicePostureSerializer
    permission_classes = [IsAuthenticated]

    # Names ``update_posture`` must never copy from the request body.
    # ``user_id`` sits next to ``user`` so ownership cannot be reassigned
    # through the raw foreign-key column either.
    _PROTECTED_POSTURE_FIELDS = frozenset({'id', 'user', 'user_id', 'created_at'})

    def get_queryset(self):
        """Return only the devices that belong to the requesting user."""
        return DevicePosture.objects.filter(user=self.request.user)

    def perform_create(self, serializer):
        """Save a new device posture owned by the requesting user."""
        serializer.save(user=self.request.user)

    def _writable_posture_fields(self, device):
        """Return the attribute names a client may set on ``device``.

        Only concrete, editable, non-primary-key model fields qualify, so
        methods, properties, managers and internal attributes can no longer
        be overwritten through the API.
        """
        protected = self._PROTECTED_POSTURE_FIELDS
        writable = set()
        for model_field in device._meta.concrete_fields:
            if model_field.primary_key or not model_field.editable:
                continue
            if model_field.name in protected or model_field.attname in protected:
                continue
            # Accept both the field name and its column attribute name.
            writable.update((model_field.name, model_field.attname))
        return writable

    @action(detail=True, methods=['post'])
    def update_posture(self, request, pk=None):
        """Update posture fields on one device and recompute its risk data.

        Keys in the request body that are not writable model fields are
        silently ignored. After the assignments, ``risk_score`` is set from
        ``device.calculate_risk_score()``, ``device.update_trust_level()`` is
        called, and the device is saved.

        Returns 400 if the body is not a key/value object; otherwise a
        summary with the device's risk score, trust level and compliance flag.
        """
        device = self.get_object()

        device_data = request.data
        if not isinstance(device_data, dict):
            return Response(
                {'error': 'Request body must be an object of posture fields'},
                status=status.HTTP_400_BAD_REQUEST,
            )

        writable = self._writable_posture_fields(device)
        # NOTE(review): values are assigned without per-field validation;
        # consider routing this through the serializer with partial=True.
        for field, value in device_data.items():
            if field in writable:
                setattr(device, field, value)

        # Recalculate risk score and trust level from the updated posture.
        device.risk_score = device.calculate_risk_score()
        device.update_trust_level()
        device.save()

        return Response({
            'message': 'Device posture updated',
            'risk_score': device.risk_score,
            'trust_level': device.trust_level,
            'is_compliant': device.is_compliant
        })

    @action(detail=False, methods=['post'])
    def register_device(self, request):
        """Register a new device for the requesting user.

        The body is validated with ``DeviceRegistrationSerializer`` and then
        handed to ``zero_trust_service.register_device``. Returns 201 with the
        new device's summary, or 400 on validation or registration failure.
        """
        serializer = DeviceRegistrationSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        try:
            device = zero_trust_service.register_device(
                request.user,
                serializer.validated_data
            )
        except Exception as e:
            # logger.exception keeps the traceback, which logger.error dropped.
            logger.exception("Device registration failed: %s", e)
            # NOTE(review): 'details' echoes the raw exception text to the
            # client; kept for response compatibility, consider removing.
            return Response({
                'error': 'Device registration failed',
                'details': str(e)
            }, status=status.HTTP_400_BAD_REQUEST)

        return Response({
            'success': True,
            'device_id': device.device_id,
            'trust_level': device.trust_level,
            'risk_score': device.risk_score,
            'is_compliant': device.is_compliant,
            'message': 'Device registered successfully'
        }, status=status.HTTP_201_CREATED)
|
|
|
|
|
|
class GeolocationRuleViewSet(ModelViewSet):
    """ViewSet for geolocation rule management (admin users only)."""

    queryset = GeolocationRule.objects.all()
    serializer_class = GeolocationRuleSerializer
    permission_classes = [IsAdminUser]

    @action(detail=True, methods=['post'])
    def test_rule(self, request, pk=None):
        """Test a geolocation rule against the location data in the body.

        The body is passed to ``rule.evaluate_location`` as keyword
        arguments. Returns 400 when the body is not a key/value object or
        its keys do not match the method's parameters; otherwise returns the
        rule's name and type, the evaluation result and the echoed input.
        """
        import inspect  # local import: only this action needs it

        rule = self.get_object()
        location_data = request.data

        if not isinstance(location_data, dict):
            return Response(
                {'error': 'Location data must be an object of keyword arguments'},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # Check the payload against the signature up front so a bad key is a
        # 400, while a TypeError raised inside the rule logic still surfaces.
        try:
            inspect.signature(rule.evaluate_location).bind(**location_data)
        except TypeError as exc:
            return Response({
                'error': 'Invalid location data',
                'details': str(exc)
            }, status=status.HTTP_400_BAD_REQUEST)

        result = rule.evaluate_location(**location_data)

        return Response({
            'rule_name': rule.name,
            'rule_type': rule.rule_type,
            'test_result': result,
            'location_data': location_data
        })
|
|
|
|
|
|
class RiskAssessmentViewSet(ModelViewSet):
    """ViewSet for risk assessment management.

    The standard ModelViewSet actions are scoped to the requesting user's own
    assessments. Extra actions: ``assess_access`` (POST) and
    ``my_risk_profile`` (GET), both on the collection.
    """

    queryset = RiskAssessment.objects.all()
    serializer_class = RiskAssessmentSerializer
    permission_classes = [IsAuthenticated]

    def get_queryset(self):
        """Return only the risk assessments that belong to the requesting user."""
        return RiskAssessment.objects.filter(user=self.request.user)

    @action(detail=False, methods=['post'])
    def assess_access(self, request):
        """Perform a risk assessment for an access request.

        The body is validated with ``RiskAssessmentRequestSerializer`` and
        merged with request-derived context before being passed to
        ``zero_trust_service.assess_access_request``. Returns the service
        result, 400 on validation errors, or 500 if the assessment raises.
        """
        serializer = RiskAssessmentRequestSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        try:
            request_context = {
                # Defaults the validated body may still override, as before.
                'user_agent': request.META.get('HTTP_USER_AGENT', ''),
                'device_id': request.headers.get('X-Device-ID'),
                **serializer.validated_data,
                # Server-observed facts go last so the request body can
                # never replace them.
                'ip_address': self._get_client_ip(request),
                'timestamp': timezone.now(),
            }

            assessment_result = zero_trust_service.assess_access_request(
                request.user,
                request_context
            )
        except Exception as e:
            # logger.exception keeps the traceback, which logger.error dropped.
            logger.exception("Risk assessment failed: %s", e)
            # NOTE(review): 'details' echoes the raw exception text to the
            # client; kept for response compatibility, consider removing.
            return Response({
                'error': 'Risk assessment failed',
                'details': str(e)
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

        return Response(assessment_result)

    @action(detail=False, methods=['get'])
    def my_risk_profile(self, request):
        """Return the requesting user's current risk profile.

        The response holds the latest assessment (or None), the behavior
        profile (or None), the user's active devices ordered by most recent
        ``last_seen``, and a summary of device counts and average risk score.
        Returns 500 if any lookup raises.
        """
        try:
            latest_assessment = RiskAssessment.objects.filter(
                user=request.user
            ).order_by('-assessed_at').first()

            try:
                behavior_profile = UserBehaviorProfile.objects.get(user=request.user)
            except UserBehaviorProfile.DoesNotExist:
                behavior_profile = None

            devices = DevicePosture.objects.filter(
                user=request.user,
                is_active=True
            ).order_by('-last_seen')

            # One aggregate query replaces three count() calls and an Avg().
            summary = devices.aggregate(
                total=models.Count('pk'),
                trusted=models.Count('pk', filter=models.Q(is_trusted=True)),
                compliant=models.Count('pk', filter=models.Q(is_compliant=True)),
                avg_risk=models.Avg('risk_score'),
            )

            return Response({
                'latest_assessment': RiskAssessmentSerializer(latest_assessment).data if latest_assessment else None,
                'behavior_profile': UserBehaviorProfileSerializer(behavior_profile).data if behavior_profile else None,
                'devices': DevicePostureSerializer(devices, many=True).data,
                'risk_summary': {
                    'total_devices': summary['total'],
                    'trusted_devices': summary['trusted'],
                    'compliant_devices': summary['compliant'],
                    # Avg() is None when there are no rows; report 0 instead.
                    'average_risk_score': summary['avg_risk'] or 0
                }
            })

        except Exception as e:
            logger.exception("Failed to get risk profile: %s", e)
            # NOTE(review): 'details' echoes the raw exception text to the
            # client; kept for response compatibility, consider removing.
            return Response({
                'error': 'Failed to get risk profile',
                'details': str(e)
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def _get_client_ip(self, request):
        """Return the client IP by delegating to the module-level helper."""
        # The method used to duplicate the helper line for line; delegating
        # keeps the two code paths from drifting apart.
        return _get_client_ip(request)
|
|
|
|
|
|
class AdaptiveAuthenticationViewSet(ModelViewSet):
    """ViewSet for adaptive authentication management (admin users only)."""

    queryset = AdaptiveAuthentication.objects.all()
    serializer_class = AdaptiveAuthenticationSerializer
    permission_classes = [IsAdminUser]

    @staticmethod
    def _parse_risk_score(raw):
        """Return ``raw`` as a finite number, or None if it is not one.

        Ints and floats are returned unchanged so the echoed value keeps its
        type; other values (such as form-encoded strings) go through
        ``float()``. Booleans, NaN and infinities are rejected.
        """
        import math  # local import: only this helper needs it

        if isinstance(raw, bool):
            return None
        try:
            score = raw if isinstance(raw, (int, float)) else float(raw)
            # isfinite raises OverflowError for ints too large for a float.
            if not math.isfinite(score):
                return None
        except (TypeError, ValueError, OverflowError):
            return None
        return score

    @action(detail=True, methods=['post'])
    def test_auth_requirements(self, request, pk=None):
        """Report the auth methods this configuration requires for a risk score.

        Reads ``risk_score`` from the body (default 0) and passes it to
        ``adaptive_auth.get_required_auth_methods``. Returns 400 when the
        body is not a key/value object or the score is not a finite number.
        """
        adaptive_auth = self.get_object()

        payload = request.data
        raw_score = payload.get('risk_score', 0) if isinstance(payload, dict) else None
        risk_score = self._parse_risk_score(raw_score)
        if risk_score is None:
            return Response(
                {'error': 'risk_score must be a finite number'},
                status=status.HTTP_400_BAD_REQUEST,
            )

        required_methods = adaptive_auth.get_required_auth_methods(risk_score)

        return Response({
            'risk_score': risk_score,
            'required_auth_methods': required_methods,
            'adaptive_auth_name': adaptive_auth.name
        })
|
|
|
|
|
|
class UserBehaviorProfileViewSet(ModelViewSet):
    """ViewSet for user behavior profile management.

    The standard ModelViewSet actions are scoped to the requesting user's own
    profile. The ``calculate_anomaly`` detail action scores a behavior sample
    against the stored profile.
    """

    queryset = UserBehaviorProfile.objects.all()
    serializer_class = UserBehaviorProfileSerializer
    permission_classes = [IsAuthenticated]

    # A score strictly above this value is reported as anomalous. It is a
    # class attribute so a subclass or ``as_view(anomaly_threshold=...)``
    # can tune it without editing the action.
    anomaly_threshold = 0.7

    def get_queryset(self):
        """Return only the behavior profile(s) of the requesting user."""
        return UserBehaviorProfile.objects.filter(user=self.request.user)

    @action(detail=True, methods=['post'])
    def calculate_anomaly(self, request, pk=None):
        """Score the behavior in the request body against this profile.

        The body is passed unchanged to ``profile.calculate_anomaly_score``.
        The response holds the score, whether it exceeds
        ``anomaly_threshold``, and the echoed input.
        """
        profile = self.get_object()
        current_behavior = request.data

        anomaly_score = profile.calculate_anomaly_score(current_behavior)

        return Response({
            'anomaly_score': anomaly_score,
            'is_anomalous': anomaly_score > self.anomaly_threshold,
            'current_behavior': current_behavior
        })
|
|
|
|
|
|
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def zero_trust_status(request):
    """Get Zero Trust system status and configuration for the current user.

    The response combines per-user figures (active and trusted device
    counts, latest risk level and score), system configuration flags, and
    the output of ``_get_security_recommendations``. Returns 500 with an
    error payload if any lookup raises.
    """
    try:
        user = request.user

        # Per-user Zero Trust state.
        user_devices = DevicePosture.objects.filter(user=user, is_active=True)
        latest_assessment = (
            RiskAssessment.objects.filter(user=user).order_by('-assessed_at').first()
        )

        # System-wide configuration. Only existence matters for adaptive
        # auth, so exists() avoids loading a row.
        adaptive_auth_enabled = AdaptiveAuthentication.objects.filter(is_active=True).exists()
        geo_rules = GeolocationRule.objects.filter(is_active=True).count()

        return Response({
            'zero_trust_enabled': True,
            'user_status': {
                'registered_devices': user_devices.count(),
                'trusted_devices': user_devices.filter(is_trusted=True).count(),
                'latest_risk_level': latest_assessment.risk_level if latest_assessment else 'UNKNOWN',
                'latest_risk_score': latest_assessment.overall_risk_score if latest_assessment else 0
            },
            'system_configuration': {
                'adaptive_auth_enabled': adaptive_auth_enabled,
                'geolocation_rules_count': geo_rules,
                'behavioral_analysis_enabled': True,
                'device_posture_enabled': True
            },
            'recommendations': _get_security_recommendations(user, user_devices, latest_assessment)
        })

    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Failed to get Zero Trust status: %s", e)
        # NOTE(review): 'details' echoes the raw exception text to the
        # client; kept for response compatibility, consider removing.
        return Response({
            'error': 'Failed to get Zero Trust status',
            'details': str(e)
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
|
|
|
|
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def perform_risk_assessment(request):
    """Perform a comprehensive risk assessment for the current user.

    The request body is merged with request-derived context and passed to
    ``zero_trust_service.assess_access_request``. Returns the service result,
    400 if the body is not a key/value object, or 500 if the assessment
    raises.
    """
    payload = request.data
    if not isinstance(payload, dict):
        # Unpacking a list or scalar below would raise and surface as a 500.
        return Response(
            {'error': 'Request body must be an object'},
            status=status.HTTP_400_BAD_REQUEST,
        )

    try:
        request_context = {
            # Defaults the body may still override, as before.
            'user_agent': request.META.get('HTTP_USER_AGENT', ''),
            'device_id': request.headers.get('X-Device-ID'),
            **payload,
            # Server-observed facts go last: the body is unvalidated, so it
            # must not be able to supply its own IP address or timestamp.
            'ip_address': _get_client_ip(request),
            'timestamp': timezone.now(),
        }

        assessment_result = zero_trust_service.assess_access_request(
            request.user,
            request_context
        )
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Risk assessment failed: %s", e)
        # NOTE(review): 'details' echoes the raw exception text to the
        # client; kept for response compatibility, consider removing.
        return Response({
            'error': 'Risk assessment failed',
            'details': str(e)
        }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    return Response(assessment_result)
|
|
|
|
|
|
def _get_security_recommendations(user, devices, latest_assessment):
    """Build security recommendations from the user's current status.

    Args:
        user: Object exposing a boolean ``mfa_enabled`` attribute.
        devices: Queryset-like object supporting ``count()`` and
            ``filter(is_trusted=False)``.
        latest_assessment: Object with a ``risk_level`` attribute, or None.

    Returns:
        A list of dicts with ``type``, ``priority``, ``message`` and
        ``action`` keys, in the order device, risk, authentication.
    """
    recommendations = []

    # Device recommendations
    if devices.count() == 0:
        recommendations.append({
            'type': 'device',
            'priority': 'high',
            'message': 'Register your device for enhanced security',
            'action': 'register_device'
        })
    else:
        # With no devices there can be no untrusted ones, so this lookup is
        # only made here. One count() replaces the exists() + count() pair.
        untrusted_count = devices.filter(is_trusted=False).count()
        if untrusted_count:
            noun = 'device' if untrusted_count == 1 else 'devices'
            recommendations.append({
                'type': 'device',
                'priority': 'medium',
                'message': f'{untrusted_count} untrusted {noun} detected',
                'action': 'improve_device_security'
            })

    # Risk recommendations
    if latest_assessment and latest_assessment.risk_level in ('HIGH', 'CRITICAL'):
        recommendations.append({
            'type': 'risk',
            'priority': 'high',
            'message': f'High risk level detected: {latest_assessment.risk_level}',
            'action': 'review_security_settings'
        })

    # MFA recommendations
    if not user.mfa_enabled:
        recommendations.append({
            'type': 'authentication',
            'priority': 'medium',
            'message': 'Enable Multi-Factor Authentication',
            'action': 'enable_mfa'
        })

    return recommendations
|
|
|
|
|
|
def _get_client_ip(request):
    """Return the client IP address for ``request``.

    Uses the first entry of the ``X-Forwarded-For`` header when it is
    non-blank, otherwise ``REMOTE_ADDR``. Returns None when neither is set.
    """
    # NOTE(review): X-Forwarded-For is client-controllable unless a trusted
    # proxy overwrites it — confirm the deployment strips untrusted values.
    forwarded = request.META.get('HTTP_X_FORWARDED_FOR')
    if forwarded:
        first_hop = forwarded.split(',')[0].strip()
        # A header such as " , 10.0.0.1" has a blank first entry; fall
        # through to REMOTE_ADDR instead of returning an empty string.
        if first_hop:
            return first_hop
    return request.META.get('REMOTE_ADDR')
|