Files
OSINT/accounts/views.py
Iliyan Angelov ed94dd22dd update
2025-11-26 22:32:20 +02:00

434 lines
15 KiB
Python

"""
Views for accounts app.
"""
from django.shortcuts import render, redirect, get_object_or_404
from django.contrib.auth import login, logout
from django.contrib.auth.views import LoginView, PasswordChangeView, PasswordResetView, PasswordResetConfirmView, PasswordResetDoneView
from django.views.generic import CreateView, UpdateView, DetailView, TemplateView, FormView, View
from django.urls import reverse_lazy
from django.contrib.messages.views import SuccessMessageMixin
from django.contrib.messages import success, error
from django.contrib.auth.decorators import login_required
from django.utils.decorators import method_decorator
from django_otp import devices_for_user
from django_otp.plugins.otp_totp.models import TOTPDevice
from django_otp.decorators import otp_required
from django.views.decorators.csrf import csrf_exempt
from django.http import JsonResponse
import qrcode
import qrcode.image.svg
from io import BytesIO
import base64
from .models import User, UserProfile, ActivityLog, FailedLoginAttempt
from .forms import UserRegistrationForm, UserProfileForm, MFAVerifyForm, MFASetupForm
from .security import InputSanitizer, PasswordSecurity
from django.core.cache import cache
from django.utils import timezone
from datetime import timedelta
class RegisterView(SuccessMessageMixin, CreateView):
"""User registration view with security checks."""
model = User
form_class = UserRegistrationForm
template_name = 'accounts/register.html'
success_url = reverse_lazy('accounts:profile')
success_message = "Registration successful! Please verify your email."
def get_form_kwargs(self):
"""Pass request to form for rate limiting."""
kwargs = super().get_form_kwargs()
kwargs['request'] = self.request
return kwargs
def form_valid(self, form):
# Sanitize input
if form.cleaned_data.get('email'):
form.cleaned_data['email'] = InputSanitizer.sanitize_html(form.cleaned_data['email'])
# Check password strength
password = form.cleaned_data.get('password1')
is_strong, message = PasswordSecurity.check_password_strength(password)
if not is_strong:
form.add_error('password1', message)
return self.form_invalid(form)
response = super().form_valid(form)
login(self.request, self.object)
# Log activity
ActivityLog.objects.create(
user=self.object,
action='register',
ip_address=self.get_client_ip(),
user_agent=self.request.META.get('HTTP_USER_AGENT', '')
)
return response
def get_client_ip(self):
x_forwarded_for = self.request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = self.request.META.get('REMOTE_ADDR')
return ip
def csrf_failure(request, reason=""):
"""Custom CSRF failure view."""
from django.contrib import messages
messages.error(request, 'Security error: Invalid request. Please try again.')
return redirect('accounts:login')
class LoginView(SuccessMessageMixin, LoginView):
"""Custom login view with MFA support and security checks."""
template_name = 'accounts/login.html'
redirect_authenticated_user = True
success_message = "Welcome back!"
def form_valid(self, form):
# Check for account lockout
username = form.cleaned_data.get('username')
ip = self.get_client_ip()
# Check if IP is blocked
if self.is_ip_blocked(ip):
error(self.request, 'Too many failed login attempts. Please try again later.')
return redirect('accounts:login')
# Check if user account is locked
try:
user = User.objects.get(username=username)
if self.is_account_locked(user):
error(self.request, 'Account temporarily locked due to too many failed attempts.')
return redirect('accounts:login')
except User.DoesNotExist:
pass # Don't reveal if user exists
# Authenticate user first
response = super().form_valid(form)
user = self.request.user
# Clear failed login attempts on successful login
FailedLoginAttempt.objects.filter(
email_or_username=username,
ip_address=ip
).delete()
cache.delete(f'login_attempts_{ip}')
# Check if MFA is enabled
if user.mfa_enabled:
# Store user ID in session for MFA verification
self.request.session['mfa_user_id'] = user.id
# Don't log them in yet - redirect to MFA verification
logout(self.request)
return redirect('accounts:mfa_verify')
# Log activity for non-MFA users
ActivityLog.objects.create(
user=user,
action='login',
ip_address=ip,
user_agent=self.request.META.get('HTTP_USER_AGENT', '')
)
return response
def form_invalid(self, form):
# Log failed login attempt
username = form.data.get('username', '')
ip = self.get_client_ip()
FailedLoginAttempt.objects.create(
email_or_username=username,
ip_address=ip,
user_agent=self.request.META.get('HTTP_USER_AGENT', '')
)
# Check if should block IP
failed_attempts = FailedLoginAttempt.objects.filter(
ip_address=ip,
timestamp__gte=timezone.now() - timedelta(minutes=15)
).count()
if failed_attempts >= 10:
cache.set(f'blocked_ip_{ip}', True, 3600) # Block for 1 hour
return super().form_invalid(form)
def is_ip_blocked(self, ip):
"""Check if IP is blocked."""
return cache.get(f'blocked_ip_{ip}', False)
def is_account_locked(self, user):
"""Check if user account is locked."""
failed_attempts = FailedLoginAttempt.objects.filter(
email_or_username=user.username,
timestamp__gte=timezone.now() - timedelta(minutes=30)
).count()
return failed_attempts >= 5
def get_client_ip(self):
x_forwarded_for = self.request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = self.request.META.get('REMOTE_ADDR')
return ip
class LogoutView(View):
"""Custom logout view that accepts GET requests."""
def get(self, request, *args, **kwargs):
if request.user.is_authenticated:
# Log activity
ActivityLog.objects.create(
user=request.user,
action='logout',
ip_address=self.get_client_ip(request),
user_agent=request.META.get('HTTP_USER_AGENT', '')
)
logout(request)
success(request, "You have been logged out successfully.")
return redirect('reports:home')
def post(self, request, *args, **kwargs):
return self.get(request, *args, **kwargs)
def get_client_ip(self, request):
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = request.META.get('REMOTE_ADDR')
return ip
class ProfileView(DetailView):
"""User profile view."""
model = User
template_name = 'accounts/profile.html'
context_object_name = 'user_obj'
def get_object(self):
return self.request.user
class ProfileEditView(SuccessMessageMixin, UpdateView):
"""Edit user profile."""
model = UserProfile
form_class = UserProfileForm
template_name = 'accounts/profile_edit.html'
success_url = reverse_lazy('accounts:profile')
success_message = "Profile updated successfully!"
def get_object(self):
profile, created = UserProfile.objects.get_or_create(user=self.request.user)
return profile
def form_valid(self, form):
response = super().form_valid(form)
# Log activity
ActivityLog.objects.create(
user=self.request.user,
action='profile_update',
ip_address=self.get_client_ip(),
user_agent=self.request.META.get('HTTP_USER_AGENT', '')
)
return response
def get_client_ip(self):
x_forwarded_for = self.request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = self.request.META.get('REMOTE_ADDR')
return ip
class PasswordChangeView(SuccessMessageMixin, PasswordChangeView):
"""Change password view."""
template_name = 'accounts/password_change.html'
success_url = reverse_lazy('accounts:profile')
success_message = "Password changed successfully!"
class PasswordResetView(SuccessMessageMixin, PasswordResetView):
"""Password reset view."""
template_name = 'accounts/password_reset.html'
email_template_name = 'accounts/password_reset_email.html'
subject_template_name = 'accounts/password_reset_email_subject.txt'
success_url = reverse_lazy('accounts:password_reset_done')
success_message = "Password reset email sent!"
def form_valid(self, form):
"""Override to use SiteSettings for from_email."""
try:
from reports.models import SiteSettings
site_settings = SiteSettings.get_settings()
self.from_email = site_settings.default_from_email
except:
pass # Fallback to default
return super().form_valid(form)
def get_email_context_data(self, **kwargs):
"""Override to ensure site_name is set for email template."""
context = super().get_email_context_data(**kwargs)
# Ensure site_name is set (Django uses sites framework, but provide fallback)
if 'site_name' not in context or not context['site_name']:
context['site_name'] = 'Портал за Докладване на Измами'
return context
class PasswordResetDoneView(TemplateView):
"""Password reset done view - shows confirmation that email was sent."""
template_name = 'accounts/password_reset_done.html'
class PasswordResetConfirmView(SuccessMessageMixin, PasswordResetConfirmView):
"""Password reset confirmation view."""
template_name = 'accounts/password_reset_confirm.html'
success_url = reverse_lazy('accounts:password_reset_complete')
success_message = "Password reset successful!"
class MFAVerifyView(FormView):
"""MFA verification view after login."""
template_name = 'accounts/mfa_verify.html'
form_class = MFAVerifyForm
def dispatch(self, request, *args, **kwargs):
# Check if user ID is in session
if 'mfa_user_id' not in request.session:
error(request, 'Please log in first.')
return redirect('accounts:login')
return super().dispatch(request, *args, **kwargs)
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
user_id = self.request.session.get('mfa_user_id')
if user_id:
kwargs['user'] = get_object_or_404(User, id=user_id)
return kwargs
def form_valid(self, form):
user_id = self.request.session.get('mfa_user_id')
user = get_object_or_404(User, id=user_id)
# Verify the token
if form.verify_token():
# Login the user
login(self.request, user)
# Clear the session
del self.request.session['mfa_user_id']
# Log activity
ActivityLog.objects.create(
user=user,
action='login',
ip_address=self.get_client_ip(),
user_agent=self.request.META.get('HTTP_USER_AGENT', '')
)
success(self.request, 'Login successful!')
return redirect('accounts:profile')
else:
error(self.request, 'Invalid verification code. Please try again.')
return self.form_invalid(form)
def get_client_ip(self):
x_forwarded_for = self.request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = self.request.META.get('REMOTE_ADDR')
return ip
@method_decorator(login_required, name='dispatch')
class MFASetupView(TemplateView):
"""MFA setup view."""
template_name = 'accounts/mfa_setup.html'
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
user = self.request.user
# Delete any existing unconfirmed devices
TOTPDevice.objects.filter(user=user, confirmed=False).delete()
# Create new TOTP device
device = TOTPDevice.objects.create(
user=user,
name='default',
confirmed=False
)
# Generate QR code
config_url = device.config_url
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(config_url)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
buffer = BytesIO()
img.save(buffer, format='PNG')
img_str = base64.b64encode(buffer.getvalue()).decode()
context['qr_code'] = f'data:image/png;base64,{img_str}'
context['secret_key'] = device.key
context['device'] = device
context['mfa_enabled'] = user.mfa_enabled
return context
@method_decorator(login_required, name='dispatch')
class MFAEnableView(FormView):
"""Enable MFA after verification."""
template_name = 'accounts/mfa_enable.html'
form_class = MFAVerifyForm
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
return kwargs
def form_valid(self, form):
user = self.request.user
# Verify the token
if form.verify_token():
# Enable MFA
user.mfa_enabled = True
user.save()
# Confirm the device
device = TOTPDevice.objects.get(user=user, name='default')
device.confirmed = True
device.save()
success(self.request, 'MFA has been enabled successfully!')
return redirect('accounts:profile')
else:
error(self.request, 'Invalid verification code. Please try again.')
return self.form_invalid(form)
@method_decorator(login_required, name='dispatch')
class MFADisableView(TemplateView):
"""Disable MFA."""
template_name = 'accounts/mfa_disable.html'
def post(self, request, *args, **kwargs):
user = request.user
user.mfa_enabled = False
user.mfa_secret = ''
user.save()
# Delete TOTP devices
TOTPDevice.objects.filter(user=user).delete()
success(request, 'MFA has been disabled.')
return redirect('accounts:profile')
def get(self, request, *args, **kwargs):
return super().get(request, *args, **kwargs)