"""
Enterprise Security System for ETB-API
Comprehensive security features including threat detection, audit logging, and compliance
"""
import base64
import hashlib
import hmac
import ipaddress
import json
import logging
import os
import re
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Union

import requests
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from django.conf import settings
from django.contrib.auth.models import User
from django.contrib.auth.signals import user_logged_in, user_logged_out, user_login_failed
from django.core.cache import cache
from django.db.models import Q
from django.dispatch import receiver
from django.http import HttpRequest, JsonResponse
from django.http.request import RawPostDataException
from django.utils import timezone
from rest_framework import status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.throttling import UserRateThrottle
from rest_framework.views import APIView

logger = logging.getLogger(__name__)


class SecurityEvent:
    """In-memory record of a single security-relevant event.

    The timestamp is taken at construction time and the ``id`` is derived
    from the event type, that timestamp and the IP address.
    """

    def __init__(self, event_type: str, severity: str, description: str,
                 user: Optional[User] = None, ip_address: Optional[str] = None,
                 user_agent: Optional[str] = None,
                 metadata: Optional[Dict[str, Any]] = None):
        self.event_type = event_type
        self.severity = severity
        self.description = description
        self.user = user
        self.ip_address = ip_address
        self.user_agent = user_agent
        self.metadata = metadata or {}
        self.timestamp = timezone.now()
        self.id = self._generate_event_id()

    def _generate_event_id(self) -> str:
        """Return the first 16 hex chars of a SHA-256 over type, time and IP."""
        data = f"{self.event_type}{self.timestamp.isoformat()}{self.ip_address or ''}"
        return hashlib.sha256(data.encode()).hexdigest()[:16]

    def to_dict(self) -> Dict[str, Any]:
        """Return a new dictionary describing the event, for storage/logging."""
        return {
            'id': self.id,
            'event_type': self.event_type,
            'severity': self.severity,
            'description': self.description,
            'user_id': self.user.id if self.user else None,
            'username': self.user.username if self.user else None,
            'ip_address': self.ip_address,
            'user_agent': self.user_agent,
            'metadata': self.metadata,
            'timestamp': self.timestamp.isoformat(),
        }


class ThreatDetectionService:
    """Heuristic request scoring.

    Each ``_analyze_*`` helper returns ``{'risk_score': int, 'threats': list}``;
    ``analyze_request`` sums the scores and flags the request for blocking
    once the total reaches 80.
    """

    def __init__(self):
        self.suspicious_patterns = [
            'sql_injection',
            'xss_attempt',
            'path_traversal',
            'command_injection',
            'brute_force',
            'credential_stuffing',
            'anomalous_behavior',
            'privilege_escalation',
        ]
        self.risk_factors = {
            'high_risk_ips': self._load_high_risk_ips(),
            'suspicious_user_agents': self._load_suspicious_user_agents(),
            'known_attack_patterns': self._load_attack_patterns(),
        }

    def _load_high_risk_ips(self) -> List[str]:
        """Load list of high-risk IP addresses"""
        # In production, this would load from a threat intelligence feed
        return [
            '192.168.1.100',  # Example suspicious IP
            '10.0.0.50',      # Example suspicious IP
        ]

    def _load_suspicious_user_agents(self) -> List[str]:
        """Load list of suspicious user agents (matched as substrings)."""
        return [
            'sqlmap',
            'nikto',
            'nmap',
            'masscan',
            'zap',
            'burp',
        ]

    def _load_attack_patterns(self) -> List[str]:
        """Load known attack patterns (regular expressions)."""
        # NOTE(review): the four HTML-tag patterns below were reconstructed.
        # The previous text had lost everything from '<' up to the first
        # character class (leaving ']*>'), which matched any '>' character.
        # Confirm the tag names against the intended rule set.
        return [
            r'(\b(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|EXEC|UNION)\b)',
            r'<script[^>]*>.*?</script>',
            r'javascript:',
            r'\.\./',
            r'\.\.\\',
            r'<iframe[^>]*>',
            r'<object[^>]*>',
            r'<embed[^>]*>',
        ]

    def analyze_request(self, request: HttpRequest) -> Dict[str, Any]:
        """Analyze request for security threats.

        Returns a dict with ``risk_score``, ``threats_detected``,
        ``recommendations`` and ``block_request``. Calling this also counts
        the request towards the per-IP rapid-request counter.
        """
        analysis = {
            'risk_score': 0,
            'threats_detected': [],
            'recommendations': [],
            'block_request': False,
        }

        # Order matters only for the order of entries in 'threats_detected'.
        checks = (
            self._analyze_ip_address,
            self._analyze_user_agent,
            self._analyze_parameters,
            self._analyze_headers,
            self._analyze_request_body,
        )
        for check in checks:
            partial = check(request)
            analysis['risk_score'] += partial['risk_score']
            analysis['threats_detected'].extend(partial['threats'])

        # Determine if request should be blocked
        if analysis['risk_score'] >= 80:
            analysis['block_request'] = True
            analysis['recommendations'].append('Block request due to high risk score')
        elif analysis['risk_score'] >= 50:
            analysis['recommendations'].append('Monitor request closely')

        return analysis

    def _analyze_ip_address(self, request: HttpRequest) -> Dict[str, Any]:
        """Analyze IP address for threats"""
        ip_address = self._get_client_ip(request)
        analysis = {
            'risk_score': 0,
            'threats': [],
        }

        # Check if IP is in high-risk list
        if ip_address in self.risk_factors['high_risk_ips']:
            analysis['risk_score'] += 40
            analysis['threats'].append('High-risk IP address')

        # Check if IP is from suspicious country/region
        if self._is_suspicious_geolocation(ip_address):
            analysis['risk_score'] += 20
            analysis['threats'].append('Suspicious geolocation')

        # Check for rapid requests from same IP
        if self._is_rapid_requests(ip_address):
            analysis['risk_score'] += 30
            analysis['threats'].append('Rapid requests detected')

        return analysis

    def _analyze_user_agent(self, request: HttpRequest) -> Dict[str, Any]:
        """Analyze user agent for threats"""
        user_agent = request.META.get('HTTP_USER_AGENT', '')
        analysis = {
            'risk_score': 0,
            'threats': [],
        }

        # Lower-case once instead of once per candidate.
        lowered_agent = user_agent.lower()
        for suspicious_ua in self.risk_factors['suspicious_user_agents']:
            if suspicious_ua.lower() in lowered_agent:
                analysis['risk_score'] += 50
                analysis['threats'].append(f'Suspicious user agent: {suspicious_ua}')

        # Check for missing user agent
        if not user_agent:
            analysis['risk_score'] += 10
            analysis['threats'].append('Missing user agent')

        return analysis

    def _analyze_parameters(self, request: HttpRequest) -> Dict[str, Any]:
        """Analyze GET and POST parameter values for attack patterns."""
        analysis = {
            'risk_score': 0,
            'threats': [],
        }

        # Check GET parameters
        for value in request.GET.values():
            param_analysis = self._check_attack_patterns(str(value))
            analysis['risk_score'] += param_analysis['risk_score']
            analysis['threats'].extend(param_analysis['threats'])

        # Check POST parameters
        if hasattr(request, 'POST'):
            for value in request.POST.values():
                param_analysis = self._check_attack_patterns(str(value))
                analysis['risk_score'] += param_analysis['risk_score']
                analysis['threats'].extend(param_analysis['threats'])

        return analysis

    def _analyze_headers(self, request: HttpRequest) -> Dict[str, Any]:
        """Analyze IP-carrying request headers for threats.

        A header is flagged when any address it carries is private or is
        not a valid IP address.
        """
        analysis = {
            'risk_score': 0,
            'threats': [],
        }

        # Check for suspicious headers
        suspicious_headers = [
            'X-Forwarded-For',
            'X-Real-IP',
            'X-Originating-IP',
            'X-Remote-IP',
            'X-Remote-Addr',
        ]

        for header in suspicious_headers:
            # Django exposes request headers in META as HTTP_<NAME>, upper-cased
            # with '-' replaced by '_'; looking up the raw header name never matches.
            meta_key = 'HTTP_' + header.upper().replace('-', '_')
            value = request.META.get(meta_key)
            if value is None:
                continue
            # Forwarding headers may carry a comma-separated proxy chain.
            addresses = [part.strip() for part in value.split(',')]
            if any(self._is_suspicious_ip(address) for address in addresses):
                analysis['risk_score'] += 20
                analysis['threats'].append(f'Suspicious header: {header}')

        return analysis

    def _analyze_request_body(self, request: HttpRequest) -> Dict[str, Any]:
        """Analyze the raw request body for attack patterns."""
        analysis = {
            'risk_score': 0,
            'threats': [],
        }

        try:
            raw_body = request.body if hasattr(request, 'body') else b''
        except RawPostDataException:
            # The stream was already consumed (e.g. request.POST parsed a
            # multipart upload); the parsed fields were checked in
            # _analyze_parameters, so skip the raw scan instead of failing.
            raw_body = b''

        # Check request body for attack patterns
        if raw_body:
            body_analysis = self._check_attack_patterns(raw_body.decode('utf-8', errors='ignore'))
            analysis['risk_score'] += body_analysis['risk_score']
            analysis['threats'].extend(body_analysis['threats'])

        return analysis

    def _check_attack_patterns(self, text: str) -> Dict[str, Any]:
        """Check text against every known attack pattern (case-insensitive)."""
        analysis = {
            'risk_score': 0,
            'threats': [],
        }

        for pattern in self.risk_factors['known_attack_patterns']:
            if re.search(pattern, text, re.IGNORECASE):
                analysis['risk_score'] += 25
                analysis['threats'].append(f'Attack pattern detected: {pattern}')

        return analysis

    def _get_client_ip(self, request: HttpRequest) -> Optional[str]:
        """Get client IP address.

        Uses the first entry of ``X-Forwarded-For`` when present, otherwise
        ``REMOTE_ADDR`` (``None`` if neither is set).
        """
        # NOTE(review): X-Forwarded-For is client-supplied unless a trusted
        # proxy overwrites it; confirm the deployment guarantees that.
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            return x_forwarded_for.split(',')[0].strip()
        return request.META.get('REMOTE_ADDR')

    def _is_suspicious_geolocation(self, ip_address: str) -> bool:
        """Check if IP is from suspicious geolocation"""
        # In production, use a geolocation service
        # For now, return False
        return False

    def _is_rapid_requests(self, ip_address: str) -> bool:
        """Return True once an IP exceeds 100 requests within one clock minute.

        Every call counts as one request.
        """
        # A per-minute bucket key gives a real fixed window: the counter
        # cannot be kept alive by TTL refreshes on later writes.
        window = int(time.time() // 60)
        cache_key = f"rapid_requests_{ip_address}_{window}"
        # add() only creates the key if it is missing; 120s outlives the bucket.
        cache.add(cache_key, 0, 120)
        try:
            request_count = cache.incr(cache_key)
        except ValueError:
            # Key disappeared between add() and incr() (e.g. eviction).
            cache.set(cache_key, 1, 120)
            request_count = 1
        return request_count > 100  # More than 100 requests in 1 minute

    def _is_suspicious_ip(self, ip_address: str) -> bool:
        """Return True for private or unparseable IP address strings."""
        try:
            ip = ipaddress.ip_address(ip_address)
        except ValueError:
            return True
        # Check for private IPs in suspicious headers
        return ip.is_private


class AuditLogger:
    """Enterprise audit logging system.

    Writes each event to the ``security.audit`` logger (with selected fields
    encrypted), then hands it to the database and SIEM sinks.
    """

    def __init__(self):
        self.logger = logging.getLogger('security.audit')
        self.encryption_key = self._get_encryption_key()

    def _get_encryption_key(self) -> bytes:
        """Get the Fernet key for sensitive data.

        Reads ``AUDIT_ENCRYPTION_KEY`` from the environment. When it is unset
        a fresh key is generated, which exists only on this instance.
        """
        key = os.getenv('AUDIT_ENCRYPTION_KEY')
        if not key:
            # Generate a key if not set
            logger.warning("AUDIT_ENCRYPTION_KEY not set, using generated key")
            return Fernet.generate_key()
        # os.getenv returns str; normalise to the annotated bytes type.
        return key.encode()

    def log_security_event(self, event: SecurityEvent) -> None:
        """Log security event to the audit log, database and SIEM.

        Never raises: failures are reported through the module logger.
        """
        try:
            # Encrypt sensitive data
            encrypted_data = self._encrypt_sensitive_data(event.to_dict())

            # Log to file
            self.logger.info(f"Security Event: {json.dumps(encrypted_data)}")

            # Store in database (if configured)
            self._store_in_database(event)

            # Send to SIEM (if configured)
            self._send_to_siem(event)

        except Exception as e:
            logger.error("Failed to log security event: %s", e)

    def _encrypt_sensitive_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Encrypt sensitive fields of ``data`` in place and return it.

        Non-empty ``ip_address``, ``user_agent`` and ``metadata`` values are
        replaced by the base64 encoding of their Fernet token. If encryption
        fails the value is replaced by a redaction marker so that plaintext
        never reaches the audit log.
        """
        sensitive_fields = ['ip_address', 'user_agent', 'metadata']
        fernet = None

        for field in sensitive_fields:
            if field not in data or not data[field]:
                continue
            try:
                if fernet is None:
                    # Built lazily so an invalid key is reported per field,
                    # but only constructed once when it is valid.
                    fernet = Fernet(self.encryption_key)
                encrypted_value = fernet.encrypt(str(data[field]).encode())
                data[field] = base64.b64encode(encrypted_value).decode()
            except Exception as e:
                logger.error("Failed to encrypt %s: %s", field, e)
                data[field] = '[REDACTED]'

        return data

    def _store_in_database(self, event: SecurityEvent) -> None:
        """Store security event in database; failures are logged, not raised."""
        try:
            # Imported lazily, as in the rest of this module's model access.
            from security.models import SecurityEvent as SecurityEventModel

            SecurityEventModel.objects.create(
                event_type=event.event_type,
                severity=event.severity,
                description=event.description,
                user=event.user,
                ip_address=event.ip_address,
                user_agent=event.user_agent,
                metadata=event.metadata,
                timestamp=event.timestamp,
            )
        except Exception as e:
            logger.error("Failed to store security event in database: %s", e)

    def _send_to_siem(self, event: SecurityEvent) -> None:
        """POST the event to ``SIEM_WEBHOOK_URL``; no-op when it is unset."""
        try:
            siem_url = os.getenv('SIEM_WEBHOOK_URL')
            if not siem_url:
                return

            payload = {
                'event_type': event.event_type,
                'severity': event.severity,
                'description': event.description,
                'timestamp': event.timestamp.isoformat(),
                'source': 'etb-api',
            }

            if event.user:
                payload['user_id'] = event.user.id
                payload['username'] = event.user.username

            if event.ip_address:
                payload['ip_address'] = event.ip_address

            response = requests.post(siem_url, json=payload, timeout=5)
            response.raise_for_status()

        except Exception as e:
            logger.error("Failed to send event to SIEM: %s", e)


class ComplianceManager:
    """Enterprise compliance management.

    Holds a requirement table per framework and scores each framework by the
    share of its requirements whose check reports ``compliant``.
    """

    def __init__(self):
        self.compliance_frameworks = {
            'SOX': self._get_sox_requirements(),
            'HIPAA': self._get_hipaa_requirements(),
            'GDPR': self._get_gdpr_requirements(),
            'PCI_DSS': self._get_pci_dss_requirements(),
            'ISO27001': self._get_iso27001_requirements(),
        }

    def _get_sox_requirements(self) -> Dict[str, Any]:
        """Get SOX compliance requirements"""
        return {
            'access_controls': True,
            'audit_trails': True,
            'data_integrity': True,
            'change_management': True,
            'segregation_of_duties': True,
        }

    def _get_hipaa_requirements(self) -> Dict[str, Any]:
        """Get HIPAA compliance requirements"""
        return {
            'data_encryption': True,
            'access_controls': True,
            'audit_logs': True,
            'data_backup': True,
            'incident_response': True,
        }

    def _get_gdpr_requirements(self) -> Dict[str, Any]:
        """Get GDPR compliance requirements"""
        return {
            'data_protection': True,
            'consent_management': True,
            'data_portability': True,
            'right_to_erasure': True,
            'privacy_by_design': True,
        }

    def _get_pci_dss_requirements(self) -> Dict[str, Any]:
        """Get PCI DSS compliance requirements"""
        return {
            'network_security': True,
            'data_protection': True,
            'access_controls': True,
            'monitoring': True,
            'incident_response': True,
        }

    def _get_iso27001_requirements(self) -> Dict[str, Any]:
        """Get ISO 27001 compliance requirements"""
        return {
            'information_security_policy': True,
            'risk_management': True,
            'access_controls': True,
            'incident_management': True,
            'business_continuity': True,
        }

    def check_compliance(self, framework: str) -> Dict[str, Any]:
        """Check compliance with specific framework.

        Returns ``{'error': ...}`` for an unknown framework, otherwise the
        framework name, a 0-100 score, the per-requirement results and an
        overall status (``compliant`` at a score of 80 or more).
        """
        if framework not in self.compliance_frameworks:
            return {
                'error': f'Unknown compliance framework: {framework}',
            }

        requirements = self.compliance_frameworks[framework]
        compliance_status = {
            requirement: self._check_requirement(requirement, required)
            for requirement, required in requirements.items()
        }

        # Calculate overall compliance score
        total_requirements = len(requirements)
        met_requirements = sum(1 for result in compliance_status.values() if result['compliant'])
        # An empty requirement table yields 0 rather than a ZeroDivisionError.
        compliance_score = (
            (met_requirements / total_requirements) * 100 if total_requirements else 0.0
        )

        return {
            'framework': framework,
            'compliance_score': compliance_score,
            'requirements': compliance_status,
            'overall_status': 'compliant' if compliance_score >= 80 else 'non_compliant',
        }

    def _check_requirement(self, requirement: str, required: bool) -> Dict[str, Any]:
        """Check specific compliance requirement.

        Requirements without a dedicated check are reported as
        non-compliant ("not implemented").
        """
        if not required:
            return {
                'compliant': True,
                'message': 'Requirement not applicable',
            }

        # Check if requirement is implemented
        checks = {
            'access_controls': self._check_access_controls,
            'audit_trails': self._check_audit_trails,
            'data_encryption': self._check_data_encryption,
            'incident_response': self._check_incident_response,
        }
        check = checks.get(requirement)
        if check is None:
            return {
                'compliant': False,
                'message': f'Requirement {requirement} not implemented',
            }
        return check()

    def _check_access_controls(self) -> Dict[str, Any]:
        """Check that at least one user and one permission exist."""
        try:
            from django.contrib.auth.models import User
            from django.contrib.auth.models import Permission

            # Check if users exist
            if User.objects.count() == 0:
                return {
                    'compliant': False,
                    'message': 'No users configured',
                }

            # Check if permissions are configured
            if Permission.objects.count() == 0:
                return {
                    'compliant': False,
                    'message': 'No permissions configured',
                }

            return {
                'compliant': True,
                'message': 'Access controls properly configured',
            }
        except Exception as e:
            return {
                'compliant': False,
                'message': f'Error checking access controls: {str(e)}',
            }

    def _check_audit_trails(self) -> Dict[str, Any]:
        """Check that audit events exist, including some from the last day."""
        try:
            from security.models import SecurityEvent

            # Check if audit logs exist
            if SecurityEvent.objects.count() == 0:
                return {
                    'compliant': False,
                    'message': 'No audit logs found',
                }

            # Check if recent audit logs exist
            recent_events = SecurityEvent.objects.filter(
                timestamp__gte=timezone.now() - timedelta(days=1)
            ).count()
            if recent_events == 0:
                return {
                    'compliant': False,
                    'message': 'No recent audit logs found',
                }

            return {
                'compliant': True,
                'message': 'Audit trails properly configured',
            }
        except Exception as e:
            return {
                'compliant': False,
                'message': f'Error checking audit trails: {str(e)}',
            }

    def _check_data_encryption(self) -> Dict[str, Any]:
        """Check HTTPS redirect and, for PostgreSQL, an enforcing ``sslmode``."""
        try:
            # Check if encryption is enabled in settings
            if not getattr(settings, 'SECURE_SSL_REDIRECT', False):
                return {
                    'compliant': False,
                    'message': 'SSL/TLS encryption not enabled',
                }

            # Check if database encryption is configured
            db_engine = settings.DATABASES['default']['ENGINE']
            if 'postgresql' in db_engine:
                # Check if SSL is required. Only these libpq modes refuse an
                # unencrypted connection; 'disable', 'allow' and 'prefer' do not.
                db_options = settings.DATABASES['default'].get('OPTIONS', {})
                if db_options.get('sslmode') not in ('require', 'verify-ca', 'verify-full'):
                    return {
                        'compliant': False,
                        'message': 'Database SSL not configured',
                    }

            return {
                'compliant': True,
                'message': 'Data encryption properly configured',
            }
        except Exception as e:
            return {
                'compliant': False,
                'message': f'Error checking data encryption: {str(e)}',
            }

    def _check_incident_response(self) -> Dict[str, Any]:
        """Check that at least one incident record exists."""
        try:
            from incident_intelligence.models import Incident

            # Check if incident management is configured
            if Incident.objects.count() == 0:
                return {
                    'compliant': False,
                    'message': 'No incident management configured',
                }

            return {
                'compliant': True,
                'message': 'Incident response properly configured',
            }
        except Exception as e:
            return {
                'compliant': False,
                'message': f'Error checking incident response: {str(e)}',
            }


# Global instances
threat_detector = ThreatDetectionService()
audit_logger = AuditLogger()
compliance_manager = ComplianceManager()


def _request_details(request):
    """Return ``(ip_address, user_agent)`` for a request, or ``(None, None)``.

    Django's auth signals may be sent with ``request=None``.
    """
    if request is None:
        return None, None
    return threat_detector._get_client_ip(request), request.META.get('HTTP_USER_AGENT')


# Signal handlers for security events
@receiver(user_logged_in)
def log_user_login(sender, request, user, **kwargs):
    """Log successful user login"""
    ip_address, user_agent = _request_details(request)
    event = SecurityEvent(
        event_type='user_login',
        severity='info',
        description=f'User {user.username} logged in successfully',
        user=user,
        ip_address=ip_address,
        user_agent=user_agent,
        metadata={'login_method': 'password'}
    )
    audit_logger.log_security_event(event)


@receiver(user_logged_out)
def log_user_logout(sender, request, user, **kwargs):
    """Log user logout.

    ``user`` is ``None`` when the session being logged out was not
    authenticated; that case is recorded without a user.
    """
    ip_address, user_agent = _request_details(request)
    if user is None:
        description = 'Unauthenticated session logged out'
    else:
        description = f'User {user.username} logged out'
    event = SecurityEvent(
        event_type='user_logout',
        severity='info',
        description=description,
        user=user,
        ip_address=ip_address,
        user_agent=user_agent,
    )
    audit_logger.log_security_event(event)


@receiver(user_login_failed)
def log_login_failure(sender, credentials, request=None, **kwargs):
    """Log failed login attempt.

    ``request`` is ``None`` when ``authenticate()`` was called without one.
    """
    ip_address, user_agent = _request_details(request)
    username = credentials.get('username', 'unknown')
    event = SecurityEvent(
        event_type='login_failure',
        severity='warning',
        description=f'Failed login attempt for user: {username}',
        ip_address=ip_address,
        user_agent=user_agent,
        metadata={'username': username}
    )
    audit_logger.log_security_event(event)


# API Views for security management
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def security_dashboard(request):
    """Get security dashboard data.

    Returns the ten most recent events of the last seven days, a threat
    analysis of the current request and the status of every framework.
    """
    # NOTE(review): any authenticated user can read this; consider whether
    # it should be restricted to staff.
    try:
        # Get recent security events
        from security.models import SecurityEvent
        recent_events = SecurityEvent.objects.filter(
            timestamp__gte=timezone.now() - timedelta(days=7)
        ).order_by('-timestamp')[:10]

        # Get threat analysis
        threat_analysis = threat_detector.analyze_request(request)

        # Get compliance status
        compliance_status = {
            framework: compliance_manager.check_compliance(framework)
            for framework in compliance_manager.compliance_frameworks
        }

        return Response({
            'recent_events': [
                {
                    'event_type': event.event_type,
                    'severity': event.severity,
                    'description': event.description,
                    'timestamp': event.timestamp.isoformat(),
                    'user': event.user.username if event.user else None,
                }
                for event in recent_events
            ],
            'threat_analysis': threat_analysis,
            'compliance_status': compliance_status,
        })

    except Exception as e:
        logger.error("Security dashboard error: %s", e)
        return Response(
            {'error': 'Failed to load security dashboard'},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR
        )


@api_view(['GET'])
@permission_classes([IsAuthenticated])
def compliance_report(request):
    """Get compliance report.

    With ``?framework=NAME`` returns that framework's report; otherwise a
    mapping of every framework name to its report.
    """
    try:
        framework = request.GET.get('framework')

        if framework:
            report = compliance_manager.check_compliance(framework)
        else:
            report = {
                name: compliance_manager.check_compliance(name)
                for name in compliance_manager.compliance_frameworks
            }

        return Response(report)

    except Exception as e:
        logger.error("Compliance report error: %s", e)
        return Response(
            {'error': 'Failed to generate compliance report'},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR
        )


class SecurityMiddleware:
    """Security middleware for threat detection and logging.

    Scores every request, audits those scoring 50 or more, rejects those
    flagged for blocking with HTTP 403, and adds security headers to every
    response that passes through.
    """

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        # Analyze request for threats
        threat_analysis = threat_detector.analyze_request(request)
        risk_score = threat_analysis['risk_score']

        # Log high-risk requests
        if risk_score >= 50:
            event = SecurityEvent(
                event_type='suspicious_request',
                severity='warning' if risk_score < 80 else 'critical',
                description=f'Suspicious request detected: {threat_analysis["threats_detected"]}',
                ip_address=threat_detector._get_client_ip(request),
                user_agent=request.META.get('HTTP_USER_AGENT'),
                metadata={
                    'risk_score': risk_score,
                    'threats': threat_analysis['threats_detected'],
                    'path': request.path,
                    'method': request.method,
                }
            )
            audit_logger.log_security_event(event)

        # Block high-risk requests
        if threat_analysis['block_request']:
            # The matched rules are deliberately not echoed back: they are
            # recorded in the audit event above, and returning them would
            # tell the sender exactly which detections to evade.
            return JsonResponse(
                {'error': 'Request blocked due to security concerns'},
                status=status.HTTP_403_FORBIDDEN
            )

        response = self.get_response(request)

        # Add security headers
        response['X-Content-Type-Options'] = 'nosniff'
        response['X-Frame-Options'] = 'DENY'
        response['X-XSS-Protection'] = '1; mode=block'
        response['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
        response['Content-Security-Policy'] = "default-src 'self'"

        return response