""" Security middleware for enhanced protection. """ import time from django.core.cache import cache from django.http import HttpResponseForbidden, JsonResponse from django.utils.deprecation import MiddlewareMixin from django.contrib.auth import logout from django.contrib import messages from accounts.models import FailedLoginAttempt, ActivityLog from django.utils import timezone from datetime import timedelta, datetime class RateLimitMiddleware(MiddlewareMixin): """ Rate limiting middleware to prevent brute force attacks. """ def process_request(self, request): # Skip rate limiting for static files and admin if request.path.startswith('/static/') or request.path.startswith('/media/'): return None # Get client IP ip = self.get_client_ip(request) # Rate limit login attempts if request.path == '/accounts/login/' and request.method == 'POST': cache_key = f'login_attempts_{ip}' attempts = cache.get(cache_key, 0) if attempts >= 5: # Max 5 attempts per 15 minutes # Log failed attempt FailedLoginAttempt.objects.create( email_or_username=request.POST.get('username', ''), ip_address=ip, user_agent=request.META.get('HTTP_USER_AGENT', ''), is_blocked=True ) return JsonResponse({ 'error': 'Too many login attempts. Please try again in 15 minutes.' }, status=429) # Increment attempts cache.set(cache_key, attempts + 1, 900) # 15 minutes # Rate limit registration if request.path == '/accounts/register/' and request.method == 'POST': cache_key = f'register_attempts_{ip}' attempts = cache.get(cache_key, 0) if attempts >= 3: # Max 3 registrations per hour return JsonResponse({ 'error': 'Too many registration attempts. Please try again later.' }, status=429) cache.set(cache_key, attempts + 1, 3600) # 1 hour # Rate limit report creation if request.path.startswith('/reports/create/') and request.method == 'POST': if request.user.is_authenticated: cache_key = f'report_creation_{request.user.id}' attempts = cache.get(cache_key, 0) if attempts >= 10: # Max 10 reports per hour return JsonResponse({ 'error': 'Too many reports created. Please try again later.' }, status=429) cache.set(cache_key, attempts + 1, 3600) # 1 hour return None def get_client_ip(self, request): x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR') if x_forwarded_for: ip = x_forwarded_for.split(',')[0] else: ip = request.META.get('REMOTE_ADDR') return ip class SecurityHeadersMiddleware(MiddlewareMixin): """ Add security headers to all responses. """ def process_response(self, request, response): # Content Security Policy csp = ( "default-src 'self'; " "script-src 'self' 'unsafe-inline' https://fonts.googleapis.com; " "style-src 'self' 'unsafe-inline' https://fonts.googleapis.com; " "font-src 'self' https://fonts.gstatic.com; " "img-src 'self' data: https:; " "connect-src 'self'; " "frame-ancestors 'none'; " "base-uri 'self'; " "form-action 'self';" ) response['Content-Security-Policy'] = csp # X-Content-Type-Options response['X-Content-Type-Options'] = 'nosniff' # X-Frame-Options response['X-Frame-Options'] = 'DENY' # Referrer Policy response['Referrer-Policy'] = 'strict-origin-when-cross-origin' # Permissions Policy response['Permissions-Policy'] = ( 'geolocation=(), microphone=(), camera=(), ' 'payment=(), usb=(), magnetometer=(), gyroscope=()' ) # X-XSS-Protection (legacy but still useful) response['X-XSS-Protection'] = '1; mode=block' # Remove server header if 'Server' in response: del response['Server'] return response class IPWhitelistMiddleware(MiddlewareMixin): """ IP whitelist/blacklist middleware (optional, for admin access). """ def process_request(self, request): # Only apply to admin area if not request.path.startswith('/admin/'): return None ip = self.get_client_ip(request) # Get blacklisted IPs from cache or database blacklisted = cache.get(f'blacklisted_ip_{ip}', False) if blacklisted: return HttpResponseForbidden('Access denied.') return None def get_client_ip(self, request): x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR') if x_forwarded_for: ip = x_forwarded_for.split(',')[0] else: ip = request.META.get('REMOTE_ADDR') return ip class SecurityLoggingMiddleware(MiddlewareMixin): """ Log security-related events. """ def process_response(self, request, response): # Log suspicious activities if response.status_code == 403: self.log_security_event(request, 'FORBIDDEN_ACCESS', { 'path': request.path, 'method': request.method, 'status': 403 }) if response.status_code == 429: self.log_security_event(request, 'RATE_LIMIT_EXCEEDED', { 'path': request.path, 'method': request.method, }) # Log failed login attempts if request.path == '/accounts/login/' and request.method == 'POST': if response.status_code != 200 or (hasattr(response, 'content') and b'error' in response.content): self.log_security_event(request, 'FAILED_LOGIN', { 'username': request.POST.get('username', ''), }) return response def log_security_event(self, request, event_type, details): """Log security event to ActivityLog.""" try: ip = self.get_client_ip(request) # Convert non-JSON-serializable objects to strings serializable_details = self.make_json_serializable({ 'event_type': event_type, **details }) ActivityLog.objects.create( user=request.user if hasattr(request, 'user') and request.user.is_authenticated else None, action='security_event', ip_address=ip, user_agent=request.META.get('HTTP_USER_AGENT', ''), details=serializable_details ) except Exception: pass # Don't break the request if logging fails def make_json_serializable(self, obj): """Convert non-JSON-serializable objects to strings.""" import json from datetime import datetime, date from decimal import Decimal if isinstance(obj, dict): return {k: self.make_json_serializable(v) for k, v in obj.items()} elif isinstance(obj, (list, tuple)): return [self.make_json_serializable(item) for item in obj] elif isinstance(obj, (datetime, date)): return obj.isoformat() elif isinstance(obj, Decimal): return float(obj) elif hasattr(obj, '__dict__'): return str(obj) else: try: json.dumps(obj) # Test if it's JSON serializable return obj except (TypeError, ValueError): return str(obj) def get_client_ip(self, request): x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR') if x_forwarded_for: ip = x_forwarded_for.split(',')[0] else: ip = request.META.get('REMOTE_ADDR') return ip class SessionSecurityMiddleware(MiddlewareMixin): """ Enhanced session security. """ def process_request(self, request): # Check if user attribute exists (AuthenticationMiddleware must run first) if hasattr(request, 'user') and request.user.is_authenticated: # Check for session hijacking current_ip = self.get_client_ip(request) session_ip = request.session.get('ip_address') if session_ip and session_ip != current_ip: # IP changed - potential session hijacking logout(request) messages.error(request, 'Security alert: Session terminated due to IP change.') return None # Store IP in session request.session['ip_address'] = current_ip # Check session age session_age = request.session.get('created_at') if session_age: # Convert string back to datetime if needed if isinstance(session_age, str): from django.utils.dateparse import parse_datetime session_age = parse_datetime(session_age) if isinstance(session_age, datetime): age = timezone.now() - session_age if age > timedelta(hours=24): # Max 24 hours logout(request) messages.info(request, 'Your session has expired. Please log in again.') return None else: request.session['created_at'] = timezone.now().isoformat() return None def get_client_ip(self, request): x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR') if x_forwarded_for: ip = x_forwarded_for.split(',')[0] else: ip = request.META.get('REMOTE_ADDR') return ip