207 lines
7.9 KiB
Python
207 lines
7.9 KiB
Python
"""
|
|
Security integration for incident intelligence
|
|
"""
|
|
from django.core.exceptions import PermissionDenied
|
|
from django.http import JsonResponse
|
|
from django.utils.deprecation import MiddlewareMixin
|
|
from django.contrib.auth import get_user_model
|
|
from django.db import models
|
|
from security.models import AuditLog
|
|
import logging
|
|
|
|
User = get_user_model()
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class IncidentSecurityMiddleware(MiddlewareMixin):
    """
    Middleware to enforce security policies for incident intelligence.

    Requests whose path starts with ``/api/incidents/`` must come from an
    authenticated user, otherwise a 401 JSON response is returned. Each
    authenticated request and every response for that prefix is recorded
    through ``AuditLog``. Audit-write failures are logged and never
    propagate to the client.
    """

    # Only paths under this prefix are policed and audited.
    INCIDENT_PATH_PREFIX = '/api/incidents/'

    # Path fragments that bypass the authentication check.
    EXEMPT_PATH_FRAGMENTS = ('/admin/', '/auth/', '/login/', '/logout/')

    def process_request(self, request):
        """
        Reject unauthenticated access to incident endpoints.

        Returns:
            ``None`` to let the request continue, or a ``JsonResponse`` with
            status 401 when the caller is not authenticated.
        """
        path = request.path

        # Skip security checks for non-incident endpoints
        if not path.startswith(self.INCIDENT_PATH_PREFIX):
            return None

        # Skip for admin and authentication endpoints.
        # NOTE(review): this is a substring match, so any incident URL that
        # merely contains one of these fragments (for example
        # '/api/incidents/42/login/') skips authentication. Behaviour is kept
        # as-is; confirm which exact routes need the exemption and tighten.
        if any(fragment in path for fragment in self.EXEMPT_PATH_FRAGMENTS):
            return None

        # A request without a ``user`` attribute is treated as anonymous
        # instead of failing with AttributeError.
        user = getattr(request, 'user', None)
        if user is None or not user.is_authenticated:
            return JsonResponse({
                'error': 'Authentication required',
                'detail': 'You must be logged in to access incident data'
            }, status=401)

        # Log access attempt
        self._log_access_attempt(request)

        return None

    def process_response(self, request, response):
        """Audit the response for incident endpoints and return it unchanged."""
        if request.path.startswith(self.INCIDENT_PATH_PREFIX):
            self._log_security_event(request, response)

        return response

    @staticmethod
    def _audit_user(request):
        """Return the authenticated user for *request*, or ``None``."""
        user = getattr(request, 'user', None)
        if user is None or not user.is_authenticated:
            return None
        return user

    def _log_access_attempt(self, request):
        """Record an incident access attempt; failures are only logged."""
        try:
            AuditLog.objects.create(
                user=request.user,
                action_type='DATA_ACCESS',
                resource_type='incident_intelligence',
                resource_id=request.path,
                ip_address=self._get_client_ip(request),
                user_agent=request.META.get('HTTP_USER_AGENT', ''),
                details={
                    'method': request.method,
                    'path': request.path,
                    'query_params': dict(request.GET),
                },
                severity='LOW'
            )
        except Exception as e:
            # Best effort: auditing must never break the request itself.
            logger.error("Failed to log access attempt: %s", e)

    def _log_security_event(self, request, response):
        """
        Record the outcome of an incident request.

        Severity is 'HIGH' for status >= 500, 'MEDIUM' for 400-499 and
        'LOW' otherwise. Failures are only logged.
        """
        try:
            status = response.status_code
            if status >= 500:
                severity = 'HIGH'
            elif status >= 400:
                severity = 'MEDIUM'
            else:
                severity = 'LOW'

            AuditLog.objects.create(
                # Anonymous callers are stored as None rather than passing a
                # non-model user object, which made the write fail for every
                # rejected (401) request.
                # NOTE(review): assumes AuditLog.user accepts NULL - confirm;
                # if it does not, the failure is still caught below.
                user=self._audit_user(request),
                action_type='DATA_ACCESS',
                resource_type='incident_intelligence',
                resource_id=request.path,
                ip_address=self._get_client_ip(request),
                user_agent=request.META.get('HTTP_USER_AGENT', ''),
                details={
                    'method': request.method,
                    'path': request.path,
                    'status_code': status,
                    # Responses without a ``content`` attribute report 0.
                    'response_size': len(response.content) if hasattr(response, 'content') else 0,
                },
                severity=severity
            )
        except Exception as e:
            # Best effort: auditing must never break the response.
            logger.error("Failed to log security event: %s", e)

    def _get_client_ip(self, request):
        """
        Return the client IP address for *request*.

        Uses the first entry of ``X-Forwarded-For`` when present and
        non-empty, otherwise ``REMOTE_ADDR``.
        """
        # NOTE(review): X-Forwarded-For is supplied by the client unless a
        # trusted proxy overwrites it - confirm the deployment does so.
        forwarded = request.META.get('HTTP_X_FORWARDED_FOR')
        if forwarded:
            # Entries are comma separated and may carry surrounding spaces.
            first_hop = forwarded.split(',')[0].strip()
            if first_hop:
                return first_hop
        return request.META.get('REMOTE_ADDR')
|
|
|
|
|
|
class IncidentAccessControl:
    """
    Access control utilities for incident intelligence.

    All helpers are static; the class only serves as a namespace.
    """

    # Permission codenames a user must hold for each supported action.
    # NOTE(review): these are dotted 'app_label.codename' strings compared
    # against ``p.codename``; confirm that user.get_effective_permissions()
    # yields objects whose ``codename`` uses this dotted form.
    _REQUIRED_PERMISSIONS = {
        'read': ['incident_intelligence.view_incident'],
        'create': ['incident_intelligence.add_incident'],
        'update': ['incident_intelligence.change_incident'],
        'delete': ['incident_intelligence.delete_incident'],
        'analyze': ['incident_intelligence.analyze_incident'],
    }

    # Clearance level below which sensitive incidents are hidden
    # (the original inline comment labels level 3 as CONFIDENTIAL).
    _SENSITIVE_MIN_LEVEL = 3

    @staticmethod
    def check_incident_access(user, incident, action='read'):
        """
        Check if user has access to perform action on incident.

        Args:
            user: object providing ``get_effective_permissions()`` and
                ``has_data_access(level)``.
            incident: object providing ``is_accessible_by_user(user)`` and
                ``data_classification``.
            action: one of 'read', 'create', 'update', 'delete', 'analyze'.

        Returns:
            ``True`` when every check passes.

        Raises:
            PermissionDenied: when any check fails, including when *action*
                is not a recognised action.
        """
        # Check basic incident access
        if not incident.is_accessible_by_user(user):
            raise PermissionDenied("Insufficient clearance level for this incident")

        # Check role-based permissions
        if not IncidentAccessControl._check_role_permissions(user, action):
            raise PermissionDenied(f"User does not have permission to {action} incidents")

        # Check data classification access
        if not IncidentAccessControl._check_data_classification_access(user, incident):
            raise PermissionDenied("Insufficient data classification access")

        return True

    @staticmethod
    def _check_role_permissions(user, action):
        """
        Return ``True`` if *user* holds every permission *action* requires.

        Unknown actions are denied. Looking them up with an empty default
        would make ``all()`` succeed over an empty list and silently grant
        access.
        """
        required = IncidentAccessControl._REQUIRED_PERMISSIONS.get(action)
        if required is None:
            return False

        # Build the lookup set once instead of once per required permission.
        held = {perm.codename for perm in user.get_effective_permissions()}
        return all(codename in held for codename in required)

    @staticmethod
    def _check_data_classification_access(user, incident):
        """Check if user has access to incident's data classification."""
        classification = incident.data_classification
        if not classification:
            return True  # No classification means accessible to all

        return user.has_data_access(classification.level)

    @staticmethod
    def filter_incidents_by_access(user, queryset):
        """
        Filter incidents based on user's access level.

        Keeps incidents that have no data classification or whose
        classification level does not exceed the user's clearance level
        (1 when the user has no clearance). Users below the sensitive
        threshold additionally lose incidents flagged ``is_sensitive``.
        """
        clearance = user.clearance_level
        user_level = clearance.level if clearance else 1

        accessible_incidents = queryset.filter(
            models.Q(data_classification__isnull=True) |  # No classification
            models.Q(data_classification__level__lte=user_level)  # Sufficient clearance
        )

        # Filter out sensitive incidents if user doesn't have clearance
        if not clearance or clearance.level < IncidentAccessControl._SENSITIVE_MIN_LEVEL:
            accessible_incidents = accessible_incidents.filter(is_sensitive=False)

        return accessible_incidents
|
|
|
|
|
|
def log_incident_operation(user, incident, operation, details=None):
    """
    Write an audit-trail entry for an operation performed on an incident.

    'create', 'update' and 'delete' are recorded as 'DATA_MODIFIED'; any
    other operation as 'DATA_ACCESS'. The entry severity is 'HIGH' when the
    incident severity is 'CRITICAL' or 'EMERGENCY', otherwise 'MEDIUM'.
    Keys in *details* are merged last and therefore override the standard
    fields. Any failure is logged and not re-raised.
    """
    try:
        if operation in ('create', 'update', 'delete'):
            action = 'DATA_MODIFIED'
        else:
            action = 'DATA_ACCESS'

        classification = incident.data_classification
        classification_name = classification.name if classification else None

        audit_details = {
            'operation': operation,
            'incident_title': incident.title,
            'incident_severity': incident.severity,
            'incident_category': incident.category,
            'data_classification': classification_name,
        }
        # Caller-supplied details are applied on top of the standard fields.
        audit_details = {**audit_details, **(details or {})}

        if incident.severity in ('CRITICAL', 'EMERGENCY'):
            audit_severity = 'HIGH'
        else:
            audit_severity = 'MEDIUM'

        AuditLog.objects.create(
            user=user,
            action_type=action,
            resource_type='incident',
            resource_id=str(incident.id),
            details=audit_details,
            severity=audit_severity,
        )
    except Exception as e:
        # Best effort: an audit failure must not break the caller.
        logger.error(f"Failed to log incident operation: {e}")
|
|
|
|
|
|
def get_user_accessible_incidents(user):
    """
    Return the incidents *user* may see under the security policies.

    Takes every ``Incident`` and narrows the queryset with
    ``IncidentAccessControl.filter_incidents_by_access``.
    """
    # Imported inside the function, as in the original (presumably to avoid
    # a circular import with the models module - confirm).
    from .models import Incident

    every_incident = Incident.objects.all()
    return IncidentAccessControl.filter_incidents_by_access(user, every_incident)
|