434 lines
15 KiB
Python
434 lines
15 KiB
Python
"""
|
|
Views for accounts app.
|
|
"""
|
|
from django.shortcuts import render, redirect, get_object_or_404
|
|
from django.contrib.auth import login, logout
|
|
from django.contrib.auth.views import LoginView, PasswordChangeView, PasswordResetView, PasswordResetConfirmView, PasswordResetDoneView
|
|
from django.views.generic import CreateView, UpdateView, DetailView, TemplateView, FormView, View
|
|
from django.urls import reverse_lazy
|
|
from django.contrib.messages.views import SuccessMessageMixin
|
|
from django.contrib.messages import success, error
|
|
from django.contrib.auth.decorators import login_required
|
|
from django.utils.decorators import method_decorator
|
|
from django_otp import devices_for_user
|
|
from django_otp.plugins.otp_totp.models import TOTPDevice
|
|
from django_otp.decorators import otp_required
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
from django.http import JsonResponse
|
|
import qrcode
|
|
import qrcode.image.svg
|
|
from io import BytesIO
|
|
import base64
|
|
from .models import User, UserProfile, ActivityLog, FailedLoginAttempt
|
|
from .forms import UserRegistrationForm, UserProfileForm, MFAVerifyForm, MFASetupForm
|
|
from .security import InputSanitizer, PasswordSecurity
|
|
from django.core.cache import cache
|
|
from django.utils import timezone
|
|
from datetime import timedelta
|
|
|
|
|
|
class RegisterView(SuccessMessageMixin, CreateView):
|
|
"""User registration view with security checks."""
|
|
model = User
|
|
form_class = UserRegistrationForm
|
|
template_name = 'accounts/register.html'
|
|
success_url = reverse_lazy('accounts:profile')
|
|
success_message = "Registration successful! Please verify your email."
|
|
|
|
def get_form_kwargs(self):
|
|
"""Pass request to form for rate limiting."""
|
|
kwargs = super().get_form_kwargs()
|
|
kwargs['request'] = self.request
|
|
return kwargs
|
|
|
|
def form_valid(self, form):
|
|
# Sanitize input
|
|
if form.cleaned_data.get('email'):
|
|
form.cleaned_data['email'] = InputSanitizer.sanitize_html(form.cleaned_data['email'])
|
|
|
|
# Check password strength
|
|
password = form.cleaned_data.get('password1')
|
|
is_strong, message = PasswordSecurity.check_password_strength(password)
|
|
if not is_strong:
|
|
form.add_error('password1', message)
|
|
return self.form_invalid(form)
|
|
|
|
response = super().form_valid(form)
|
|
login(self.request, self.object)
|
|
|
|
# Log activity
|
|
ActivityLog.objects.create(
|
|
user=self.object,
|
|
action='register',
|
|
ip_address=self.get_client_ip(),
|
|
user_agent=self.request.META.get('HTTP_USER_AGENT', '')
|
|
)
|
|
return response
|
|
|
|
def get_client_ip(self):
|
|
x_forwarded_for = self.request.META.get('HTTP_X_FORWARDED_FOR')
|
|
if x_forwarded_for:
|
|
ip = x_forwarded_for.split(',')[0]
|
|
else:
|
|
ip = self.request.META.get('REMOTE_ADDR')
|
|
return ip
|
|
|
|
|
|
def csrf_failure(request, reason=""):
|
|
"""Custom CSRF failure view."""
|
|
from django.contrib import messages
|
|
messages.error(request, 'Security error: Invalid request. Please try again.')
|
|
return redirect('accounts:login')
|
|
|
|
|
|
class LoginView(SuccessMessageMixin, LoginView):
|
|
"""Custom login view with MFA support and security checks."""
|
|
template_name = 'accounts/login.html'
|
|
redirect_authenticated_user = True
|
|
success_message = "Welcome back!"
|
|
|
|
def form_valid(self, form):
|
|
# Check for account lockout
|
|
username = form.cleaned_data.get('username')
|
|
ip = self.get_client_ip()
|
|
|
|
# Check if IP is blocked
|
|
if self.is_ip_blocked(ip):
|
|
error(self.request, 'Too many failed login attempts. Please try again later.')
|
|
return redirect('accounts:login')
|
|
|
|
# Check if user account is locked
|
|
try:
|
|
user = User.objects.get(username=username)
|
|
if self.is_account_locked(user):
|
|
error(self.request, 'Account temporarily locked due to too many failed attempts.')
|
|
return redirect('accounts:login')
|
|
except User.DoesNotExist:
|
|
pass # Don't reveal if user exists
|
|
|
|
# Authenticate user first
|
|
response = super().form_valid(form)
|
|
user = self.request.user
|
|
|
|
# Clear failed login attempts on successful login
|
|
FailedLoginAttempt.objects.filter(
|
|
email_or_username=username,
|
|
ip_address=ip
|
|
).delete()
|
|
cache.delete(f'login_attempts_{ip}')
|
|
|
|
# Check if MFA is enabled
|
|
if user.mfa_enabled:
|
|
# Store user ID in session for MFA verification
|
|
self.request.session['mfa_user_id'] = user.id
|
|
# Don't log them in yet - redirect to MFA verification
|
|
logout(self.request)
|
|
return redirect('accounts:mfa_verify')
|
|
|
|
# Log activity for non-MFA users
|
|
ActivityLog.objects.create(
|
|
user=user,
|
|
action='login',
|
|
ip_address=ip,
|
|
user_agent=self.request.META.get('HTTP_USER_AGENT', '')
|
|
)
|
|
return response
|
|
|
|
def form_invalid(self, form):
|
|
# Log failed login attempt
|
|
username = form.data.get('username', '')
|
|
ip = self.get_client_ip()
|
|
|
|
FailedLoginAttempt.objects.create(
|
|
email_or_username=username,
|
|
ip_address=ip,
|
|
user_agent=self.request.META.get('HTTP_USER_AGENT', '')
|
|
)
|
|
|
|
# Check if should block IP
|
|
failed_attempts = FailedLoginAttempt.objects.filter(
|
|
ip_address=ip,
|
|
timestamp__gte=timezone.now() - timedelta(minutes=15)
|
|
).count()
|
|
|
|
if failed_attempts >= 10:
|
|
cache.set(f'blocked_ip_{ip}', True, 3600) # Block for 1 hour
|
|
|
|
return super().form_invalid(form)
|
|
|
|
def is_ip_blocked(self, ip):
|
|
"""Check if IP is blocked."""
|
|
return cache.get(f'blocked_ip_{ip}', False)
|
|
|
|
def is_account_locked(self, user):
|
|
"""Check if user account is locked."""
|
|
failed_attempts = FailedLoginAttempt.objects.filter(
|
|
email_or_username=user.username,
|
|
timestamp__gte=timezone.now() - timedelta(minutes=30)
|
|
).count()
|
|
return failed_attempts >= 5
|
|
|
|
def get_client_ip(self):
|
|
x_forwarded_for = self.request.META.get('HTTP_X_FORWARDED_FOR')
|
|
if x_forwarded_for:
|
|
ip = x_forwarded_for.split(',')[0]
|
|
else:
|
|
ip = self.request.META.get('REMOTE_ADDR')
|
|
return ip
|
|
|
|
|
|
class LogoutView(View):
|
|
"""Custom logout view that accepts GET requests."""
|
|
|
|
def get(self, request, *args, **kwargs):
|
|
if request.user.is_authenticated:
|
|
# Log activity
|
|
ActivityLog.objects.create(
|
|
user=request.user,
|
|
action='logout',
|
|
ip_address=self.get_client_ip(request),
|
|
user_agent=request.META.get('HTTP_USER_AGENT', '')
|
|
)
|
|
logout(request)
|
|
success(request, "You have been logged out successfully.")
|
|
return redirect('reports:home')
|
|
|
|
def post(self, request, *args, **kwargs):
|
|
return self.get(request, *args, **kwargs)
|
|
|
|
def get_client_ip(self, request):
|
|
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
|
|
if x_forwarded_for:
|
|
ip = x_forwarded_for.split(',')[0]
|
|
else:
|
|
ip = request.META.get('REMOTE_ADDR')
|
|
return ip
|
|
|
|
|
|
class ProfileView(DetailView):
|
|
"""User profile view."""
|
|
model = User
|
|
template_name = 'accounts/profile.html'
|
|
context_object_name = 'user_obj'
|
|
|
|
def get_object(self):
|
|
return self.request.user
|
|
|
|
|
|
class ProfileEditView(SuccessMessageMixin, UpdateView):
|
|
"""Edit user profile."""
|
|
model = UserProfile
|
|
form_class = UserProfileForm
|
|
template_name = 'accounts/profile_edit.html'
|
|
success_url = reverse_lazy('accounts:profile')
|
|
success_message = "Profile updated successfully!"
|
|
|
|
def get_object(self):
|
|
profile, created = UserProfile.objects.get_or_create(user=self.request.user)
|
|
return profile
|
|
|
|
def form_valid(self, form):
|
|
response = super().form_valid(form)
|
|
# Log activity
|
|
ActivityLog.objects.create(
|
|
user=self.request.user,
|
|
action='profile_update',
|
|
ip_address=self.get_client_ip(),
|
|
user_agent=self.request.META.get('HTTP_USER_AGENT', '')
|
|
)
|
|
return response
|
|
|
|
def get_client_ip(self):
|
|
x_forwarded_for = self.request.META.get('HTTP_X_FORWARDED_FOR')
|
|
if x_forwarded_for:
|
|
ip = x_forwarded_for.split(',')[0]
|
|
else:
|
|
ip = self.request.META.get('REMOTE_ADDR')
|
|
return ip
|
|
|
|
|
|
class PasswordChangeView(SuccessMessageMixin, PasswordChangeView):
|
|
"""Change password view."""
|
|
template_name = 'accounts/password_change.html'
|
|
success_url = reverse_lazy('accounts:profile')
|
|
success_message = "Password changed successfully!"
|
|
|
|
|
|
class PasswordResetView(SuccessMessageMixin, PasswordResetView):
|
|
"""Password reset view."""
|
|
template_name = 'accounts/password_reset.html'
|
|
email_template_name = 'accounts/password_reset_email.html'
|
|
subject_template_name = 'accounts/password_reset_email_subject.txt'
|
|
success_url = reverse_lazy('accounts:password_reset_done')
|
|
success_message = "Password reset email sent!"
|
|
|
|
def form_valid(self, form):
|
|
"""Override to use SiteSettings for from_email."""
|
|
try:
|
|
from reports.models import SiteSettings
|
|
site_settings = SiteSettings.get_settings()
|
|
self.from_email = site_settings.default_from_email
|
|
except:
|
|
pass # Fallback to default
|
|
return super().form_valid(form)
|
|
|
|
def get_email_context_data(self, **kwargs):
|
|
"""Override to ensure site_name is set for email template."""
|
|
context = super().get_email_context_data(**kwargs)
|
|
# Ensure site_name is set (Django uses sites framework, but provide fallback)
|
|
if 'site_name' not in context or not context['site_name']:
|
|
context['site_name'] = 'Портал за Докладване на Измами'
|
|
return context
|
|
|
|
|
|
class PasswordResetDoneView(TemplateView):
|
|
"""Password reset done view - shows confirmation that email was sent."""
|
|
template_name = 'accounts/password_reset_done.html'
|
|
|
|
|
|
class PasswordResetConfirmView(SuccessMessageMixin, PasswordResetConfirmView):
|
|
"""Password reset confirmation view."""
|
|
template_name = 'accounts/password_reset_confirm.html'
|
|
success_url = reverse_lazy('accounts:password_reset_complete')
|
|
success_message = "Password reset successful!"
|
|
|
|
|
|
class MFAVerifyView(FormView):
|
|
"""MFA verification view after login."""
|
|
template_name = 'accounts/mfa_verify.html'
|
|
form_class = MFAVerifyForm
|
|
|
|
def dispatch(self, request, *args, **kwargs):
|
|
# Check if user ID is in session
|
|
if 'mfa_user_id' not in request.session:
|
|
error(request, 'Please log in first.')
|
|
return redirect('accounts:login')
|
|
return super().dispatch(request, *args, **kwargs)
|
|
|
|
def get_form_kwargs(self):
|
|
kwargs = super().get_form_kwargs()
|
|
user_id = self.request.session.get('mfa_user_id')
|
|
if user_id:
|
|
kwargs['user'] = get_object_or_404(User, id=user_id)
|
|
return kwargs
|
|
|
|
def form_valid(self, form):
|
|
user_id = self.request.session.get('mfa_user_id')
|
|
user = get_object_or_404(User, id=user_id)
|
|
|
|
# Verify the token
|
|
if form.verify_token():
|
|
# Login the user
|
|
login(self.request, user)
|
|
# Clear the session
|
|
del self.request.session['mfa_user_id']
|
|
# Log activity
|
|
ActivityLog.objects.create(
|
|
user=user,
|
|
action='login',
|
|
ip_address=self.get_client_ip(),
|
|
user_agent=self.request.META.get('HTTP_USER_AGENT', '')
|
|
)
|
|
success(self.request, 'Login successful!')
|
|
return redirect('accounts:profile')
|
|
else:
|
|
error(self.request, 'Invalid verification code. Please try again.')
|
|
return self.form_invalid(form)
|
|
|
|
def get_client_ip(self):
|
|
x_forwarded_for = self.request.META.get('HTTP_X_FORWARDED_FOR')
|
|
if x_forwarded_for:
|
|
ip = x_forwarded_for.split(',')[0]
|
|
else:
|
|
ip = self.request.META.get('REMOTE_ADDR')
|
|
return ip
|
|
|
|
|
|
@method_decorator(login_required, name='dispatch')
|
|
class MFASetupView(TemplateView):
|
|
"""MFA setup view."""
|
|
template_name = 'accounts/mfa_setup.html'
|
|
|
|
def get_context_data(self, **kwargs):
|
|
context = super().get_context_data(**kwargs)
|
|
user = self.request.user
|
|
|
|
# Delete any existing unconfirmed devices
|
|
TOTPDevice.objects.filter(user=user, confirmed=False).delete()
|
|
|
|
# Create new TOTP device
|
|
device = TOTPDevice.objects.create(
|
|
user=user,
|
|
name='default',
|
|
confirmed=False
|
|
)
|
|
|
|
# Generate QR code
|
|
config_url = device.config_url
|
|
qr = qrcode.QRCode(version=1, box_size=10, border=5)
|
|
qr.add_data(config_url)
|
|
qr.make(fit=True)
|
|
|
|
img = qr.make_image(fill_color="black", back_color="white")
|
|
buffer = BytesIO()
|
|
img.save(buffer, format='PNG')
|
|
img_str = base64.b64encode(buffer.getvalue()).decode()
|
|
context['qr_code'] = f'data:image/png;base64,{img_str}'
|
|
context['secret_key'] = device.key
|
|
context['device'] = device
|
|
context['mfa_enabled'] = user.mfa_enabled
|
|
return context
|
|
|
|
|
|
@method_decorator(login_required, name='dispatch')
|
|
class MFAEnableView(FormView):
|
|
"""Enable MFA after verification."""
|
|
template_name = 'accounts/mfa_enable.html'
|
|
form_class = MFAVerifyForm
|
|
|
|
def get_form_kwargs(self):
|
|
kwargs = super().get_form_kwargs()
|
|
kwargs['user'] = self.request.user
|
|
return kwargs
|
|
|
|
def form_valid(self, form):
|
|
user = self.request.user
|
|
|
|
# Verify the token
|
|
if form.verify_token():
|
|
# Enable MFA
|
|
user.mfa_enabled = True
|
|
user.save()
|
|
|
|
# Confirm the device
|
|
device = TOTPDevice.objects.get(user=user, name='default')
|
|
device.confirmed = True
|
|
device.save()
|
|
|
|
success(self.request, 'MFA has been enabled successfully!')
|
|
return redirect('accounts:profile')
|
|
else:
|
|
error(self.request, 'Invalid verification code. Please try again.')
|
|
return self.form_invalid(form)
|
|
|
|
|
|
@method_decorator(login_required, name='dispatch')
|
|
class MFADisableView(TemplateView):
|
|
"""Disable MFA."""
|
|
template_name = 'accounts/mfa_disable.html'
|
|
|
|
def post(self, request, *args, **kwargs):
|
|
user = request.user
|
|
user.mfa_enabled = False
|
|
user.mfa_secret = ''
|
|
user.save()
|
|
|
|
# Delete TOTP devices
|
|
TOTPDevice.objects.filter(user=user).delete()
|
|
|
|
success(request, 'MFA has been disabled.')
|
|
return redirect('accounts:profile')
|
|
|
|
def get(self, request, *args, **kwargs):
|
|
return super().get(request, *args, **kwargs)
|