325 lines
12 KiB
Python
325 lines
12 KiB
Python
"""
|
|
Single Sign-On (SSO) Authentication Backends
|
|
Supports SAML, OAuth2, OIDC, and LDAP
|
|
"""
|
|
import base64
|
|
import json
|
|
import logging
|
|
from typing import Optional, Dict, Any
|
|
from urllib.parse import urlencode, parse_qs
|
|
|
|
import requests
|
|
from django.contrib.auth import get_user_model
|
|
from django.contrib.auth.backends import BaseBackend
|
|
from django.conf import settings
|
|
from django.core.exceptions import ValidationError
|
|
|
|
from ..models import AuditLog, User
|
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
|
# Resolve the user model through Django's get_user_model().
# NOTE(review): this rebinds the ``User`` name already imported from
# ``..models`` above, so that import of ``User`` is effectively unused.
User = get_user_model()
|
|
|
|
|
|
class SSOAuthentication(BaseBackend):
    """Base SSO authentication backend.

    Subclasses implement :meth:`_validate_sso_token`. This class takes care
    of pulling the token out of the request, resolving or provisioning the
    local user, and writing the audit-log entry.
    """

    def authenticate(self, request, **kwargs):
        """Authenticate a user from the SSO token carried by ``request``.

        Returns:
            The resolved user, or ``None`` when there is no request, no
            token, the token fails validation, or no user can be resolved.
        """
        if not request:
            return None

        sso_token = self._extract_sso_token(request)
        if not sso_token:
            return None

        user_info = self._validate_sso_token(sso_token)
        if not user_info:
            return None

        user = self._get_or_create_user(user_info)
        if user:
            self._log_sso_login(user, request)
        return user

    def get_user(self, user_id):
        """Return the active user with primary key ``user_id``, else ``None``.

        ``BaseBackend.get_user`` returns ``None`` unconditionally; without
        this override a session created through this backend cannot be
        resolved back to its user on later requests.
        """
        try:
            user = User.objects.get(pk=user_id)
        except User.DoesNotExist:
            return None
        return user if self._is_active(user) else None

    @staticmethod
    def _is_active(user) -> bool:
        """Return False only when the user model explicitly marks the user inactive."""
        return bool(getattr(user, 'is_active', True))

    def _extract_sso_token(self, request) -> Optional[str]:
        """Extract the SSO token from the request.

        A ``Bearer`` Authorization header takes precedence; otherwise the
        ``sso_token`` query parameter is used.
        """
        auth_header = request.META.get('HTTP_AUTHORIZATION', '')
        if auth_header.startswith('Bearer '):
            # Strip stray whitespace so "Bearer  <token> " still yields the token.
            return auth_header[7:].strip()

        # NOTE(review): a token passed in the query string is part of the URL
        # and may be recorded wherever URLs are logged -- confirm this
        # fallback is still required.
        return request.GET.get('sso_token')

    def _validate_sso_token(self, token: str) -> Optional[Dict[str, Any]]:
        """Validate the SSO token and return the user info dict.

        Raises:
            NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError("Subclasses must implement _validate_sso_token")

    def _get_or_create_user(self, user_info: Dict[str, Any]) -> Optional[User]:
        """Look up the user by (provider, identifier), creating one if absent.

        Returns ``None`` when ``user_info`` lacks a provider or identifier,
        when the lookup is ambiguous, when the matched account is inactive,
        or when creation fails.
        """
        provider = user_info.get('provider')
        identifier = user_info.get('id')
        if not provider or not identifier:
            # A ``None`` value in an ORM lookup becomes an IS NULL match, which
            # would tie this login to an arbitrary account with no identifier.
            logger.error("SSO user info is missing provider or identifier")
            return None

        try:
            user = User.objects.get(
                sso_provider=provider,
                sso_identifier=identifier,
            )
        except User.DoesNotExist:
            return self._create_user_from_sso(user_info)
        except User.MultipleObjectsReturned:
            # Fail closed: never pick one of several matching accounts.
            logger.error(
                "Multiple users match SSO identity %s/%s", provider, identifier
            )
            return None

        if not self._is_active(user):
            logger.warning("Rejected SSO login for inactive user: %s", user.username)
            return None

        self._update_user_from_sso(user, user_info)
        return user

    def _update_user_from_sso(self, user: User, user_info: Dict[str, Any]):
        """Refresh profile fields on ``user`` from ``user_info``.

        Values that are missing, ``None`` or empty leave the stored value
        untouched, so a provider that omits a field cannot blank it out.
        """
        user.email = user_info.get('email') or user.email
        user.first_name = user_info.get('first_name') or user.first_name
        user.last_name = user_info.get('last_name') or user.last_name
        changed_fields = ['email', 'first_name', 'last_name']

        # Not every provider reports an IP; only overwrite when one is given.
        ip_address = user_info.get('ip_address')
        if ip_address:
            user.last_login_ip = ip_address
            changed_fields.append('last_login_ip')

        user.save(update_fields=changed_fields)

    def _create_user_from_sso(self, user_info: Dict[str, Any]) -> Optional[User]:
        """Create a new user from ``user_info``.

        The username falls back to the email address. Returns ``None`` (after
        logging) when neither is present or when creation raises.
        """
        try:
            username = user_info.get('username') or user_info.get('email')
            if not username:
                logger.error("No username or email provided in SSO user info")
                return None

            user = User.objects.create_user(
                username=username,
                email=user_info.get('email') or '',
                first_name=user_info.get('first_name') or '',
                last_name=user_info.get('last_name') or '',
                sso_provider=user_info.get('provider'),
                sso_identifier=user_info.get('id'),
                is_active=True,
            )
        except Exception as e:
            # Deliberately broad: provisioning failures must not escape the
            # authentication path; they are logged and treated as "no user".
            logger.error("Failed to create user from SSO: %s", e)
            return None

        logger.info("Created new user via SSO: %s", user.username)
        return user

    def _log_sso_login(self, user: User, request):
        """Record an ``SSO_LOGIN`` audit entry for ``user``."""
        AuditLog.objects.create(
            user=user,
            action_type='SSO_LOGIN',
            ip_address=self._get_client_ip(request),
            user_agent=request.META.get('HTTP_USER_AGENT', ''),
            details={
                'provider': user.sso_provider,
                'sso_identifier': user.sso_identifier,
            },
        )

    def _get_client_ip(self, request) -> Optional[str]:
        """Return the client IP: first ``X-Forwarded-For`` hop, else ``REMOTE_ADDR``.

        NOTE(review): ``X-Forwarded-For`` is a client-supplied header; the
        value is only as trustworthy as whatever sits in front of this app.
        """
        forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if forwarded_for:
            # Entries are comma-separated and usually followed by a space.
            return forwarded_for.split(',')[0].strip()
        return request.META.get('REMOTE_ADDR')
|
|
|
|
|
|
class SAMLAuthentication(SSOAuthentication):
    """SAML SSO authentication (placeholder -- no real assertion validation yet)."""

    def _validate_sso_token(self, token: str) -> Optional[Dict[str, Any]]:
        """Validate a SAML token and return user info.

        This is still a mock: it checks only that ``token`` is base64 that
        decodes to UTF-8, then returns a fixed identity. Because that would
        let any well-formed token log in as the mock user, the mock is served
        only when ``settings.DEBUG`` is true; otherwise the token is rejected.

        Returns:
            The mock user info dict in DEBUG mode for a decodable token,
            otherwise ``None``.
        """
        # TODO: replace with real assertion validation (e.g. python3-saml).
        if not getattr(settings, 'DEBUG', False):
            logger.error(
                "SAML validation is not implemented; rejecting token outside DEBUG"
            )
            return None

        try:
            # Result is discarded: decoding is only a well-formedness check.
            # binascii.Error and UnicodeDecodeError are both ValueError subclasses.
            base64.b64decode(token).decode('utf-8')
        except ValueError as e:
            logger.error("SAML token validation failed: %s", e)
            return None

        # Mock SAML response.
        return {
            'id': 'saml_user_123',
            'username': 'saml.user',
            'email': 'saml.user@company.com',
            'first_name': 'SAML',
            'last_name': 'User',
            'provider': 'saml',
            'ip_address': '192.168.1.100',
        }
|
|
|
|
|
|
class OAuth2Authentication(SSOAuthentication):
    """OAuth2 SSO authentication against the configured providers."""

    # Profile endpoints queried with the caller's bearer token.
    _GOOGLE_USERINFO_URL = 'https://www.googleapis.com/oauth2/v2/userinfo'
    _MICROSOFT_ME_URL = 'https://graph.microsoft.com/v1.0/me'

    # Seconds to wait on a provider; without a timeout ``requests`` can block
    # the authentication path indefinitely.
    _REQUEST_TIMEOUT = 10

    def _validate_sso_token(self, token: str) -> Optional[Dict[str, Any]]:
        """Try the token against each configured provider in turn.

        Providers without a ``client_id`` are skipped. The first provider
        that returns a profile with an ``id`` wins, and its info is tagged
        with ``provider = 'oauth2_<name>'``.

        NOTE(review): the token is sent to every configured provider until
        one accepts it -- confirm that is acceptable.
        """
        providers = settings.SSO_PROVIDERS.get('oauth2', {}).get('providers', {})

        for provider_name, config in providers.items():
            if not config.get('client_id'):
                continue

            user_info = self._validate_oauth2_token(token, provider_name, config)
            # Require an id: downstream user lookup keys on it.
            if user_info and user_info.get('id'):
                user_info['provider'] = f'oauth2_{provider_name}'
                return user_info

        return None

    def _validate_oauth2_token(self, token: str, provider: str, config: Dict) -> Optional[Dict[str, Any]]:
        """Dispatch validation to the handler for ``provider``.

        Returns ``None`` for an unsupported provider or when the handler
        raises. ``config`` is accepted for interface compatibility but is not
        used by the current handlers.
        """
        handlers = {
            'google': self._validate_google_token,
            'microsoft': self._validate_microsoft_token,
        }
        handler = handlers.get(provider)
        if handler is None:
            logger.warning("Unsupported OAuth2 provider: %s", provider)
            return None

        try:
            return handler(token)
        except Exception as e:
            logger.error("OAuth2 token validation failed for %s: %s", provider, e)
            return None

    def _fetch_profile(self, url: str, token: str) -> Optional[Dict[str, Any]]:
        """GET ``url`` with ``token`` as bearer; return the JSON object on HTTP 200."""
        response = requests.get(
            url,
            headers={'Authorization': f'Bearer {token}'},
            timeout=self._REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            return None
        payload = response.json()
        return payload if isinstance(payload, dict) else None

    def _validate_google_token(self, token: str) -> Optional[Dict[str, Any]]:
        """Validate a Google OAuth2 token via the userinfo endpoint.

        Returns the mapped user info, or ``None`` on a non-200 response or
        any error (which is logged).

        NOTE(review): this does not check which client the token was issued
        to; the provider's ``client_id`` is never compared against the token.
        """
        try:
            data = self._fetch_profile(self._GOOGLE_USERINFO_URL, token)
        except Exception as e:
            logger.error("Google token validation failed: %s", e)
            return None

        if not data:
            return None
        return {
            'id': data.get('id'),
            'username': data.get('email'),
            'email': data.get('email'),
            'first_name': data.get('given_name', ''),
            'last_name': data.get('family_name', ''),
        }

    def _validate_microsoft_token(self, token: str) -> Optional[Dict[str, Any]]:
        """Validate a Microsoft OAuth2 token via Microsoft Graph ``/me``.

        Returns the mapped user info, or ``None`` on a non-200 response or
        any error (which is logged). ``email`` falls back to
        ``userPrincipalName`` when ``mail`` is empty.
        """
        try:
            data = self._fetch_profile(self._MICROSOFT_ME_URL, token)
        except Exception as e:
            logger.error("Microsoft token validation failed: %s", e)
            return None

        if not data:
            return None
        return {
            'id': data.get('id'),
            'username': data.get('userPrincipalName'),
            'email': data.get('mail') or data.get('userPrincipalName'),
            'first_name': data.get('givenName', ''),
            'last_name': data.get('surname', ''),
        }
|
|
|
|
|
|
class LDAPAuthentication(BaseBackend):
    """LDAP authentication backend (credential check is still a mock)."""

    def authenticate(self, request, username=None, password=None, **kwargs):
        """Authenticate ``username``/``password`` via LDAP.

        Returns the user on success. Returns ``None`` when credentials are
        missing, LDAP is not enabled in ``settings.SSO_PROVIDERS``, the
        credentials are rejected, the account is inactive, or any error
        occurs (errors are logged).
        """
        if not username or not password:
            return None

        try:
            ldap_config = settings.SSO_PROVIDERS.get('ldap', {})
            if not ldap_config.get('enabled'):
                return None

            if not self._authenticate_ldap_user(username, password):
                return None

            user = self._get_or_create_ldap_user(username)
            if not user:
                return None
            if not getattr(user, 'is_active', True):
                logger.warning(
                    "Rejected LDAP login for inactive user: %s", user.username
                )
                return None

            self._log_ldap_login(user, request)
            return user

        except Exception as e:
            logger.error("LDAP authentication failed: %s", e)

        return None

    def get_user(self, user_id):
        """Return the active user with primary key ``user_id``, else ``None``.

        ``BaseBackend.get_user`` returns ``None`` unconditionally; without
        this override a session created through this backend cannot be
        resolved back to its user on later requests.
        """
        try:
            user = User.objects.get(pk=user_id)
        except User.DoesNotExist:
            return None
        return user if getattr(user, 'is_active', True) else None

    def _authenticate_ldap_user(self, username: str, password: str) -> bool:
        """Check credentials against the LDAP server (mock).

        The mock accepts one hard-coded credential pair. To keep that pair
        from acting as a standing backdoor, it is honoured only when
        ``settings.DEBUG`` is true; otherwise every attempt is rejected.
        """
        # TODO: replace with a real bind against the LDAP server (python-ldap).
        if not getattr(settings, 'DEBUG', False):
            logger.error(
                "LDAP authentication is not implemented; rejecting outside DEBUG"
            )
            return False

        # Function-scope import keeps this block self-contained.
        import hmac

        # Compare as bytes (compare_digest rejects non-ASCII str) and evaluate
        # both checks so timing does not reveal which half mismatched.
        username_ok = hmac.compare_digest(username.encode('utf-8'), b'ldap.user')
        password_ok = hmac.compare_digest(password.encode('utf-8'), b'ldap_password')
        return username_ok and password_ok

    def _get_or_create_ldap_user(self, username: str) -> Optional[User]:
        """Return the LDAP-linked user for ``username``, creating it if absent.

        New users get the email ``<username>@company.com``. Returns ``None``
        (after logging) when creation fails.
        """
        try:
            return User.objects.get(
                sso_provider='ldap',
                sso_identifier=username,
            )
        except User.DoesNotExist:
            pass

        try:
            user = User.objects.create_user(
                username=username,
                email=f'{username}@company.com',
                sso_provider='ldap',
                sso_identifier=username,
                is_active=True,
            )
        except Exception as e:
            # Deliberately broad: provisioning failures are logged and
            # reported as "no user" rather than raised.
            logger.error("Failed to create user from LDAP: %s", e)
            return None

        logger.info("Created new user via LDAP: %s", user.username)
        return user

    def _log_ldap_login(self, user: User, request):
        """Record an ``SSO_LOGIN`` audit entry for an LDAP login.

        ``request`` may be ``None`` (``authenticate`` can be called without
        one); the user agent is then recorded as an empty string.
        """
        user_agent = request.META.get('HTTP_USER_AGENT', '') if request else ''
        AuditLog.objects.create(
            user=user,
            action_type='SSO_LOGIN',
            ip_address=self._get_client_ip(request),
            user_agent=user_agent,
            details={
                'provider': 'ldap',
                'sso_identifier': user.sso_identifier,
            },
        )

    def _get_client_ip(self, request) -> Optional[str]:
        """Return the client IP: first ``X-Forwarded-For`` hop, else ``REMOTE_ADDR``.

        Returns ``None`` when there is no request or neither value is set.
        """
        if not request:
            return None
        forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if forwarded_for:
            # Entries are comma-separated and usually followed by a space.
            return forwarded_for.split(',')[0].strip()
        return request.META.get('REMOTE_ADDR')
|