""" Single Sign-On (SSO) Authentication Backends Supports SAML, OAuth2, OIDC, and LDAP """ import base64 import json import logging from typing import Optional, Dict, Any from urllib.parse import urlencode, parse_qs import requests from django.contrib.auth import get_user_model from django.contrib.auth.backends import BaseBackend from django.conf import settings from django.core.exceptions import ValidationError from ..models import AuditLog, User logger = logging.getLogger(__name__) User = get_user_model() class SSOAuthentication(BaseBackend): """Base SSO authentication backend""" def authenticate(self, request, **kwargs): """Authenticate user via SSO""" if not request: return None # Extract SSO token from request sso_token = self._extract_sso_token(request) if not sso_token: return None # Validate token and get user info user_info = self._validate_sso_token(sso_token) if not user_info: return None # Get or create user user = self._get_or_create_user(user_info) if user: self._log_sso_login(user, request) return user def _extract_sso_token(self, request) -> Optional[str]: """Extract SSO token from request""" # Check Authorization header auth_header = request.META.get('HTTP_AUTHORIZATION', '') if auth_header.startswith('Bearer '): return auth_header[7:] # Check query parameter return request.GET.get('sso_token') def _validate_sso_token(self, token: str) -> Optional[Dict[str, Any]]: """Validate SSO token and return user info""" raise NotImplementedError("Subclasses must implement _validate_sso_token") def _get_or_create_user(self, user_info: Dict[str, Any]) -> Optional[User]: """Get or create user from SSO user info""" try: # Try to find existing user by SSO identifier user = User.objects.get( sso_provider=user_info.get('provider'), sso_identifier=user_info.get('id') ) # Update user info self._update_user_from_sso(user, user_info) return user except User.DoesNotExist: # Create new user return self._create_user_from_sso(user_info) def _update_user_from_sso(self, user: User, user_info: Dict[str, Any]): """Update existing user with SSO info""" user.email = user_info.get('email', user.email) user.first_name = user_info.get('first_name', user.first_name) user.last_name = user_info.get('last_name', user.last_name) user.last_login_ip = user_info.get('ip_address') user.save(update_fields=['email', 'first_name', 'last_name', 'last_login_ip']) def _create_user_from_sso(self, user_info: Dict[str, Any]) -> Optional[User]: """Create new user from SSO info""" try: username = user_info.get('username') or user_info.get('email') if not username: logger.error("No username or email provided in SSO user info") return None user = User.objects.create_user( username=username, email=user_info.get('email', ''), first_name=user_info.get('first_name', ''), last_name=user_info.get('last_name', ''), sso_provider=user_info.get('provider'), sso_identifier=user_info.get('id'), is_active=True ) logger.info(f"Created new user via SSO: {user.username}") return user except Exception as e: logger.error(f"Failed to create user from SSO: {e}") return None def _log_sso_login(self, user: User, request): """Log SSO login event""" AuditLog.objects.create( user=user, action_type='SSO_LOGIN', ip_address=self._get_client_ip(request), user_agent=request.META.get('HTTP_USER_AGENT', ''), details={ 'provider': user.sso_provider, 'sso_identifier': user.sso_identifier } ) def _get_client_ip(self, request) -> str: """Get client IP address from request""" x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR') if x_forwarded_for: ip = x_forwarded_for.split(',')[0] else: ip = request.META.get('REMOTE_ADDR') return ip class SAMLAuthentication(SSOAuthentication): """SAML SSO authentication""" def _validate_sso_token(self, token: str) -> Optional[Dict[str, Any]]: """Validate SAML token""" # This would integrate with a SAML library like python3-saml # For now, return mock data try: # Decode and validate SAML assertion # In production, use proper SAML validation decoded_token = base64.b64decode(token).decode('utf-8') # Mock SAML response parsing return { 'id': 'saml_user_123', 'username': 'saml.user', 'email': 'saml.user@company.com', 'first_name': 'SAML', 'last_name': 'User', 'provider': 'saml', 'ip_address': '192.168.1.100' } except Exception as e: logger.error(f"SAML token validation failed: {e}") return None class OAuth2Authentication(SSOAuthentication): """OAuth2 SSO authentication""" def _validate_sso_token(self, token: str) -> Optional[Dict[str, Any]]: """Validate OAuth2 token""" # Validate token with OAuth2 provider providers = settings.SSO_PROVIDERS.get('oauth2', {}).get('providers', {}) for provider_name, config in providers.items(): if not config.get('client_id'): continue user_info = self._validate_oauth2_token(token, provider_name, config) if user_info: user_info['provider'] = f'oauth2_{provider_name}' return user_info return None def _validate_oauth2_token(self, token: str, provider: str, config: Dict) -> Optional[Dict[str, Any]]: """Validate OAuth2 token with specific provider""" try: if provider == 'google': return self._validate_google_token(token) elif provider == 'microsoft': return self._validate_microsoft_token(token) else: logger.warning(f"Unsupported OAuth2 provider: {provider}") return None except Exception as e: logger.error(f"OAuth2 token validation failed for {provider}: {e}") return None def _validate_google_token(self, token: str) -> Optional[Dict[str, Any]]: """Validate Google OAuth2 token""" try: # Verify token with Google response = requests.get( 'https://www.googleapis.com/oauth2/v2/userinfo', headers={'Authorization': f'Bearer {token}'} ) if response.status_code == 200: data = response.json() return { 'id': data.get('id'), 'username': data.get('email'), 'email': data.get('email'), 'first_name': data.get('given_name', ''), 'last_name': data.get('family_name', ''), } except Exception as e: logger.error(f"Google token validation failed: {e}") return None def _validate_microsoft_token(self, token: str) -> Optional[Dict[str, Any]]: """Validate Microsoft OAuth2 token""" try: # Verify token with Microsoft Graph response = requests.get( 'https://graph.microsoft.com/v1.0/me', headers={'Authorization': f'Bearer {token}'} ) if response.status_code == 200: data = response.json() return { 'id': data.get('id'), 'username': data.get('userPrincipalName'), 'email': data.get('mail') or data.get('userPrincipalName'), 'first_name': data.get('givenName', ''), 'last_name': data.get('surname', ''), } except Exception as e: logger.error(f"Microsoft token validation failed: {e}") return None class LDAPAuthentication(BaseBackend): """LDAP authentication backend""" def authenticate(self, request, username=None, password=None, **kwargs): """Authenticate user via LDAP""" if not username or not password: return None try: # This would integrate with python-ldap # For now, return mock authentication ldap_config = settings.SSO_PROVIDERS.get('ldap', {}) if not ldap_config.get('enabled'): return None # Mock LDAP authentication if self._authenticate_ldap_user(username, password): user = self._get_or_create_ldap_user(username) if user: self._log_ldap_login(user, request) return user except Exception as e: logger.error(f"LDAP authentication failed: {e}") return None def _authenticate_ldap_user(self, username: str, password: str) -> bool: """Authenticate user against LDAP server""" # Mock LDAP authentication # In production, use python-ldap to connect to LDAP server return username == 'ldap.user' and password == 'ldap_password' def _get_or_create_ldap_user(self, username: str) -> Optional[User]: """Get or create user from LDAP""" try: user = User.objects.get( sso_provider='ldap', sso_identifier=username ) return user except User.DoesNotExist: # Create new user from LDAP try: user = User.objects.create_user( username=username, email=f'{username}@company.com', sso_provider='ldap', sso_identifier=username, is_active=True ) logger.info(f"Created new user via LDAP: {user.username}") return user except Exception as e: logger.error(f"Failed to create user from LDAP: {e}") return None def _log_ldap_login(self, user: User, request): """Log LDAP login event""" AuditLog.objects.create( user=user, action_type='SSO_LOGIN', ip_address=self._get_client_ip(request), user_agent=request.META.get('HTTP_USER_AGENT', ''), details={ 'provider': 'ldap', 'sso_identifier': user.sso_identifier } ) def _get_client_ip(self, request) -> str: """Get client IP address from request""" x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR') if x_forwarded_for: ip = x_forwarded_for.split(',')[0] else: ip = request.META.get('REMOTE_ADDR') return ip