""" Views for accounts app. """ from django.shortcuts import render, redirect, get_object_or_404 from django.contrib.auth import login, logout from django.contrib.auth.views import LoginView, PasswordChangeView, PasswordResetView, PasswordResetConfirmView, PasswordResetDoneView from django.views.generic import CreateView, UpdateView, DetailView, TemplateView, FormView, View from django.urls import reverse_lazy from django.contrib.messages.views import SuccessMessageMixin from django.contrib.messages import success, error from django.contrib.auth.decorators import login_required from django.utils.decorators import method_decorator from django_otp import devices_for_user from django_otp.plugins.otp_totp.models import TOTPDevice from django_otp.decorators import otp_required from django.views.decorators.csrf import csrf_exempt from django.http import JsonResponse import qrcode import qrcode.image.svg from io import BytesIO import base64 from .models import User, UserProfile, ActivityLog, FailedLoginAttempt from .forms import UserRegistrationForm, UserProfileForm, MFAVerifyForm, MFASetupForm from .security import InputSanitizer, PasswordSecurity from django.core.cache import cache from django.utils import timezone from datetime import timedelta class RegisterView(SuccessMessageMixin, CreateView): """User registration view with security checks.""" model = User form_class = UserRegistrationForm template_name = 'accounts/register.html' success_url = reverse_lazy('accounts:profile') success_message = "Registration successful! Please verify your email." 
def get_form_kwargs(self): """Pass request to form for rate limiting.""" kwargs = super().get_form_kwargs() kwargs['request'] = self.request return kwargs def form_valid(self, form): # Sanitize input if form.cleaned_data.get('email'): form.cleaned_data['email'] = InputSanitizer.sanitize_html(form.cleaned_data['email']) # Check password strength password = form.cleaned_data.get('password1') is_strong, message = PasswordSecurity.check_password_strength(password) if not is_strong: form.add_error('password1', message) return self.form_invalid(form) response = super().form_valid(form) login(self.request, self.object) # Log activity ActivityLog.objects.create( user=self.object, action='register', ip_address=self.get_client_ip(), user_agent=self.request.META.get('HTTP_USER_AGENT', '') ) return response def get_client_ip(self): x_forwarded_for = self.request.META.get('HTTP_X_FORWARDED_FOR') if x_forwarded_for: ip = x_forwarded_for.split(',')[0] else: ip = self.request.META.get('REMOTE_ADDR') return ip def csrf_failure(request, reason=""): """Custom CSRF failure view.""" from django.contrib import messages messages.error(request, 'Security error: Invalid request. Please try again.') return redirect('accounts:login') class LoginView(SuccessMessageMixin, LoginView): """Custom login view with MFA support and security checks.""" template_name = 'accounts/login.html' redirect_authenticated_user = True success_message = "Welcome back!" def form_valid(self, form): # Check for account lockout username = form.cleaned_data.get('username') ip = self.get_client_ip() # Check if IP is blocked if self.is_ip_blocked(ip): error(self.request, 'Too many failed login attempts. 
Please try again later.') return redirect('accounts:login') # Check if user account is locked try: user = User.objects.get(username=username) if self.is_account_locked(user): error(self.request, 'Account temporarily locked due to too many failed attempts.') return redirect('accounts:login') except User.DoesNotExist: pass # Don't reveal if user exists # Authenticate user first response = super().form_valid(form) user = self.request.user # Clear failed login attempts on successful login FailedLoginAttempt.objects.filter( email_or_username=username, ip_address=ip ).delete() cache.delete(f'login_attempts_{ip}') # Check if MFA is enabled if user.mfa_enabled: # Store user ID in session for MFA verification self.request.session['mfa_user_id'] = user.id # Don't log them in yet - redirect to MFA verification logout(self.request) return redirect('accounts:mfa_verify') # Log activity for non-MFA users ActivityLog.objects.create( user=user, action='login', ip_address=ip, user_agent=self.request.META.get('HTTP_USER_AGENT', '') ) return response def form_invalid(self, form): # Log failed login attempt username = form.data.get('username', '') ip = self.get_client_ip() FailedLoginAttempt.objects.create( email_or_username=username, ip_address=ip, user_agent=self.request.META.get('HTTP_USER_AGENT', '') ) # Check if should block IP failed_attempts = FailedLoginAttempt.objects.filter( ip_address=ip, timestamp__gte=timezone.now() - timedelta(minutes=15) ).count() if failed_attempts >= 10: cache.set(f'blocked_ip_{ip}', True, 3600) # Block for 1 hour return super().form_invalid(form) def is_ip_blocked(self, ip): """Check if IP is blocked.""" return cache.get(f'blocked_ip_{ip}', False) def is_account_locked(self, user): """Check if user account is locked.""" failed_attempts = FailedLoginAttempt.objects.filter( email_or_username=user.username, timestamp__gte=timezone.now() - timedelta(minutes=30) ).count() return failed_attempts >= 5 def get_client_ip(self): x_forwarded_for = 
self.request.META.get('HTTP_X_FORWARDED_FOR') if x_forwarded_for: ip = x_forwarded_for.split(',')[0] else: ip = self.request.META.get('REMOTE_ADDR') return ip class LogoutView(View): """Custom logout view that accepts GET requests.""" def get(self, request, *args, **kwargs): if request.user.is_authenticated: # Log activity ActivityLog.objects.create( user=request.user, action='logout', ip_address=self.get_client_ip(request), user_agent=request.META.get('HTTP_USER_AGENT', '') ) logout(request) success(request, "You have been logged out successfully.") return redirect('reports:home') def post(self, request, *args, **kwargs): return self.get(request, *args, **kwargs) def get_client_ip(self, request): x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR') if x_forwarded_for: ip = x_forwarded_for.split(',')[0] else: ip = request.META.get('REMOTE_ADDR') return ip class ProfileView(DetailView): """User profile view.""" model = User template_name = 'accounts/profile.html' context_object_name = 'user_obj' def get_object(self): return self.request.user class ProfileEditView(SuccessMessageMixin, UpdateView): """Edit user profile.""" model = UserProfile form_class = UserProfileForm template_name = 'accounts/profile_edit.html' success_url = reverse_lazy('accounts:profile') success_message = "Profile updated successfully!" 
def get_object(self): profile, created = UserProfile.objects.get_or_create(user=self.request.user) return profile def form_valid(self, form): response = super().form_valid(form) # Log activity ActivityLog.objects.create( user=self.request.user, action='profile_update', ip_address=self.get_client_ip(), user_agent=self.request.META.get('HTTP_USER_AGENT', '') ) return response def get_client_ip(self): x_forwarded_for = self.request.META.get('HTTP_X_FORWARDED_FOR') if x_forwarded_for: ip = x_forwarded_for.split(',')[0] else: ip = self.request.META.get('REMOTE_ADDR') return ip class PasswordChangeView(SuccessMessageMixin, PasswordChangeView): """Change password view.""" template_name = 'accounts/password_change.html' success_url = reverse_lazy('accounts:profile') success_message = "Password changed successfully!" class PasswordResetView(SuccessMessageMixin, PasswordResetView): """Password reset view.""" template_name = 'accounts/password_reset.html' email_template_name = 'accounts/password_reset_email.html' subject_template_name = 'accounts/password_reset_email_subject.txt' success_url = reverse_lazy('accounts:password_reset_done') success_message = "Password reset email sent!" 
def form_valid(self, form): """Override to use SiteSettings for from_email.""" try: from reports.models import SiteSettings site_settings = SiteSettings.get_settings() self.from_email = site_settings.default_from_email except: pass # Fallback to default return super().form_valid(form) def get_email_context_data(self, **kwargs): """Override to ensure site_name is set for email template.""" context = super().get_email_context_data(**kwargs) # Ensure site_name is set (Django uses sites framework, but provide fallback) if 'site_name' not in context or not context['site_name']: context['site_name'] = 'Портал за Докладване на Измами' return context class PasswordResetDoneView(TemplateView): """Password reset done view - shows confirmation that email was sent.""" template_name = 'accounts/password_reset_done.html' class PasswordResetConfirmView(SuccessMessageMixin, PasswordResetConfirmView): """Password reset confirmation view.""" template_name = 'accounts/password_reset_confirm.html' success_url = reverse_lazy('accounts:password_reset_complete') success_message = "Password reset successful!" 
class MFAVerifyView(FormView):
    """MFA verification view shown after a successful password login."""

    template_name = 'accounts/mfa_verify.html'
    form_class = MFAVerifyForm

    def dispatch(self, request, *args, **kwargs):
        """Require a pending MFA login (``mfa_user_id`` in the session)."""
        if 'mfa_user_id' not in request.session:
            error(request, 'Please log in first.')
            return redirect('accounts:login')
        return super().dispatch(request, *args, **kwargs)

    def get_form_kwargs(self):
        """Pass the pending user to the form as ``user``."""
        kwargs = super().get_form_kwargs()
        pending_id = self.request.session.get('mfa_user_id')
        if pending_id:
            kwargs['user'] = get_object_or_404(User, id=pending_id)
        return kwargs

    def form_valid(self, form):
        """Log the pending user in when the submitted token verifies."""
        pending_id = self.request.session.get('mfa_user_id')
        user = get_object_or_404(User, id=pending_id)

        if not form.verify_token():
            error(self.request, 'Invalid verification code. Please try again.')
            return self.form_invalid(form)

        login(self.request, user)
        # The pending marker is no longer needed once the user is logged in.
        self.request.session.pop('mfa_user_id', None)

        ActivityLog.objects.create(
            user=user,
            action='login',
            ip_address=self.get_client_ip(),
            user_agent=self.request.META.get('HTTP_USER_AGENT', ''),
        )
        success(self.request, 'Login successful!')
        return redirect('accounts:profile')

    def get_client_ip(self):
        """Return the client IP address for the current request.

        The first ``X-Forwarded-For`` entry is used when the header is
        present, otherwise ``REMOTE_ADDR``.
        """
        forwarded_for = self.request.META.get('HTTP_X_FORWARDED_FOR')
        if forwarded_for:
            # Entries are comma separated and usually followed by a space.
            return forwarded_for.split(',')[0].strip()
        return self.request.META.get('REMOTE_ADDR')


@method_decorator(login_required, name='dispatch')
class MFASetupView(TemplateView):
    """MFA setup view: shows a QR code for a fresh TOTP device."""

    template_name = 'accounts/mfa_setup.html'

    def get_context_data(self, **kwargs):
        """Create a new unconfirmed TOTP device and expose its QR code.

        Any previous unconfirmed devices of the user are deleted first, so
        every render of this page issues a new secret.
        """
        context = super().get_context_data(**kwargs)
        user = self.request.user

        TOTPDevice.objects.filter(user=user, confirmed=False).delete()
        device = TOTPDevice.objects.create(
            user=user,
            name='default',
            confirmed=False,
        )

        # Render the provisioning URL as a PNG and embed it as a data URI.
        qr = qrcode.QRCode(version=1, box_size=10, border=5)
        qr.add_data(device.config_url)
        qr.make(fit=True)
        image = qr.make_image(fill_color="black", back_color="white")

        buffer = BytesIO()
        image.save(buffer, format='PNG')
        encoded_png = base64.b64encode(buffer.getvalue()).decode()

        context['qr_code'] = f'data:image/png;base64,{encoded_png}'
        context['secret_key'] = device.key
        context['device'] = device
        context['mfa_enabled'] = user.mfa_enabled
        return context


@method_decorator(login_required, name='dispatch')
class MFAEnableView(FormView):
    """Enable MFA after the user proves they can generate a valid token."""

    template_name = 'accounts/mfa_enable.html'
    form_class = MFAVerifyForm

    def get_form_kwargs(self):
        """Pass the logged-in user to the form as ``user``."""
        kwargs = super().get_form_kwargs()
        kwargs['user'] = self.request.user
        return kwargs

    def form_valid(self, form):
        """Confirm the user's TOTP device and switch MFA on.

        The device is confirmed before ``mfa_enabled`` is saved, so a
        missing device can no longer leave the account flagged as
        MFA-enabled without any confirmed device.
        """
        user = self.request.user

        if not form.verify_token():
            error(self.request, 'Invalid verification code. Please try again.')
            return self.form_invalid(form)

        # filter() instead of get(): tolerates zero or several devices named
        # 'default' rather than raising DoesNotExist/MultipleObjectsReturned.
        devices = list(TOTPDevice.objects.filter(user=user, name='default'))
        if not devices:
            error(self.request, 'No MFA device found. Please restart MFA setup.')
            return self.form_invalid(form)

        for device in devices:
            device.confirmed = True
            device.save()

        user.mfa_enabled = True
        user.save()

        success(self.request, 'MFA has been enabled successfully!')
        return redirect('accounts:profile')


@method_decorator(login_required, name='dispatch')
class MFADisableView(TemplateView):
    """Disable MFA; GET renders a confirmation page, POST disables."""

    template_name = 'accounts/mfa_disable.html'

    def post(self, request, *args, **kwargs):
        """Turn MFA off, clear the stored secret and delete TOTP devices."""
        user = request.user
        user.mfa_enabled = False
        user.mfa_secret = ''
        user.save()

        TOTPDevice.objects.filter(user=user).delete()

        success(request, 'MFA has been disabled.')
        return redirect('accounts:profile')