Files
Iliyan Angelov 6b247e5b9f Updates
2025-09-19 11:58:53 +03:00

325 lines
12 KiB
Python

"""
Single Sign-On (SSO) Authentication Backends
Supports SAML, OAuth2, OIDC, and LDAP
"""
import base64
import json
import logging
from typing import Optional, Dict, Any
from urllib.parse import urlencode, parse_qs
import requests
from django.contrib.auth import get_user_model
from django.contrib.auth.backends import BaseBackend
from django.conf import settings
from django.core.exceptions import ValidationError
from ..models import AuditLog, User
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# Rebinds ``User``: the name imported from ``..models`` above is replaced by
# whatever get_user_model() returns, and that is the class used below.
User = get_user_model()
class SSOAuthentication(BaseBackend):
    """Base SSO authentication backend.

    Drives the common flow: extract a token from the request, validate it
    (delegated to ``_validate_sso_token`` in subclasses), map the resulting
    user info onto a local user, and record an audit entry.
    """

    def authenticate(self, request, **kwargs):
        """Authenticate user via SSO.

        Returns the local user on success, or ``None`` when there is no
        request, no token, the token does not validate, no user could be
        resolved, or the resolved user is deactivated.
        """
        if not request:
            return None

        sso_token = self._extract_sso_token(request)
        if not sso_token:
            return None

        user_info = self._validate_sso_token(sso_token)
        if not user_info:
            return None

        user = self._get_or_create_user(user_info)
        if user is None:
            return None

        # A valid upstream identity must not re-enable a locally
        # deactivated account. Models without ``is_active`` are allowed.
        if not getattr(user, 'is_active', True):
            logger.warning("Rejected SSO login for inactive user: %s", user)
            return None

        self._log_sso_login(user, request)
        return user

    def _extract_sso_token(self, request) -> Optional[str]:
        """Extract SSO token from request.

        The ``Authorization: Bearer <token>`` header takes precedence (the
        scheme is matched case-insensitively); otherwise the ``sso_token``
        query parameter is used.
        """
        auth_header = request.META.get('HTTP_AUTHORIZATION', '')
        scheme, _, credentials = auth_header.partition(' ')
        if scheme.lower() == 'bearer' and _:
            # An empty credential yields None, which the caller treats the
            # same way as a missing token.
            return credentials.strip() or None

        # NOTE(review): tokens passed in the query string tend to end up in
        # access logs and Referer headers; kept for backward compatibility.
        return request.GET.get('sso_token')

    def _validate_sso_token(self, token: str) -> Optional[Dict[str, Any]]:
        """Validate SSO token and return user info.

        Raises:
            NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError("Subclasses must implement _validate_sso_token")

    def _get_or_create_user(self, user_info: Dict[str, Any]) -> Optional[User]:
        """Get or create user from SSO user info.

        Looks the user up by ``(sso_provider, sso_identifier)``; refreshes
        the profile when found, creates the user when not. Returns ``None``
        if the identity is incomplete or ambiguous.
        """
        provider = user_info.get('provider')
        identifier = user_info.get('id')

        # Filtering on None would turn into an IS NULL match and could bind
        # this login to an unrelated account, so refuse incomplete identities.
        if not provider or identifier in (None, ''):
            logger.error("SSO user info is missing provider or id")
            return None

        try:
            user = User.objects.get(
                sso_provider=provider,
                sso_identifier=identifier,
            )
        except User.DoesNotExist:
            return self._create_user_from_sso(user_info)
        except User.MultipleObjectsReturned:
            logger.error(
                "Multiple users match SSO identity %s/%s", provider, identifier
            )
            return None

        self._update_user_from_sso(user, user_info)
        return user

    def _update_user_from_sso(self, user: User, user_info: Dict[str, Any]):
        """Update existing user with SSO info.

        Only values actually supplied by the provider are written; missing
        or ``None`` entries leave the stored value untouched.
        """
        changed_fields = []

        for field_name in ('email', 'first_name', 'last_name'):
            value = user_info.get(field_name)
            if value is not None:
                setattr(user, field_name, value)
                changed_fields.append(field_name)

        # Not every provider reports an address; do not wipe the stored one.
        ip_address = user_info.get('ip_address')
        if ip_address:
            user.last_login_ip = ip_address
            changed_fields.append('last_login_ip')

        if changed_fields:
            user.save(update_fields=changed_fields)

    def _create_user_from_sso(self, user_info: Dict[str, Any]) -> Optional[User]:
        """Create new user from SSO info.

        The username falls back to the email address. Returns ``None`` when
        neither is available or when creation fails.
        """
        username = user_info.get('username') or user_info.get('email')
        if not username:
            logger.error("No username or email provided in SSO user info")
            return None

        try:
            user = User.objects.create_user(
                username=username,
                email=user_info.get('email', ''),
                first_name=user_info.get('first_name', ''),
                last_name=user_info.get('last_name', ''),
                sso_provider=user_info.get('provider'),
                sso_identifier=user_info.get('id'),
                is_active=True,
            )
        except Exception as exc:
            # Deliberately broad: any creation failure results in a refused
            # login rather than an error propagating out of the backend.
            logger.error("Failed to create user from SSO: %s", exc)
            return None

        logger.info("Created new user via SSO: %s", user.username)
        return user

    def _log_sso_login(self, user: User, request):
        """Log SSO login event as an ``SSO_LOGIN`` audit entry."""
        AuditLog.objects.create(
            user=user,
            action_type='SSO_LOGIN',
            ip_address=self._get_client_ip(request),
            user_agent=request.META.get('HTTP_USER_AGENT', ''),
            details={
                'provider': user.sso_provider,
                'sso_identifier': user.sso_identifier,
            },
        )

    def _get_client_ip(self, request) -> Optional[str]:
        """Get client IP address from request.

        Uses the first entry of ``X-Forwarded-For`` when present, otherwise
        ``REMOTE_ADDR``. May return ``None`` if neither is set.
        """
        # NOTE(review): X-Forwarded-For is supplied by the client unless a
        # trusted proxy rewrites it - confirm the deployment does so.
        forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if forwarded_for:
            # Entries are comma separated and usually padded with spaces.
            first_hop = forwarded_for.split(',')[0].strip()
            if first_hop:
                return first_hop
        return request.META.get('REMOTE_ADDR')
class SAMLAuthentication(SSOAuthentication):
    """SAML SSO authentication.

    WARNING: assertion validation is not implemented. The only available
    behaviour is a mock that maps any base64-decodable token to one fixed
    user, so it is refused unless explicitly enabled.
    """

    def _validate_sso_token(self, token: str) -> Optional[Dict[str, Any]]:
        """Validate SAML token.

        Returns mock user info when the mock is enabled and the token is
        base64-encoded UTF-8 text; otherwise ``None``.

        The mock is enabled by ``SSO_PROVIDERS['saml']['allow_mock']`` and
        defaults to the value of ``settings.DEBUG``.
        """
        saml_config = getattr(settings, 'SSO_PROVIDERS', {}).get('saml', {})
        allow_mock = saml_config.get(
            'allow_mock', getattr(settings, 'DEBUG', False)
        )
        if not allow_mock:
            # Fail closed: no signature, issuer, audience or expiry check
            # exists yet, so accepting the token would be an auth bypass.
            logger.error(
                "SAML validation is not implemented and the mock is disabled; "
                "rejecting token"
            )
            return None

        # TODO: replace with real assertion validation through a SAML library.
        try:
            # Only checks that the token is base64-encoded UTF-8 text.
            base64.b64decode(token).decode('utf-8')
        except (ValueError, TypeError) as e:
            logger.error(f"SAML token validation failed: {e}")
            return None

        logger.warning("Using MOCK SAML user info; do not enable in production")
        return {
            'id': 'saml_user_123',
            'username': 'saml.user',
            'email': 'saml.user@company.com',
            'first_name': 'SAML',
            'last_name': 'User',
            'provider': 'saml',
            'ip_address': '192.168.1.100',
        }
class OAuth2Authentication(SSOAuthentication):
    """OAuth2 SSO authentication.

    Validates a bearer token by requesting the user's profile from each
    configured provider until one accepts it.
    """

    # Seconds to wait for a provider; without a timeout a stalled provider
    # would block the login request indefinitely.
    REQUEST_TIMEOUT = 10

    GOOGLE_USERINFO_URL = 'https://www.googleapis.com/oauth2/v2/userinfo'
    MICROSOFT_USERINFO_URL = 'https://graph.microsoft.com/v1.0/me'

    def _validate_sso_token(self, token: str) -> Optional[Dict[str, Any]]:
        """Validate OAuth2 token.

        Tries every provider under ``SSO_PROVIDERS['oauth2']['providers']``
        that has a ``client_id`` and returns the first profile obtained,
        tagged with ``provider = 'oauth2_<name>'``. Returns ``None`` when no
        provider accepts the token.
        """
        # NOTE(review): the token is sent to each configured provider in
        # turn, and ``client_id`` is only checked for presence - the token's
        # audience is never compared against it. Consider verifying both.
        providers = settings.SSO_PROVIDERS.get('oauth2', {}).get('providers', {})
        for provider_name, config in providers.items():
            if not config.get('client_id'):
                continue
            user_info = self._validate_oauth2_token(token, provider_name, config)
            if user_info:
                user_info['provider'] = f'oauth2_{provider_name}'
                return user_info
        return None

    def _validate_oauth2_token(self, token: str, provider: str, config: Dict) -> Optional[Dict[str, Any]]:
        """Validate OAuth2 token with specific provider.

        ``config`` is accepted for interface compatibility and is currently
        unused. Unsupported providers and any validator error yield ``None``.
        """
        validators = {
            'google': self._validate_google_token,
            'microsoft': self._validate_microsoft_token,
        }
        validator = validators.get(provider)
        if validator is None:
            logger.warning(f"Unsupported OAuth2 provider: {provider}")
            return None

        try:
            return validator(token)
        except Exception as e:
            logger.error(f"OAuth2 token validation failed for {provider}: {e}")
            return None

    def _fetch_profile(self, url: str, token: str) -> Optional[Dict[str, Any]]:
        """GET ``url`` with the bearer token and return the JSON object.

        Returns ``None`` for a non-200 response or a non-object body.
        Network and decoding errors propagate to the caller.
        """
        response = requests.get(
            url,
            headers={'Authorization': f'Bearer {token}'},
            timeout=self.REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            return None
        payload = response.json()
        return payload if isinstance(payload, dict) else None

    def _validate_google_token(self, token: str) -> Optional[Dict[str, Any]]:
        """Validate Google OAuth2 token.

        Returns normalized user info, or ``None`` if the request fails or
        the profile carries no ``id``.
        """
        try:
            data = self._fetch_profile(self.GOOGLE_USERINFO_URL, token)
        except Exception as e:
            logger.error(f"Google token validation failed: {e}")
            return None

        # A profile without an id cannot be bound to a local account.
        if not data or not data.get('id'):
            return None

        return {
            'id': data.get('id'),
            'username': data.get('email'),
            'email': data.get('email'),
            'first_name': data.get('given_name', ''),
            'last_name': data.get('family_name', ''),
        }

    def _validate_microsoft_token(self, token: str) -> Optional[Dict[str, Any]]:
        """Validate Microsoft OAuth2 token.

        Returns normalized user info, or ``None`` if the request fails or
        the profile carries no ``id``. ``email`` falls back to
        ``userPrincipalName`` when ``mail`` is empty.
        """
        try:
            data = self._fetch_profile(self.MICROSOFT_USERINFO_URL, token)
        except Exception as e:
            logger.error(f"Microsoft token validation failed: {e}")
            return None

        # A profile without an id cannot be bound to a local account.
        if not data or not data.get('id'):
            return None

        return {
            'id': data.get('id'),
            'username': data.get('userPrincipalName'),
            'email': data.get('mail') or data.get('userPrincipalName'),
            'first_name': data.get('givenName', ''),
            'last_name': data.get('surname', ''),
        }
class LDAPAuthentication(BaseBackend):
    """LDAP authentication backend.

    WARNING: no LDAP server is contacted yet. Credential checking is a mock
    with hard-coded credentials and is refused unless explicitly enabled.
    """

    def authenticate(self, request, username=None, password=None, **kwargs):
        """Authenticate user via LDAP.

        Returns the local user on success. Returns ``None`` when credentials
        are missing or rejected, LDAP is not enabled in
        ``SSO_PROVIDERS['ldap']``, the user is deactivated, or any error
        occurs. ``request`` may be ``None``.
        """
        if not username or not password:
            return None

        try:
            ldap_config = settings.SSO_PROVIDERS.get('ldap', {})
            if not ldap_config.get('enabled'):
                return None

            if not self._authenticate_ldap_user(username, password):
                return None

            user = self._get_or_create_ldap_user(username)
            if user is None:
                return None

            # Valid directory credentials must not re-enable a locally
            # deactivated account. Models without ``is_active`` are allowed.
            if not getattr(user, 'is_active', True):
                logger.warning("Rejected LDAP login for inactive user: %s", user)
                return None

            self._log_ldap_login(user, request)
            return user
        except Exception as e:
            # Deliberately broad: any failure results in a refused login.
            logger.error(f"LDAP authentication failed: {e}")
        return None

    def _authenticate_ldap_user(self, username: str, password: str) -> bool:
        """Authenticate user against LDAP server.

        Currently a mock. It is only honoured when
        ``SSO_PROVIDERS['ldap']['allow_mock']`` is true (defaults to
        ``settings.DEBUG``); otherwise every attempt is rejected.
        """
        ldap_config = getattr(settings, 'SSO_PROVIDERS', {}).get('ldap', {})
        allow_mock = ldap_config.get(
            'allow_mock', getattr(settings, 'DEBUG', False)
        )
        if not allow_mock:
            # Fail closed: fixed credentials must never act as a backdoor.
            logger.error(
                "LDAP bind is not implemented and the mock is disabled; "
                "rejecting credentials"
            )
            return False

        # TODO: replace with a real bind against the LDAP server.
        return username == 'ldap.user' and password == 'ldap_password'

    def _get_or_create_ldap_user(self, username: str) -> Optional[User]:
        """Get or create user from LDAP.

        Looks the user up by ``sso_provider='ldap'`` and
        ``sso_identifier=username``; creates the user when absent. Returns
        ``None`` if creation fails.
        """
        try:
            return User.objects.get(
                sso_provider='ldap',
                sso_identifier=username,
            )
        except User.DoesNotExist:
            pass

        try:
            user = User.objects.create_user(
                username=username,
                email=f'{username}@company.com',
                sso_provider='ldap',
                sso_identifier=username,
                is_active=True,
            )
        except Exception as e:
            logger.error(f"Failed to create user from LDAP: {e}")
            return None

        logger.info(f"Created new user via LDAP: {user.username}")
        return user

    def _log_ldap_login(self, user: User, request):
        """Log LDAP login event as an ``SSO_LOGIN`` audit entry.

        Tolerates ``request`` being ``None``; the user agent is then
        recorded as an empty string and the IP address as ``None``.
        """
        meta = request.META if request is not None else {}
        AuditLog.objects.create(
            user=user,
            action_type='SSO_LOGIN',
            ip_address=self._get_client_ip(request),
            user_agent=meta.get('HTTP_USER_AGENT', ''),
            details={
                'provider': 'ldap',
                'sso_identifier': user.sso_identifier,
            },
        )

    def _get_client_ip(self, request) -> Optional[str]:
        """Get client IP address from request.

        Uses the first entry of ``X-Forwarded-For`` when present, otherwise
        ``REMOTE_ADDR``. Returns ``None`` if there is no request or neither
        value is set.
        """
        if request is None:
            return None

        # NOTE(review): X-Forwarded-For is supplied by the client unless a
        # trusted proxy rewrites it - confirm the deployment does so.
        forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if forwarded_for:
            # Entries are comma separated and usually padded with spaces.
            first_hop = forwarded_for.split(',')[0].strip()
            if first_hop:
                return first_hop
        return request.META.get('REMOTE_ADDR')