Files
OSINT/accounts/middleware.py
Iliyan Angelov ed94dd22dd update
2025-11-26 22:32:20 +02:00

280 lines
10 KiB
Python

"""
Security middleware for enhanced protection.
"""
import time
from django.core.cache import cache
from django.http import HttpResponseForbidden, JsonResponse
from django.utils.deprecation import MiddlewareMixin
from django.contrib.auth import logout
from django.contrib import messages
from accounts.models import FailedLoginAttempt, ActivityLog
from django.utils import timezone
from datetime import timedelta, datetime
class RateLimitMiddleware(MiddlewareMixin):
    """
    Rate limiting middleware to prevent brute force attacks.

    POST requests to the login, registration and report-creation endpoints
    are counted in the cache. Once a counter has reached its limit the
    request is answered with an HTTP 429 JSON response instead of being
    passed on. Login and registration are counted per client IP, report
    creation per authenticated user id.
    """

    # Maximum number of counted requests and the cache timeout (seconds)
    # for each protected endpoint. The login error message mentions
    # "15 minutes"; keep it in sync if LOGIN_WINDOW is changed.
    LOGIN_LIMIT = 5
    LOGIN_WINDOW = 900  # 15 minutes
    REGISTER_LIMIT = 3
    REGISTER_WINDOW = 3600  # 1 hour
    REPORT_LIMIT = 10
    REPORT_WINDOW = 3600  # 1 hour

    def process_request(self, request):
        """
        Apply the rate limits to ``request``.

        Returns a 429 ``JsonResponse`` when the relevant limit has been
        reached, otherwise ``None`` so the request continues normally.
        """
        # Static and media files are never rate limited.
        if request.path.startswith(('/static/', '/media/')):
            return None

        # Only POST requests are counted.
        if request.method != 'POST':
            return None

        ip = self.get_client_ip(request)

        if request.path == '/accounts/login/':
            if self._limit_reached(
                f'login_attempts_{ip}', self.LOGIN_LIMIT, self.LOGIN_WINDOW
            ):
                # Keep a database record of the blocked attempt.
                FailedLoginAttempt.objects.create(
                    email_or_username=request.POST.get('username', ''),
                    ip_address=ip,
                    user_agent=request.META.get('HTTP_USER_AGENT', ''),
                    is_blocked=True
                )
                return self._too_many_requests(
                    'Too many login attempts. Please try again in 15 minutes.'
                )

        elif request.path == '/accounts/register/':
            if self._limit_reached(
                f'register_attempts_{ip}',
                self.REGISTER_LIMIT,
                self.REGISTER_WINDOW,
            ):
                return self._too_many_requests(
                    'Too many registration attempts. Please try again later.'
                )

        elif request.path.startswith('/reports/create/'):
            # request.user only exists once AuthenticationMiddleware has run;
            # tolerate being placed earlier in the middleware chain instead
            # of failing with AttributeError.
            user = getattr(request, 'user', None)
            if user is not None and user.is_authenticated:
                if self._limit_reached(
                    f'report_creation_{user.id}',
                    self.REPORT_LIMIT,
                    self.REPORT_WINDOW,
                ):
                    return self._too_many_requests(
                        'Too many reports created. Please try again later.'
                    )

        return None

    @staticmethod
    def _limit_reached(cache_key, limit, window):
        """
        Check the counter stored under ``cache_key`` and count this request.

        Returns ``True`` (without counting) when the counter already holds
        ``limit`` hits. Otherwise stores the incremented counter with a
        timeout of ``window`` seconds and returns ``False``.
        """
        hits = cache.get(cache_key, 0)
        if hits >= limit:
            return True
        # NOTE(review): get() followed by set() is not atomic, so concurrent
        # requests can each read the same count. cache.add() + cache.incr()
        # would be atomic on some backends, but changes how the timeout is
        # handled on others - confirm the configured cache backend first.
        cache.set(cache_key, hits + 1, window)
        return False

    @staticmethod
    def _too_many_requests(message):
        """Build the HTTP 429 JSON response carrying ``message``."""
        return JsonResponse({'error': message}, status=429)

    def get_client_ip(self, request):
        """
        Return the client IP used as the rate-limit key.

        Uses the first entry of ``X-Forwarded-For`` when that header is
        present and non-empty, otherwise ``REMOTE_ADDR``.
        """
        # NOTE(review): X-Forwarded-For is sent by the client unless a
        # trusted proxy overwrites it - confirm the deployment, otherwise
        # the per-IP limits can be evaded by varying this header.
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            # Entries may be padded ("a , b"); an empty first entry would
            # otherwise make unrelated clients share one counter.
            first_entry = x_forwarded_for.split(',')[0].strip()
            if first_entry:
                return first_entry
        return request.META.get('REMOTE_ADDR')
class SecurityHeadersMiddleware(MiddlewareMixin):
    """
    Add security headers to all responses.

    Every response receives the same fixed set of headers; values already
    present on the response under those names are overwritten.
    """

    def process_response(self, request, response):
        """Set the security headers on ``response`` and return it."""
        # Content-Security-Policy directives, joined with "; " below and
        # terminated by a final ";".
        csp_directives = (
            "default-src 'self'",
            "script-src 'self' 'unsafe-inline' https://fonts.googleapis.com",
            "style-src 'self' 'unsafe-inline' https://fonts.googleapis.com",
            "font-src 'self' https://fonts.gstatic.com",
            "img-src 'self' data: https:",
            "connect-src 'self'",
            "frame-ancestors 'none'",
            "base-uri 'self'",
            "form-action 'self'",
        )

        # Browser features switched off through Permissions-Policy.
        disabled_features = (
            'geolocation=()',
            'microphone=()',
            'camera=()',
            'payment=()',
            'usb=()',
            'magnetometer=()',
            'gyroscope=()',
        )

        security_headers = {
            'Content-Security-Policy': '; '.join(csp_directives) + ';',
            'X-Content-Type-Options': 'nosniff',
            'X-Frame-Options': 'DENY',
            'Referrer-Policy': 'strict-origin-when-cross-origin',
            'Permissions-Policy': ', '.join(disabled_features),
            # Legacy header, kept for older browsers.
            'X-XSS-Protection': '1; mode=block',
        }
        for header_name, header_value in security_headers.items():
            response[header_name] = header_value

        # Drop the Server header if one has been set on the response object.
        if response.has_header('Server'):
            del response['Server']

        return response
class IPWhitelistMiddleware(MiddlewareMixin):
    """
    IP whitelist/blacklist middleware (optional, for admin access).

    Only a blacklist lookup is performed: requests under ``/admin/`` are
    rejected with 403 when the cache holds a truthy value under
    ``blacklisted_ip_<client ip>``.
    """

    def process_request(self, request):
        """Return a 403 response for blacklisted admin requests, else ``None``."""
        # Paths outside the admin area are never checked.
        if request.path.startswith('/admin/'):
            client_ip = self.get_client_ip(request)
            # The blacklist flag is read from the cache only.
            if cache.get(f'blacklisted_ip_{client_ip}', False):
                return HttpResponseForbidden('Access denied.')
        return None

    def get_client_ip(self, request):
        """
        Return the client IP: the text before the first comma of
        ``X-Forwarded-For`` when that header is set, else ``REMOTE_ADDR``.
        """
        # NOTE(review): X-Forwarded-For is sent by the client unless a
        # trusted proxy overwrites it - confirm the deployment, otherwise a
        # blacklisted client can change the IP seen here.
        forwarded = request.META.get('HTTP_X_FORWARDED_FOR')
        return (
            forwarded.partition(',')[0]
            if forwarded
            else request.META.get('REMOTE_ADDR')
        )
class SecurityLoggingMiddleware(MiddlewareMixin):
    """
    Log security-related events.

    Inspects every outgoing response and stores an ``ActivityLog`` row with
    ``action='security_event'`` for 403 responses, 429 responses and login
    POSTs that look like failures.
    """

    def process_response(self, request, response):
        """Record security events for ``response`` and return it unchanged."""
        status = response.status_code

        if status == 403:
            self.log_security_event(request, 'FORBIDDEN_ACCESS', {
                'path': request.path,
                'method': request.method,
                'status': 403
            })

        if status == 429:
            self.log_security_event(request, 'RATE_LIMIT_EXCEEDED', {
                'path': request.path,
                'method': request.method,
            })

        # Log failed login attempts.
        if request.path == '/accounts/login/' and request.method == 'POST':
            # NOTE(review): every non-200 status counts as a failure here,
            # which includes 3xx redirects - confirm the login view does not
            # redirect on success, otherwise successful logins are recorded
            # as FAILED_LOGIN.
            # The hasattr() guard skips responses without a ``content``
            # attribute.
            looks_failed = status != 200 or (
                hasattr(response, 'content') and b'error' in response.content
            )
            if looks_failed:
                self.log_security_event(request, 'FAILED_LOGIN', {
                    'username': request.POST.get('username', ''),
                })

        return response

    def log_security_event(self, request, event_type, details):
        """
        Log security event to ActivityLog.

        ``details`` is merged with ``event_type`` and converted to
        JSON-friendly values before it is stored. Logging is best effort:
        a failure is reported through the ``logging`` module and never
        propagates to the request.
        """
        import logging

        try:
            ip = self.get_client_ip(request)
            # Convert non-JSON-serializable objects to strings.
            serializable_details = self.make_json_serializable({
                'event_type': event_type,
                **details
            })
            ActivityLog.objects.create(
                user=request.user if hasattr(request, 'user') and request.user.is_authenticated else None,
                action='security_event',
                ip_address=ip,
                user_agent=request.META.get('HTTP_USER_AGENT', ''),
                details=serializable_details
            )
        except Exception:
            # Don't break the request if logging fails, but leave a trace so
            # a broken audit log does not go unnoticed.
            logging.getLogger(__name__).exception(
                'Failed to record security event %s', event_type
            )

    def make_json_serializable(self, obj):
        """
        Convert non-JSON-serializable objects to strings.

        Dicts, lists and tuples are converted recursively (tuples become
        lists), dates and datetimes become ISO strings, ``Decimal`` becomes
        ``float``, and anything else that ``json.dumps`` rejects becomes
        ``str(obj)``.
        """
        import json
        from datetime import datetime, date
        from decimal import Decimal

        if isinstance(obj, dict):
            # JSON object keys must be primitives; stringify anything else
            # so one odd key cannot make the whole record unserializable.
            return {
                (k if k is None or isinstance(k, (str, int, float, bool)) else str(k)):
                    self.make_json_serializable(v)
                for k, v in obj.items()
            }
        if isinstance(obj, (list, tuple)):
            return [self.make_json_serializable(item) for item in obj]
        if isinstance(obj, (datetime, date)):
            return obj.isoformat()
        if isinstance(obj, Decimal):
            return float(obj)
        if hasattr(obj, '__dict__'):
            # Arbitrary objects are stored by their string form.
            return str(obj)
        try:
            json.dumps(obj)  # Test if it's JSON serializable
            return obj
        except (TypeError, ValueError):
            return str(obj)

    def get_client_ip(self, request):
        """
        Return the client IP recorded with each event.

        Uses the first entry of ``X-Forwarded-For`` when that header is
        present and non-empty, otherwise ``REMOTE_ADDR``.
        """
        # NOTE(review): X-Forwarded-For is sent by the client unless a
        # trusted proxy overwrites it - confirm the deployment before
        # relying on the logged address.
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            # Entries may be padded ("a , b"); fall back to REMOTE_ADDR when
            # the first entry is empty.
            first_entry = x_forwarded_for.split(',')[0].strip()
            if first_entry:
                return first_entry
        return request.META.get('REMOTE_ADDR')
class SessionSecurityMiddleware(MiddlewareMixin):
    """
    Enhanced session security.

    For authenticated requests the session is bound to the client IP seen on
    its first checked request and is given a ``created_at`` timestamp. The
    user is logged out when the IP differs from the stored one or when the
    timestamp is more than 24 hours old.
    """

    def process_request(self, request):
        """Enforce the IP binding and maximum age of the session; always returns ``None``."""
        # request.user is only present once AuthenticationMiddleware has run;
        # anonymous requests are not checked.
        if not hasattr(request, 'user') or not request.user.is_authenticated:
            return None

        # A differing IP is treated as potential session hijacking.
        client_ip = self.get_client_ip(request)
        bound_ip = request.session.get('ip_address')
        if bound_ip and bound_ip != client_ip:
            logout(request)
            messages.error(request, 'Security alert: Session terminated due to IP change.')
            return None

        # Remember the IP for the following requests.
        request.session['ip_address'] = client_ip

        started_at = request.session.get('created_at')
        if not started_at:
            # First checked request of this session: record when it started.
            request.session['created_at'] = timezone.now().isoformat()
            return None

        # The timestamp is written as an ISO string; turn it back into a
        # datetime before comparing.
        if isinstance(started_at, str):
            from django.utils.dateparse import parse_datetime
            started_at = parse_datetime(started_at)

        # Values that are not a datetime at this point are left unchecked.
        if isinstance(started_at, datetime):
            if timezone.now() - started_at > timedelta(hours=24):  # Max 24 hours
                logout(request)
                messages.info(request, 'Your session has expired. Please log in again.')

        return None

    def get_client_ip(self, request):
        """
        Return the client IP: the text before the first comma of
        ``X-Forwarded-For`` when that header is set, else ``REMOTE_ADDR``.
        """
        forwarded = request.META.get('HTTP_X_FORWARDED_FOR')
        if not forwarded:
            return request.META.get('REMOTE_ADDR')
        return forwarded.partition(',')[0]