"""
Enterprise Security System for ETB-API
Comprehensive security features including threat detection, audit logging, and compliance
"""
import logging
import hashlib
import hmac
import time
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Union
from django.http import HttpRequest, JsonResponse
from django.conf import settings
from django.utils import timezone
from django.core.cache import cache
from django.contrib.auth.models import User
from django.contrib.auth.signals import user_logged_in, user_logged_out, user_login_failed
from django.dispatch import receiver
from django.db.models import Q
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.throttling import UserRateThrottle
import requests
import ipaddress
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os
logger = logging.getLogger(__name__)
class SecurityEvent:
    """A single security-relevant occurrence, captured at construction time.

    Instances record what happened (``event_type``, ``severity``,
    ``description``), who/where it came from (``user``, ``ip_address``,
    ``user_agent``), free-form ``metadata``, the creation ``timestamp`` and a
    short hexadecimal ``id`` derived from the event type, timestamp and IP.
    """

    def __init__(self, event_type: str, severity: str, description: str,
                 user: Optional[User] = None,
                 ip_address: Optional[str] = None,
                 user_agent: Optional[str] = None,
                 metadata: Optional[Dict[str, Any]] = None):
        """Create an event stamped with the current time.

        Args:
            event_type: Label identifying the kind of event.
            severity: Severity label for the event.
            description: Human-readable description.
            user: User associated with the event, if any.
            ip_address: Originating IP address, if known.
            user_agent: Client user-agent string, if known.
            metadata: Extra key/value details; a falsy value (``None`` or an
                empty dict) is replaced by a fresh empty dict.
        """
        self.event_type = event_type
        self.severity = severity
        self.description = description
        self.user = user
        self.ip_address = ip_address
        self.user_agent = user_agent
        self.metadata = metadata or {}
        self.timestamp = timezone.now()
        # Must come last: the ID is computed from event_type, timestamp and
        # ip_address, all of which have to be assigned first.
        self.id = self._generate_event_id()

    def _generate_event_id(self) -> str:
        """Return the first 16 hex characters of a SHA-256 digest.

        The digest input is the concatenation of the event type, the ISO-8601
        timestamp and the IP address (empty string when the IP is missing).
        Two events sharing all three values therefore receive the same ID.
        """
        ip_part = self.ip_address or ''
        data = f"{self.event_type}{self.timestamp.isoformat()}{ip_part}"
        return hashlib.sha256(data.encode()).hexdigest()[:16]

    def to_dict(self) -> Dict[str, Any]:
        """Return the event as a plain dictionary for storage.

        ``user_id`` and ``username`` are ``None`` when no (truthy) user is
        attached; ``timestamp`` is rendered as an ISO-8601 string. The
        ``metadata`` value is the same dict object held by the event, not a
        copy.
        """
        user = self.user
        return {
            'id': self.id,
            'event_type': self.event_type,
            'severity': self.severity,
            'description': self.description,
            'user_id': user.id if user else None,
            'username': user.username if user else None,
            'ip_address': self.ip_address,
            'user_agent': self.user_agent,
            'metadata': self.metadata,
            'timestamp': self.timestamp.isoformat(),
        }
class ThreatDetectionService:
"""Enterprise threat detection and analysis"""
def __init__(self):
    """Set up the detector's static configuration.

    Populates two attributes:

    * ``suspicious_patterns`` - a fixed list of threat-category labels.
    * ``risk_factors`` - a dict whose three entries are filled by calling
      the ``_load_high_risk_ips``, ``_load_suspicious_user_agents`` and
      ``_load_attack_patterns`` loaders once, at construction time.
    """
    # Category labels only; how they are consumed is not visible in this
    # part of the file.
    self.suspicious_patterns = [
        'sql_injection',
        'xss_attempt',
        'path_traversal',
        'command_injection',
        'brute_force',
        'credential_stuffing',
        'anomalous_behavior',
        'privilege_escalation',
    ]
    # Each loader runs exactly once here; the results are kept for the
    # lifetime of the instance.
    self.risk_factors = {
        'high_risk_ips': self._load_high_risk_ips(),
        'suspicious_user_agents': self._load_suspicious_user_agents(),
        'known_attack_patterns': self._load_attack_patterns(),
    }
def _load_high_risk_ips(self) -> List[str]:
    """Return the list of IP addresses treated as high risk.

    The list is hard-coded placeholder data: both entries are example
    addresses from private (RFC 1918) ranges. A new list object is returned
    on every call.
    """
    # In production, this would load from a threat intelligence feed
    return [
        '192.168.1.100',  # Example suspicious IP
        '10.0.0.50',  # Example suspicious IP
    ]
def _load_suspicious_user_agents(self) -> List[str]:
    """Return the user-agent markers treated as suspicious.

    The entries are lowercase names of common scanning / penetration-testing
    tools. A new list object is returned on every call.
    """
    # NOTE(review): how these are matched against a request's User-Agent
    # (substring, case handling) is not visible here - confirm at the caller.
    return [
        'sqlmap',
        'nikto',
        'nmap',
        'masscan',
        'zap',
        'burp',
    ]
def _load_attack_patterns(self) -> List[str]:
"""Load known attack patterns"""
return [
r'(\b(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|EXEC|UNION)\b)',
r'',
r'javascript:',
r'\.\./',
r'\.\.\\',
r'