Files
Iliyan Angelov 6b247e5b9f Updates
2025-09-19 11:58:53 +03:00

207 lines
7.9 KiB
Python

"""
Security integration for incident intelligence
"""
from django.core.exceptions import PermissionDenied
from django.http import JsonResponse
from django.utils.deprecation import MiddlewareMixin
from django.contrib.auth import get_user_model
from django.db import models
from security.models import AuditLog
import logging
User = get_user_model()
logger = logging.getLogger(__name__)
class IncidentSecurityMiddleware(MiddlewareMixin):
    """
    Middleware to enforce security policies for incident intelligence.

    Requests whose path starts with ``/api/incidents/`` must come from an
    authenticated user (subject to the exemption list below); every other
    request passes through untouched. Guarded requests and the responses to
    them are recorded through ``AuditLog``. Audit logging is best-effort: a
    failure to write an audit row is logged and never breaks the request.
    """

    # Only paths under this prefix are policed by the middleware.
    INCIDENT_PATH_PREFIX = '/api/incidents/'
    # Path fragments that bypass the authentication check.
    EXEMPT_PATH_FRAGMENTS = ('/admin/', '/auth/', '/login/', '/logout/')

    def process_request(self, request):
        """Process request and check security permissions.

        Returns:
            ``None`` to let the request continue, or a 401 ``JsonResponse``
            when the caller is not authenticated.
        """
        # Skip security checks for non-incident endpoints
        if not request.path.startswith(self.INCIDENT_PATH_PREFIX):
            return None

        # Skip for admin and authentication endpoints.
        # NOTE(review): this is a substring match on a path that already
        # starts with /api/incidents/, so any incident URL that merely
        # contains one of these fragments skips the authentication check.
        # Behaviour is kept as-is; confirm whether that is intended.
        if any(fragment in request.path for fragment in self.EXEMPT_PATH_FRAGMENTS):
            return None

        # ``request.user`` is missing when no authentication middleware ran
        # before this one; treat that the same as "not logged in" instead of
        # crashing with AttributeError.
        user = getattr(request, 'user', None)
        if user is None or not user.is_authenticated:
            return JsonResponse({
                'error': 'Authentication required',
                'detail': 'You must be logged in to access incident data'
            }, status=401)

        # Log access attempt
        self._log_access_attempt(request)
        return None

    def process_response(self, request, response):
        """Process response and log security events.

        The response is always returned unchanged.
        """
        # Log security-relevant responses
        if request.path.startswith(self.INCIDENT_PATH_PREFIX):
            self._log_security_event(request, response)
        return response

    @staticmethod
    def _audit_user(request):
        """Return the authenticated user to store on an audit row, else ``None``.

        Anonymous (or missing) users are mapped to ``None`` so that rejected
        and exempt requests can still be audited.
        NOTE(review): assumes ``AuditLog.user`` accepts NULL - confirm against
        the model. If it does not, the create call fails and is logged by the
        caller's ``except`` block, which is what happened before this change.
        """
        user = getattr(request, 'user', None)
        if user is not None and user.is_authenticated:
            return user
        return None

    def _log_access_attempt(self, request):
        """Log incident access attempt (best-effort; never raises)."""
        try:
            AuditLog.objects.create(
                user=self._audit_user(request),
                action_type='DATA_ACCESS',
                resource_type='incident_intelligence',
                resource_id=request.path,
                ip_address=self._get_client_ip(request),
                user_agent=request.META.get('HTTP_USER_AGENT', ''),
                details={
                    'method': request.method,
                    'path': request.path,
                    'query_params': dict(request.GET),
                },
                severity='LOW'
            )
        except Exception as e:
            # Deliberately broad: auditing must not take the request down.
            logger.error("Failed to log access attempt: %s", e)

    def _log_security_event(self, request, response):
        """Log the outcome of an incident request (best-effort; never raises).

        Severity is derived from the HTTP status: 5xx -> HIGH, 4xx -> MEDIUM,
        anything else -> LOW.
        """
        try:
            # Determine severity based on response status
            status_code = response.status_code
            if status_code >= 500:
                severity = 'HIGH'
            elif status_code >= 400:
                severity = 'MEDIUM'
            else:
                severity = 'LOW'

            # Log the event
            AuditLog.objects.create(
                user=self._audit_user(request),
                action_type='DATA_ACCESS',
                resource_type='incident_intelligence',
                resource_id=request.path,
                ip_address=self._get_client_ip(request),
                user_agent=request.META.get('HTTP_USER_AGENT', ''),
                details={
                    'method': request.method,
                    'path': request.path,
                    'status_code': status_code,
                    # Responses without a readable ``content`` report size 0.
                    'response_size': len(response.content) if hasattr(response, 'content') else 0,
                },
                severity=severity
            )
        except Exception as e:
            # Deliberately broad: auditing must not take the request down.
            logger.error("Failed to log security event: %s", e)

    def _get_client_ip(self, request):
        """Get client IP address.

        Uses the first entry of ``X-Forwarded-For`` when present, otherwise
        ``REMOTE_ADDR``. Returns ``None`` when neither is available.
        NOTE(review): ``X-Forwarded-For`` is supplied by the client unless a
        trusted proxy overwrites it - confirm the deployment topology.
        """
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            # Entries are comma separated and commonly padded with spaces.
            first_hop = x_forwarded_for.split(',')[0].strip()
            if first_hop:
                return first_hop
        return request.META.get('REMOTE_ADDR')
class IncidentAccessControl:
    """
    Access control utilities for incident intelligence.

    Every helper is a static method; the class only serves as a namespace.
    """

    # Permission codenames a user must hold for each supported action.
    # NOTE(review): Django's built-in ``Permission.codename`` carries no
    # app-label prefix - confirm that ``user.get_effective_permissions()``
    # yields objects whose ``codename`` matches these prefixed strings.
    _REQUIRED_PERMISSIONS = {
        'read': ('incident_intelligence.view_incident',),
        'create': ('incident_intelligence.add_incident',),
        'update': ('incident_intelligence.change_incident',),
        'delete': ('incident_intelligence.delete_incident',),
        'analyze': ('incident_intelligence.analyze_incident',),
    }

    # Clearance level assumed for users that have no clearance assigned.
    _DEFAULT_CLEARANCE = 1
    # Users below this level never see incidents flagged ``is_sensitive``
    # (the original inline comment calls level 3 "CONFIDENTIAL").
    _SENSITIVE_MIN_CLEARANCE = 3

    @staticmethod
    def check_incident_access(user, incident, action='read'):
        """Check if user has access to perform action on incident.

        Args:
            user: object providing ``get_effective_permissions()`` and
                ``has_data_access(level)``.
            incident: object providing ``is_accessible_by_user(user)`` and a
                ``data_classification`` attribute.
            action: one of 'read', 'create', 'update', 'delete', 'analyze'.

        Returns:
            ``True`` when every check passes.

        Raises:
            PermissionDenied: when any of the three checks fails, including
                when ``action`` is not a recognised action.
        """
        # Check basic incident access
        if not incident.is_accessible_by_user(user):
            raise PermissionDenied("Insufficient clearance level for this incident")

        # Check role-based permissions
        if not IncidentAccessControl._check_role_permissions(user, action):
            raise PermissionDenied(f"User does not have permission to {action} incidents")

        # Check data classification access
        if not IncidentAccessControl._check_data_classification_access(user, incident):
            raise PermissionDenied("Insufficient data classification access")

        return True

    @staticmethod
    def _check_role_permissions(user, action):
        """Check if user has required role permissions.

        Returns ``False`` for an action that is not listed in
        ``_REQUIRED_PERMISSIONS``: an unrecognised action must be denied
        rather than silently granted because it has "no requirements".
        """
        required = IncidentAccessControl._REQUIRED_PERMISSIONS.get(action)
        if required is None:
            # Fail closed on unknown actions.
            return False

        # Collect the user's codenames once instead of once per requirement.
        granted = {permission.codename for permission in user.get_effective_permissions()}
        return all(codename in granted for codename in required)

    @staticmethod
    def _check_data_classification_access(user, incident):
        """Check if user has access to incident's data classification."""
        classification = incident.data_classification
        if not classification:
            return True  # No classification means accessible to all
        return user.has_data_access(classification.level)

    @staticmethod
    def filter_incidents_by_access(user, queryset):
        """Filter incidents based on user's access level.

        Keeps incidents that have no data classification or whose
        classification level is at or below the user's clearance, and drops
        incidents flagged ``is_sensitive`` for users whose clearance is below
        ``_SENSITIVE_MIN_CLEARANCE``.

        Returns:
            The narrowed queryset.
        """
        # Get user's clearance level
        clearance = user.clearance_level
        user_clearance = clearance.level if clearance else IncidentAccessControl._DEFAULT_CLEARANCE

        # Filter incidents based on data classification
        accessible_incidents = queryset.filter(
            models.Q(data_classification__isnull=True)  # No classification
            | models.Q(data_classification__level__lte=user_clearance)  # Sufficient clearance
        )

        # Filter out sensitive incidents if user doesn't have clearance.
        # A user without a clearance level has the default (1), which is
        # below the threshold, so the single comparison covers both cases.
        if user_clearance < IncidentAccessControl._SENSITIVE_MIN_CLEARANCE:
            accessible_incidents = accessible_incidents.filter(is_sensitive=False)

        return accessible_incidents
def log_incident_operation(user, incident, operation, details=None):
    """Write an audit-trail entry describing an operation on an incident.

    'create', 'update' and 'delete' are stored as ``DATA_MODIFIED``; every
    other operation is stored as ``DATA_ACCESS``. The entry is ``HIGH``
    severity for incidents whose own severity is ``CRITICAL`` or
    ``EMERGENCY`` and ``MEDIUM`` otherwise. Keys in ``details`` are merged
    last, so they take precedence over the standard fields.

    Any exception raised while building or saving the entry is logged and
    swallowed; the function never raises for audit failures.
    """
    try:
        if operation in ('create', 'update', 'delete'):
            action_type = 'DATA_MODIFIED'
        else:
            action_type = 'DATA_ACCESS'

        resource_id = str(incident.id)

        classification = incident.data_classification
        payload = {
            'operation': operation,
            'incident_title': incident.title,
            'incident_severity': incident.severity,
            'incident_category': incident.category,
            'data_classification': classification.name if classification else None,
            # Caller-supplied extras come last and may override the above.
            **(details or {})
        }

        if incident.severity in ('CRITICAL', 'EMERGENCY'):
            audit_severity = 'HIGH'
        else:
            audit_severity = 'MEDIUM'

        AuditLog.objects.create(
            user=user,
            action_type=action_type,
            resource_type='incident',
            resource_id=resource_id,
            details=payload,
            severity=audit_severity,
        )
    except Exception as exc:
        logger.error(f"Failed to log incident operation: {exc}")
def get_user_accessible_incidents(user):
    """Return the incidents that ``user`` may see under the security policies.

    Takes the queryset of all ``Incident`` rows and narrows it with
    ``IncidentAccessControl.filter_incidents_by_access``.
    """
    # Imported at call time rather than at module import
    # (presumably to avoid a circular import - confirm).
    from .models import Incident

    every_incident = Incident.objects.all()
    return IncidentAccessControl.filter_incidents_by_access(user, every_incident)