import time import unicodedata from collections import OrderedDict from typing import Optional from django.conf import settings from django.contrib import messages from django.contrib.auth import update_session_auth_hash from django.core.exceptions import FieldDoesNotExist from django.db import models from django.db.models import Q from django.utils.encoding import force_str from django.utils.http import base36_to_int, int_to_base36, urlencode from allauth.account import app_settings, signals from allauth.account.adapter import get_adapter from allauth.account.models import Login from allauth.core.exceptions import ImmediateHttpResponse from allauth.utils import ( get_request_param, get_user_model, import_callable, valid_email_or_none, ) def _unicode_ci_compare(s1, s2): """ Perform case-insensitive comparison of two identifiers, using the recommended algorithm from Unicode Technical Report 36, section 2.11.2(B)(2). """ norm_s1 = unicodedata.normalize("NFKC", s1).casefold() norm_s2 = unicodedata.normalize("NFKC", s2).casefold() return norm_s1 == norm_s2 def get_next_redirect_url(request, redirect_field_name="next"): """ Returns the next URL to redirect to, if it was explicitly passed via the request. """ redirect_to = get_request_param(request, redirect_field_name) if not get_adapter().is_safe_url(redirect_to): redirect_to = None return redirect_to def get_login_redirect_url(request, url=None, redirect_field_name="next", signup=False): ret = url if url and callable(url): # In order to be able to pass url getters around that depend # on e.g. the authenticated state. ret = url() if not ret: ret = get_next_redirect_url(request, redirect_field_name=redirect_field_name) if not ret: if signup: ret = get_adapter().get_signup_redirect_url(request) else: ret = get_adapter().get_login_redirect_url(request) return ret _user_display_callable = None def logout_on_password_change(request, user): # Since it is the default behavior of Django to invalidate all sessions on # password change, this function actually has to preserve the session when # logout isn't desired. if not app_settings.LOGOUT_ON_PASSWORD_CHANGE: update_session_auth_hash(request, user) def default_user_display(user): if app_settings.USER_MODEL_USERNAME_FIELD: return getattr(user, app_settings.USER_MODEL_USERNAME_FIELD) else: return force_str(user) def user_display(user): global _user_display_callable if not _user_display_callable: f = getattr(settings, "ACCOUNT_USER_DISPLAY", default_user_display) _user_display_callable = import_callable(f) return _user_display_callable(user) def user_field(user, field, *args, commit=False): """ Gets or sets (optional) user model fields. No-op if fields do not exist. """ if not field: return User = get_user_model() try: field_meta = User._meta.get_field(field) max_length = field_meta.max_length except FieldDoesNotExist: if not hasattr(user, field): return max_length = None if args: # Setter v = args[0] if v: v = v[0:max_length] setattr(user, field, v) if commit: user.save(update_fields=[field]) else: # Getter return getattr(user, field) def user_username(user, *args, commit=False): if args and not app_settings.PRESERVE_USERNAME_CASING and args[0]: args = [args[0].lower()] return user_field(user, app_settings.USER_MODEL_USERNAME_FIELD, *args) def user_email(user, *args, commit=False): return user_field(user, app_settings.USER_MODEL_EMAIL_FIELD, *args, commit=commit) def has_verified_email(user, email=None): from .models import EmailAddress emailaddress = None if email: ret = False try: emailaddress = EmailAddress.objects.get_for_user(user, email) ret = emailaddress.verified except EmailAddress.DoesNotExist: pass else: ret = EmailAddress.objects.filter(user=user, verified=True).exists() return ret def perform_login( request, user, email_verification, redirect_url=None, signal_kwargs=None, signup=False, email=None, ): """ Keyword arguments: signup -- Indicates whether or not sending the email is essential (during signup), or if it can be skipped (e.g. in case email verification is optional and we are only logging in). """ login = Login( user=user, email_verification=email_verification, redirect_url=redirect_url, signal_kwargs=signal_kwargs, signup=signup, email=email, ) return _perform_login(request, login) def _perform_login(request, login): # Local users are stopped due to form validation checking # is_active, yet, adapter methods could toy with is_active in a # `user_signed_up` signal. Furthermore, social users should be # stopped anyway. adapter = get_adapter() hook_kwargs = _get_login_hook_kwargs(login) response = adapter.pre_login(request, login.user, **hook_kwargs) if response: return response return resume_login(request, login) def _get_login_hook_kwargs(login): """ TODO: Just break backwards compatibility and pass only `login` to `pre/post_login()`. """ return dict( email_verification=login.email_verification, redirect_url=login.redirect_url, signal_kwargs=login.signal_kwargs, signup=login.signup, email=login.email, ) def resume_login(request, login): from allauth.account.stages import LoginStageController adapter = get_adapter() ctrl = LoginStageController(request, login) try: response = ctrl.handle() if response: return response adapter.login(request, login.user) hook_kwargs = _get_login_hook_kwargs(login) response = adapter.post_login(request, login.user, **hook_kwargs) if response: return response except ImmediateHttpResponse as e: response = e.response return response def unstash_login(request, peek=False): login = None if peek: data = request.session.get("account_login") else: data = request.session.pop("account_login", None) if data is not None: try: login = Login.deserialize(data) request._account_login_accessed = True except ValueError: pass return login def stash_login(request, login): request.session["account_login"] = login.serialize() request._account_login_accessed = True def complete_signup(request, user, email_verification, success_url, signal_kwargs=None): if signal_kwargs is None: signal_kwargs = {} signals.user_signed_up.send( sender=user.__class__, request=request, user=user, **signal_kwargs ) return perform_login( request, user, email_verification=email_verification, signup=True, redirect_url=success_url, signal_kwargs=signal_kwargs, ) def cleanup_email_addresses(request, addresses): """ Takes a list of EmailAddress instances and cleans it up, making sure only valid ones remain, without multiple primaries etc. Order is important: e.g. if multiple primary email addresses exist, the first one encountered will be kept as primary. """ from .models import EmailAddress adapter = get_adapter() # Let's group by `email` e2a = OrderedDict() # maps email to EmailAddress primary_addresses = [] verified_addresses = [] primary_verified_addresses = [] for address in addresses: # Pick up only valid ones... email = valid_email_or_none(address.email) if not email: continue # ... and non-conflicting ones... if ( app_settings.UNIQUE_EMAIL and app_settings.PREVENT_ENUMERATION != "strict" and EmailAddress.objects.lookup([email]) ): # Email address already exists. continue if ( app_settings.UNIQUE_EMAIL and app_settings.PREVENT_ENUMERATION == "strict" and address.verified and EmailAddress.objects.is_verified(email) ): # Email address already exists, and is verified as well. continue a = e2a.get(email.lower()) if a: a.primary = a.primary or address.primary a.verified = a.verified or address.verified else: a = address a.verified = a.verified or adapter.is_email_verified(request, a.email) e2a[email.lower()] = a if a.primary: primary_addresses.append(a) if a.verified: primary_verified_addresses.append(a) if a.verified: verified_addresses.append(a) # Now that we got things sorted out, let's assign a primary if primary_verified_addresses: primary_address = primary_verified_addresses[0] elif verified_addresses: # Pick any verified as primary primary_address = verified_addresses[0] elif primary_addresses: # Okay, let's pick primary then, even if unverified primary_address = primary_addresses[0] elif e2a: # Pick the first primary_address = e2a.keys()[0] else: # Empty primary_address = None # There can only be one primary for a in e2a.values(): a.primary = primary_address.email.lower() == a.email.lower() return list(e2a.values()), primary_address def setup_user_email(request, user, addresses): """ Creates proper EmailAddress for the user that was just signed up. Only sets up, doesn't do any other handling such as sending out email confirmation mails etc. """ from .models import EmailAddress assert not EmailAddress.objects.filter(user=user).exists() priority_addresses = [] # Is there a stashed email? adapter = get_adapter() stashed_email = adapter.unstash_verified_email(request) if stashed_email: priority_addresses.append( EmailAddress(user=user, email=stashed_email, primary=True, verified=True) ) email = user_email(user) if email: priority_addresses.append( EmailAddress(user=user, email=email, primary=True, verified=False) ) addresses, primary = cleanup_email_addresses( request, priority_addresses + addresses ) for a in addresses: a.user = user a.save() EmailAddress.objects.fill_cache_for_user(user, addresses) if primary and email and email.lower() != primary.email.lower(): user_email(user, primary.email) user.save() return primary def send_email_confirmation(request, user, signup=False, email=None): """ Email verification mails are sent: a) Explicitly: when a user signs up b) Implicitly: when a user attempts to log in using an unverified email while EMAIL_VERIFICATION is mandatory. Especially in case of b), we want to limit the number of mails sent (consider a user retrying a few times), which is why there is a cooldown period before sending a new mail. This cooldown period can be configured in ACCOUNT_EMAIL_CONFIRMATION_COOLDOWN setting. """ from .models import EmailAddress adapter = get_adapter() if not email: email = user_email(user) if email: try: email_address = EmailAddress.objects.get_for_user(user, email) if not email_address.verified: send_email = adapter.should_send_confirmation_mail( request, email_address ) if send_email: email_address.send_confirmation(request, signup=signup) else: send_email = False except EmailAddress.DoesNotExist: send_email = True email_address = EmailAddress.objects.add_email( request, user, email, signup=signup, confirm=True ) assert email_address # At this point, if we were supposed to send an email we have sent it. if send_email: adapter.add_message( request, messages.INFO, "account/messages/email_confirmation_sent.txt", {"email": email, "login": not signup, "signup": signup}, ) if signup: adapter.stash_user(request, user_pk_to_url_str(user)) def sync_user_email_addresses(user): """ Keep user.email in sync with user.emailaddress_set. Under some circumstances the user.email may not have ended up as an EmailAddress record, e.g. in the case of manually created admin users. """ from .models import EmailAddress email = user_email(user) if ( email and not EmailAddress.objects.filter(user=user, email__iexact=email).exists() ): # get_or_create() to gracefully handle races EmailAddress.objects.get_or_create( user=user, email=email, defaults={"primary": False, "verified": False} ) def filter_users_by_username(*username): if app_settings.PRESERVE_USERNAME_CASING: qlist = [ Q(**{app_settings.USER_MODEL_USERNAME_FIELD + "__iexact": u}) for u in username ] q = qlist[0] for q2 in qlist[1:]: q = q | q2 ret = get_user_model()._default_manager.filter(q) else: ret = get_user_model()._default_manager.filter( **{ app_settings.USER_MODEL_USERNAME_FIELD + "__in": [u.lower() for u in username] } ) return ret def filter_users_by_email(email, is_active=None, prefer_verified=False): """Return list of users by email address Typically one, at most just a few in length. First we look through EmailAddress table, than customisable User model table. Add results together avoiding SQL joins and deduplicate. `prefer_verified`: When looking up users by email, there can be cases where users with verified email addresses are preferable above users who did not verify their email address. The password reset is such a use case -- if there is a user with a verified email than that user should be returned, not one of the other users. """ from .models import EmailAddress User = get_user_model() mails = EmailAddress.objects.filter(email__iexact=email).prefetch_related("user") if is_active is not None: mails = mails.filter(user__is_active=is_active) mails = list(mails) is_verified = False if prefer_verified: verified_mails = list(filter(lambda e: e.verified, mails)) if verified_mails: mails = verified_mails is_verified = True users = [] for e in mails: if _unicode_ci_compare(e.email, email): users.append(e.user) if app_settings.USER_MODEL_EMAIL_FIELD and not is_verified: q_dict = {app_settings.USER_MODEL_EMAIL_FIELD + "__iexact": email} user_qs = User.objects.filter(**q_dict) if is_active is not None: user_qs = user_qs.filter(is_active=is_active) for user in user_qs.iterator(): user_email = getattr(user, app_settings.USER_MODEL_EMAIL_FIELD) if _unicode_ci_compare(user_email, email): users.append(user) return list(set(users)) def passthrough_next_redirect_url(request, url, redirect_field_name): assert url.find("?") < 0 # TODO: Handle this case properly next_url = get_next_redirect_url(request, redirect_field_name) if next_url: url = url + "?" + urlencode({redirect_field_name: next_url}) return url def user_pk_to_url_str(user): """ This should return a string. """ User = get_user_model() pk_field_class = type(User._meta.pk) if issubclass(pk_field_class, models.UUIDField): if isinstance(user.pk, str): return user.pk return user.pk.hex elif issubclass(pk_field_class, models.IntegerField): return int_to_base36(int(user.pk)) return str(user.pk) def url_str_to_user_pk(pk_str): User = get_user_model() remote_field = getattr(User._meta.pk, "remote_field", None) if remote_field and getattr(remote_field, "to", None): pk_field = User._meta.pk.remote_field.to._meta.pk else: pk_field = User._meta.pk pk_field_class = type(pk_field) if issubclass(pk_field_class, models.IntegerField): pk = base36_to_int(pk_str) # always call to_python() -- because there are fields like HashidField # that derive from IntegerField. pk = pk_field.to_python(pk) else: pk = pk_field.to_python(pk_str) return pk def record_authentication(request, user): request.session["account_authenticated_at"] = time.time() def did_recently_authenticate(request): if request.user.is_anonymous: return False if not request.user.has_usable_password(): # TODO: This user only has social accounts attached. Now, ideally, you # would want to reauthenticate over at the social account provider. For # now, this is not implemented. Although definitely suboptimal, this # method is currently used for reauthentication checks over at MFA, and, # users that delegate the security of their account to an external # provider like Google typically use MFA over there anyway. return True authenticated_at = request.session.get("account_authenticated_at") if not authenticated_at: return False return time.time() - authenticated_at < app_settings.REAUTHENTICATION_TIMEOUT def assess_unique_email(email) -> Optional[bool]: """ True -- email is unique False -- email is already in use None -- email is in use, but we should hide that using email verification. """ from .models import EmailAddress if not EmailAddress.objects.lookup([email]).exists(): # All good. return True elif not app_settings.PREVENT_ENUMERATION: # Fail right away. return False elif ( app_settings.EMAIL_VERIFICATION == app_settings.EmailVerificationMethod.MANDATORY ): # In case of mandatory verification and enumeration prevention, # we can avoid creating a new account with the same (unverified) # email address, because we are going to send an email anyway. assert app_settings.PREVENT_ENUMERATION return None elif app_settings.PREVENT_ENUMERATION == "strict": # We're going to be strict on enumeration prevention, and allow for # this email address to pass even though it already exists. In this # scenario, you can signup multiple times using the same email # address resulting in multiple accounts with an unverified email. return True else: assert app_settings.PREVENT_ENUMERATION is True # Conflict. We're supposed to prevent enumeration, but we can't # because that means letting the user in, while emails are required # to be unique. In this case, uniqueness takes precedence over # enumeration prevention. return False