""" Security models for Enterprise Incident Management API Implements RBAC/ABAC, MFA, Audit Trails, and Data Classification """ import uuid import hashlib import json from datetime import datetime, timedelta from typing import Dict, Any, Optional from django.db import models from django.contrib.auth.models import AbstractUser, Permission from django.contrib.contenttypes.models import ContentType from django.contrib.auth.hashers import make_password, check_password from django.core.exceptions import ValidationError from django.core.validators import MinValueValidator, MaxValueValidator from django.conf import settings from django.utils import timezone from rest_framework.authtoken.models import Token class DataClassification(models.Model): """Data classification levels for incidents and other data""" CLASSIFICATION_CHOICES = [ ('PUBLIC', 'Public'), ('INTERNAL', 'Internal'), ('CONFIDENTIAL', 'Confidential'), ('RESTRICTED', 'Restricted'), ('TOP_SECRET', 'Top Secret'), ] name = models.CharField(max_length=20, choices=CLASSIFICATION_CHOICES, unique=True) level = models.IntegerField(unique=True, help_text="Numeric level for comparison (higher = more sensitive)") description = models.TextField() color_code = models.CharField(max_length=7, default="#000000", help_text="Hex color code for UI") requires_clearance = models.BooleanField(default=False) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) class Meta: ordering = ['level'] verbose_name = "Data Classification" verbose_name_plural = "Data Classifications" def __str__(self): return f"{self.name} (Level {self.level})" class Role(models.Model): """Role-based access control roles""" name = models.CharField(max_length=100, unique=True) description = models.TextField() permissions = models.ManyToManyField(Permission, blank=True) data_classification_access = models.ManyToManyField( DataClassification, blank=True, help_text="Maximum data classification level this role can access" ) is_active = models.BooleanField(default=True) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) class Meta: ordering = ['name'] def __str__(self): return self.name class User(AbstractUser): """Extended user model with security features""" id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) employee_id = models.CharField(max_length=50, unique=True, null=True, blank=True) department = models.CharField(max_length=100, blank=True) clearance_level = models.ForeignKey( DataClassification, on_delete=models.SET_NULL, null=True, blank=True, help_text="User's security clearance level" ) roles = models.ManyToManyField(Role, blank=True) attributes = models.JSONField(default=dict, blank=True) mfa_enabled = models.BooleanField(default=False) mfa_secret = models.CharField(max_length=32, blank=True) last_login_ip = models.GenericIPAddressField(null=True, blank=True) failed_login_attempts = models.IntegerField(default=0) account_locked_until = models.DateTimeField(null=True, blank=True) sso_provider = models.CharField(max_length=50, blank=True, null=True) sso_identifier = models.CharField(max_length=255, blank=True, null=True) # On-Call Integration phone_number = models.CharField( max_length=20, blank=True, null=True, help_text="Phone number for on-call notifications" ) emergency_contact = models.CharField( max_length=100, blank=True, null=True, help_text="Emergency contact information" ) oncall_preferences = models.JSONField( default=dict, blank=True, help_text="On-call notification preferences" ) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) class Meta: ordering = ['username'] def __str__(self): return f"{self.username} ({self.get_full_name() or self.email})" def has_data_access(self, classification_level: int) -> bool: """Check if user has access to data at given classification level""" if not self.clearance_level: return classification_level <= 1 # Default to PUBLIC/INTERNAL only return classification_level <= self.clearance_level.level def get_effective_permissions(self) -> set: """Get all permissions from roles and direct permissions""" permissions = set(self.user_permissions.all()) for role in self.roles.filter(is_active=True): permissions.update(role.permissions.all()) return permissions def is_account_locked(self) -> bool: """Check if account is currently locked""" if self.account_locked_until: return timezone.now() < self.account_locked_until return False class MFADevice(models.Model): """Multi-Factor Authentication devices""" DEVICE_TYPES = [ ('TOTP', 'Time-based One-Time Password'), ('HOTP', 'HMAC-based One-Time Password'), ('SMS', 'SMS'), ('EMAIL', 'Email'), ('HARDWARE', 'Hardware Token'), ] id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='mfa_devices') device_type = models.CharField(max_length=20, choices=DEVICE_TYPES) name = models.CharField(max_length=100, help_text="User-friendly name for the device") secret_key = models.CharField(max_length=255, help_text="Encrypted secret key") is_active = models.BooleanField(default=True) is_primary = models.BooleanField(default=False) last_used = models.DateTimeField(null=True, blank=True) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) class Meta: ordering = ['-is_primary', 'name'] unique_together = ['user', 'name'] def __str__(self): return f"{self.user.username} - {self.name} ({self.device_type})" class AuditLog(models.Model): """Immutable audit trail for all security-relevant actions""" ACTION_TYPES = [ ('LOGIN', 'User Login'), ('LOGOUT', 'User Logout'), ('LOGIN_FAILED', 'Failed Login Attempt'), ('PASSWORD_CHANGE', 'Password Changed'), ('MFA_ENABLED', 'MFA Enabled'), ('MFA_DISABLED', 'MFA Disabled'), ('ROLE_ASSIGNED', 'Role Assigned'), ('ROLE_REMOVED', 'Role Removed'), ('DATA_ACCESS', 'Data Accessed'), ('DATA_MODIFIED', 'Data Modified'), ('DATA_DELETED', 'Data Deleted'), ('ACCOUNT_LOCKED', 'Account Locked'), ('ACCOUNT_UNLOCKED', 'Account Unlocked'), ('SSO_LOGIN', 'SSO Login'), ] id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) timestamp = models.DateTimeField(auto_now_add=True, db_index=True) user = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True) action_type = models.CharField(max_length=50, choices=ACTION_TYPES) resource_type = models.CharField(max_length=100, blank=True) resource_id = models.CharField(max_length=255, blank=True) ip_address = models.GenericIPAddressField(null=True, blank=True) user_agent = models.TextField(blank=True) details = models.JSONField(default=dict) severity = models.CharField( max_length=20, choices=[ ('LOW', 'Low'), ('MEDIUM', 'Medium'), ('HIGH', 'High'), ('CRITICAL', 'Critical'), ], default='LOW' ) hash_value = models.CharField(max_length=64, unique=True, help_text="SHA-256 hash for integrity verification") class Meta: ordering = ['-timestamp'] indexes = [ models.Index(fields=['timestamp', 'action_type']), models.Index(fields=['user', 'timestamp']), models.Index(fields=['severity', 'timestamp']), ] def __str__(self): return f"{self.timestamp} - {self.action_type} by {self.user or 'System'}" def save(self, *args, **kwargs): """Override save to generate hash and ensure immutability""" if not self.hash_value: self.hash_value = self._generate_hash() super().save(*args, **kwargs) def _generate_hash(self) -> str: """Generate SHA-256 hash for integrity verification""" import uuid data = { 'timestamp': self.timestamp.isoformat() if self.timestamp else '', 'user_id': str(self.user.id) if self.user else '', 'action_type': self.action_type, 'resource_type': self.resource_type, 'resource_id': self.resource_id, 'ip_address': str(self.ip_address) if self.ip_address else '', 'user_agent': self.user_agent, 'details': json.dumps(self.details, sort_keys=True), 'severity': self.severity, 'unique_id': str(uuid.uuid4()), # Add unique component to prevent hash collisions } data_string = json.dumps(data, sort_keys=True) return hashlib.sha256(data_string.encode()).hexdigest() class SecurityEvent(models.Model): """Security event tracking for enterprise security system""" EVENT_TYPES = [ ('user_login', 'User Login'), ('user_logout', 'User Logout'), ('login_failure', 'Login Failure'), ('suspicious_request', 'Suspicious Request'), ('threat_detected', 'Threat Detected'), ('access_denied', 'Access Denied'), ('privilege_escalation', 'Privilege Escalation'), ('data_breach', 'Data Breach'), ('malware_detected', 'Malware Detected'), ('anomalous_behavior', 'Anomalous Behavior'), ] SEVERITY_CHOICES = [ ('info', 'Info'), ('warning', 'Warning'), ('error', 'Error'), ('critical', 'Critical'), ] id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) event_type = models.CharField(max_length=50, choices=EVENT_TYPES) severity = models.CharField(max_length=20, choices=SEVERITY_CHOICES) description = models.TextField() user = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True) ip_address = models.GenericIPAddressField(null=True, blank=True) user_agent = models.TextField(blank=True) metadata = models.JSONField(default=dict, blank=True) timestamp = models.DateTimeField(auto_now_add=True) class Meta: ordering = ['-timestamp'] indexes = [ models.Index(fields=['event_type', 'timestamp']), models.Index(fields=['severity', 'timestamp']), models.Index(fields=['user', 'timestamp']), ] def __str__(self): return f"{self.event_type} - {self.severity} at {self.timestamp}" class SSOProvider(models.Model): """SSO provider configurations""" PROVIDER_TYPES = [ ('SAML', 'SAML'), ('OAUTH2', 'OAuth 2.0'), ('OIDC', 'OpenID Connect'), ('LDAP', 'LDAP'), ] name = models.CharField(max_length=100, unique=True) provider_type = models.CharField(max_length=20, choices=PROVIDER_TYPES) is_active = models.BooleanField(default=True) configuration = models.JSONField(default=dict) attribute_mapping = models.JSONField(default=dict, help_text="Mapping of SSO attributes to user fields") created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) class Meta: ordering = ['name'] def __str__(self): return f"{self.name} ({self.provider_type})" class DevicePosture(models.Model): """Device posture assessment for Zero Trust""" DEVICE_TYPES = [ ('DESKTOP', 'Desktop Computer'), ('LAPTOP', 'Laptop Computer'), ('MOBILE', 'Mobile Device'), ('TABLET', 'Tablet'), ('SERVER', 'Server'), ('IOT', 'IoT Device'), ('UNKNOWN', 'Unknown Device'), ] OS_TYPES = [ ('WINDOWS', 'Windows'), ('MACOS', 'macOS'), ('LINUX', 'Linux'), ('ANDROID', 'Android'), ('IOS', 'iOS'), ('UNKNOWN', 'Unknown OS'), ] id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='device_postures') # Device identification device_id = models.CharField(max_length=255, unique=True, help_text="Unique device identifier") device_name = models.CharField(max_length=200, blank=True) device_type = models.CharField(max_length=20, choices=DEVICE_TYPES, default='UNKNOWN') os_type = models.CharField(max_length=20, choices=OS_TYPES, default='UNKNOWN') os_version = models.CharField(max_length=100, blank=True) browser_info = models.CharField(max_length=200, blank=True) # Security posture is_managed = models.BooleanField(default=False, help_text="Is device managed by organization") has_antivirus = models.BooleanField(default=False) antivirus_status = models.CharField(max_length=50, blank=True) firewall_enabled = models.BooleanField(default=False) encryption_enabled = models.BooleanField(default=False) screen_lock_enabled = models.BooleanField(default=False) biometric_auth = models.BooleanField(default=False) # Network information ip_address = models.GenericIPAddressField(null=True, blank=True) mac_address = models.CharField(max_length=17, blank=True) network_type = models.CharField(max_length=50, blank=True, help_text="Corporate, Public, Home, etc.") vpn_connected = models.BooleanField(default=False) # Risk assessment risk_score = models.IntegerField( default=0, validators=[MinValueValidator(0), MaxValueValidator(100)], help_text="Device risk score (0-100, higher = more risky)" ) last_assessment = models.DateTimeField(auto_now=True) assessment_details = models.JSONField(default=dict, help_text="Detailed assessment results") # Compliance status is_compliant = models.BooleanField(default=False) compliance_issues = models.JSONField(default=list, help_text="List of compliance issues") # Status is_active = models.BooleanField(default=True) is_trusted = models.BooleanField(default=False) trust_level = models.CharField( max_length=20, choices=[ ('HIGH', 'High Trust'), ('MEDIUM', 'Medium Trust'), ('LOW', 'Low Trust'), ('UNTRUSTED', 'Untrusted'), ], default='LOW' ) # Timestamps first_seen = models.DateTimeField(auto_now_add=True) last_seen = models.DateTimeField(auto_now=True) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) class Meta: ordering = ['-last_seen'] indexes = [ models.Index(fields=['user', 'is_active']), models.Index(fields=['device_id']), models.Index(fields=['risk_score', 'trust_level']), models.Index(fields=['is_compliant', 'is_trusted']), ] def __str__(self): return f"{self.device_name or self.device_id} ({self.user.username})" def calculate_risk_score(self) -> int: """Calculate device risk score based on various factors""" score = 0 # Base risk factors if not self.is_managed: score += 30 if not self.has_antivirus: score += 20 if not self.firewall_enabled: score += 15 if not self.encryption_enabled: score += 15 if not self.screen_lock_enabled: score += 10 if not self.biometric_auth: score += 5 # Network risk factors if self.network_type == 'Public': score += 25 elif self.network_type == 'Home': score += 10 if not self.vpn_connected and self.network_type != 'Corporate': score += 20 # Device type risk if self.device_type in ['MOBILE', 'TABLET']: score += 10 elif self.device_type == 'IOT': score += 30 # Compliance issues score += len(self.compliance_issues) * 5 return min(score, 100) def update_trust_level(self): """Update trust level based on risk score and compliance""" if self.risk_score <= 20 and self.is_compliant: self.trust_level = 'HIGH' elif self.risk_score <= 40 and self.is_compliant: self.trust_level = 'MEDIUM' elif self.risk_score <= 70: self.trust_level = 'LOW' else: self.trust_level = 'UNTRUSTED' self.is_trusted = self.trust_level in ['HIGH', 'MEDIUM'] class GeolocationRule(models.Model): """Geolocation-based access control rules""" RULE_TYPES = [ ('ALLOW', 'Allow'), ('DENY', 'Deny'), ('REQUIRE_MFA', 'Require Additional MFA'), ('RESTRICT', 'Restrict Access'), ] id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) name = models.CharField(max_length=100, unique=True) description = models.TextField() rule_type = models.CharField(max_length=20, choices=RULE_TYPES) # Geographic conditions allowed_countries = models.JSONField(default=list, help_text="List of allowed country codes") blocked_countries = models.JSONField(default=list, help_text="List of blocked country codes") allowed_regions = models.JSONField(default=list, help_text="List of allowed regions/states") blocked_regions = models.JSONField(default=list, help_text="List of blocked regions/states") allowed_cities = models.JSONField(default=list, help_text="List of allowed cities") blocked_cities = models.JSONField(default=list, help_text="List of blocked cities") # IP-based rules allowed_ip_ranges = models.JSONField(default=list, help_text="List of allowed IP ranges (CIDR)") blocked_ip_ranges = models.JSONField(default=list, help_text="List of blocked IP ranges (CIDR)") # Time-based conditions allowed_time_zones = models.JSONField(default=list, help_text="List of allowed time zones") working_hours_only = models.BooleanField(default=False) working_hours_start = models.TimeField(null=True, blank=True) working_hours_end = models.TimeField(null=True, blank=True) working_days = models.JSONField(default=list, help_text="List of working days (0-6, Monday=0)") # Risk factors max_distance_from_office = models.FloatField(null=True, blank=True, help_text="Max distance from office in km") office_latitude = models.FloatField(null=True, blank=True) office_longitude = models.FloatField(null=True, blank=True) # Actions and notifications notification_message = models.TextField(blank=True, help_text="Message to show when rule triggers") log_violation = models.BooleanField(default=True) require_manager_approval = models.BooleanField(default=False) # Status is_active = models.BooleanField(default=True) priority = models.IntegerField(default=100, help_text="Lower numbers have higher priority") # Metadata created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True) class Meta: ordering = ['priority', 'name'] indexes = [ models.Index(fields=['rule_type', 'is_active']), models.Index(fields=['priority']), ] def __str__(self): return f"{self.name} ({self.rule_type})" def evaluate_location(self, latitude: float, longitude: float, country_code: str = None, region: str = None, city: str = None, ip_address: str = None) -> dict: """Evaluate geolocation against this rule""" result = { 'matches': False, 'rule_type': self.rule_type, 'message': self.notification_message, 'requires_approval': self.require_manager_approval } # Check country restrictions if country_code: if self.blocked_countries and country_code in self.blocked_countries: result['matches'] = True result['reason'] = f"Country {country_code} is blocked" return result if self.allowed_countries and country_code not in self.allowed_countries: result['matches'] = True result['reason'] = f"Country {country_code} is not in allowed list" return result # Check region restrictions if region: if self.blocked_regions and region in self.blocked_regions: result['matches'] = True result['reason'] = f"Region {region} is blocked" return result if self.allowed_regions and region not in self.allowed_regions: result['matches'] = True result['reason'] = f"Region {region} is not in allowed list" return result # Check city restrictions if city: if self.blocked_cities and city in self.blocked_cities: result['matches'] = True result['reason'] = f"City {city} is blocked" return result if self.allowed_cities and city not in self.allowed_cities: result['matches'] = True result['reason'] = f"City {city} is not in allowed list" return result # Check IP restrictions if ip_address: if self.blocked_ip_ranges: for ip_range in self.blocked_ip_ranges: if self._ip_in_range(ip_address, ip_range): result['matches'] = True result['reason'] = f"IP {ip_address} is in blocked range {ip_range}" return result if self.allowed_ip_ranges: allowed = False for ip_range in self.allowed_ip_ranges: if self._ip_in_range(ip_address, ip_range): allowed = True break if not allowed: result['matches'] = True result['reason'] = f"IP {ip_address} is not in allowed ranges" return result # Check distance from office if (latitude and longitude and self.office_latitude and self.office_longitude and self.max_distance_from_office): distance = self._calculate_distance( latitude, longitude, self.office_latitude, self.office_longitude ) if distance > self.max_distance_from_office: result['matches'] = True result['reason'] = f"Distance {distance:.2f}km exceeds limit {self.max_distance_from_office}km" return result return result def _ip_in_range(self, ip: str, ip_range: str) -> bool: """Check if IP is in CIDR range""" import ipaddress try: return ipaddress.ip_address(ip) in ipaddress.ip_network(ip_range) except: return False def _calculate_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float: """Calculate distance between two coordinates in kilometers""" import math R = 6371 # Earth's radius in kilometers dlat = math.radians(lat2 - lat1) dlon = math.radians(lon2 - lon1) a = (math.sin(dlat/2) * math.sin(dlat/2) + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon/2) * math.sin(dlon/2)) c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a)) return R * c class AccessPolicy(models.Model): """Enhanced Attribute-based access control policies with Zero Trust context""" POLICY_TYPES = [ ('ALLOW', 'Allow'), ('DENY', 'Deny'), ('REQUIRE_MFA', 'Require Additional MFA'), ('STEP_UP_AUTH', 'Step-up Authentication'), ('RISK_BASED', 'Risk-based Decision'), ] name = models.CharField(max_length=100, unique=True) description = models.TextField() policy_type = models.CharField(max_length=20, choices=POLICY_TYPES) conditions = models.JSONField(help_text="JSON conditions for policy evaluation") resource_type = models.CharField(max_length=100, blank=True) actions = models.JSONField(default=list, help_text="List of actions this policy applies to") priority = models.IntegerField(default=100, help_text="Lower numbers have higher priority") # Zero Trust context requires_device_trust = models.BooleanField(default=False) min_device_trust_level = models.CharField( max_length=20, choices=[ ('HIGH', 'High Trust'), ('MEDIUM', 'Medium Trust'), ('LOW', 'Low Trust'), ], default='LOW' ) requires_geolocation_check = models.BooleanField(default=False) geolocation_rule = models.ForeignKey(GeolocationRule, on_delete=models.SET_NULL, null=True, blank=True) # Risk-based conditions max_risk_score = models.IntegerField( default=100, validators=[MinValueValidator(0), MaxValueValidator(100)], help_text="Maximum allowed risk score" ) requires_compliant_device = models.BooleanField(default=False) # Time-based conditions time_restrictions = models.JSONField(default=dict, help_text="Time-based access restrictions") # Adaptive authentication adaptive_auth_enabled = models.BooleanField(default=False) auth_factors_required = models.JSONField(default=list, help_text="Required authentication factors") is_active = models.BooleanField(default=True) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) class Meta: ordering = ['priority', 'name'] verbose_name_plural = "Access Policies" def __str__(self): return f"{self.name} ({self.policy_type})" def evaluate(self, user: User, resource: Any = None, action: str = None, context: dict = None) -> dict: """Evaluate policy against user, resource, and Zero Trust context""" result = { 'allowed': False, 'policy_type': self.policy_type, 'reason': '', 'required_actions': [], 'risk_score': 0 } if not self.is_active: return result # Basic policy evaluation if self.policy_type == 'DENY': result['allowed'] = False result['reason'] = 'Policy explicitly denies access' return result # Check device trust requirements if self.requires_device_trust and context: device_posture = context.get('device_posture') if not device_posture or not device_posture.is_trusted: result['required_actions'].append('DEVICE_REGISTRATION') result['reason'] = 'Untrusted device requires registration' if device_posture and device_posture.trust_level not in ['HIGH', 'MEDIUM', 'LOW']: trust_levels = ['HIGH', 'MEDIUM', 'LOW'] min_level_index = trust_levels.index(self.min_device_trust_level) device_level_index = trust_levels.index(device_posture.trust_level) if device_level_index > min_level_index: result['required_actions'].append('STEP_UP_AUTH') result['reason'] = f'Device trust level {device_posture.trust_level} below required {self.min_device_trust_level}' # Check geolocation requirements if self.requires_geolocation_check and self.geolocation_rule and context: location_data = context.get('location', {}) geo_result = self.geolocation_rule.evaluate_location(**location_data) if geo_result['matches']: if self.geolocation_rule.rule_type == 'DENY': result['allowed'] = False result['reason'] = geo_result['reason'] return result elif self.geolocation_rule.rule_type == 'REQUIRE_MFA': result['required_actions'].append('ADDITIONAL_MFA') result['reason'] = geo_result['reason'] # Check risk score if context: risk_score = context.get('risk_score', 0) if risk_score > self.max_risk_score: result['required_actions'].append('RISK_MITIGATION') result['reason'] = f'Risk score {risk_score} exceeds maximum {self.max_risk_score}' # Check device compliance if self.requires_compliant_device and context: device_posture = context.get('device_posture') if not device_posture or not device_posture.is_compliant: result['required_actions'].append('DEVICE_COMPLIANCE') result['reason'] = 'Device compliance required' # If no blocking conditions, allow access if not result['required_actions'] and not result['reason']: result['allowed'] = True result['reason'] = 'Access granted' return result class RiskAssessment(models.Model): """Risk assessment for Zero Trust decisions""" RISK_FACTORS = [ ('DEVICE_RISK', 'Device Risk'), ('LOCATION_RISK', 'Location Risk'), ('BEHAVIOR_RISK', 'Behavior Risk'), ('NETWORK_RISK', 'Network Risk'), ('TIME_RISK', 'Time Risk'), ('USER_RISK', 'User Risk'), ] id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='risk_assessments') # Assessment context assessment_type = models.CharField(max_length=50, default='LOGIN', help_text="Type of assessment (LOGIN, ACCESS, TRANSACTION)") resource_type = models.CharField(max_length=100, blank=True, help_text="Type of resource being accessed") resource_id = models.CharField(max_length=255, blank=True, help_text="ID of resource being accessed") # Risk factors and scores device_risk_score = models.IntegerField(default=0, validators=[MinValueValidator(0), MaxValueValidator(100)]) location_risk_score = models.IntegerField(default=0, validators=[MinValueValidator(0), MaxValueValidator(100)]) behavior_risk_score = models.IntegerField(default=0, validators=[MinValueValidator(0), MaxValueValidator(100)]) network_risk_score = models.IntegerField(default=0, validators=[MinValueValidator(0), MaxValueValidator(100)]) time_risk_score = models.IntegerField(default=0, validators=[MinValueValidator(0), MaxValueValidator(100)]) user_risk_score = models.IntegerField(default=0, validators=[MinValueValidator(0), MaxValueValidator(100)]) # Overall risk overall_risk_score = models.IntegerField(default=0, validators=[MinValueValidator(0), MaxValueValidator(100)]) risk_level = models.CharField( max_length=20, choices=[ ('LOW', 'Low Risk'), ('MEDIUM', 'Medium Risk'), ('HIGH', 'High Risk'), ('CRITICAL', 'Critical Risk'), ], default='LOW' ) # Context data ip_address = models.GenericIPAddressField(null=True, blank=True) user_agent = models.TextField(blank=True) location_data = models.JSONField(default=dict, help_text="Geolocation and network data") device_data = models.JSONField(default=dict, help_text="Device information") behavior_data = models.JSONField(default=dict, help_text="User behavior patterns") # Assessment details risk_factors = models.JSONField(default=list, help_text="List of identified risk factors") mitigation_actions = models.JSONField(default=list, help_text="Recommended mitigation actions") assessment_details = models.JSONField(default=dict, help_text="Detailed assessment results") # Decision access_decision = models.CharField( max_length=20, choices=[ ('ALLOW', 'Allow Access'), ('DENY', 'Deny Access'), ('STEP_UP', 'Step-up Authentication'), ('REVIEW', 'Manual Review Required'), ], default='ALLOW' ) decision_reason = models.TextField(blank=True) # Timestamps assessed_at = models.DateTimeField(auto_now_add=True) expires_at = models.DateTimeField(null=True, blank=True) class Meta: ordering = ['-assessed_at'] indexes = [ models.Index(fields=['user', 'assessed_at']), models.Index(fields=['overall_risk_score', 'risk_level']), models.Index(fields=['access_decision']), ] def __str__(self): return f"Risk Assessment for {self.user.username} - {self.risk_level} ({self.overall_risk_score})" def calculate_overall_risk(self): """Calculate overall risk score from individual factors""" # Weighted risk calculation weights = { 'device': 0.25, 'location': 0.20, 'behavior': 0.20, 'network': 0.15, 'time': 0.10, 'user': 0.10 } weighted_score = ( self.device_risk_score * weights['device'] + self.location_risk_score * weights['location'] + self.behavior_risk_score * weights['behavior'] + self.network_risk_score * weights['network'] + self.time_risk_score * weights['time'] + self.user_risk_score * weights['user'] ) self.overall_risk_score = int(weighted_score) # Determine risk level if self.overall_risk_score <= 25: self.risk_level = 'LOW' elif self.overall_risk_score <= 50: self.risk_level = 'MEDIUM' elif self.overall_risk_score <= 75: self.risk_level = 'HIGH' else: self.risk_level = 'CRITICAL' def determine_access_decision(self): """Determine access decision based on risk level""" if self.risk_level == 'LOW': self.access_decision = 'ALLOW' self.decision_reason = 'Low risk - access allowed' elif self.risk_level == 'MEDIUM': self.access_decision = 'STEP_UP' self.decision_reason = 'Medium risk - step-up authentication required' elif self.risk_level == 'HIGH': self.access_decision = 'REVIEW' self.decision_reason = 'High risk - manual review required' else: # CRITICAL self.access_decision = 'DENY' self.decision_reason = 'Critical risk - access denied' class AdaptiveAuthentication(models.Model): """Adaptive authentication rules and configurations""" AUTH_METHODS = [ ('PASSWORD', 'Password'), ('MFA_TOTP', 'MFA TOTP'), ('MFA_SMS', 'MFA SMS'), ('MFA_EMAIL', 'MFA Email'), ('BIOMETRIC', 'Biometric'), ('HARDWARE_TOKEN', 'Hardware Token'), ('SSO', 'Single Sign-On'), ('CERTIFICATE', 'Certificate'), ] id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) name = models.CharField(max_length=100, unique=True) description = models.TextField() # Risk thresholds low_risk_threshold = models.IntegerField(default=25, validators=[MinValueValidator(0), MaxValueValidator(100)]) medium_risk_threshold = models.IntegerField(default=50, validators=[MinValueValidator(0), MaxValueValidator(100)]) high_risk_threshold = models.IntegerField(default=75, validators=[MinValueValidator(0), MaxValueValidator(100)]) # Authentication requirements by risk level low_risk_auth_methods = models.JSONField(default=list, help_text="Auth methods for low risk") medium_risk_auth_methods = models.JSONField(default=list, help_text="Auth methods for medium risk") high_risk_auth_methods = models.JSONField(default=list, help_text="Auth methods for high risk") critical_risk_auth_methods = models.JSONField(default=list, help_text="Auth methods for critical risk") # Context-based adjustments device_trust_multiplier = models.FloatField(default=1.0, help_text="Multiplier for device trust") location_trust_multiplier = models.FloatField(default=1.0, help_text="Multiplier for location trust") time_trust_multiplier = models.FloatField(default=1.0, help_text="Multiplier for time trust") # Behavioral analysis enable_behavioral_analysis = models.BooleanField(default=True) behavior_learning_period = models.IntegerField(default=30, help_text="Days to learn user behavior") anomaly_threshold = models.FloatField(default=0.7, help_text="Threshold for behavioral anomalies") # Machine learning features ml_enabled = models.BooleanField(default=False) ml_model_path = models.CharField(max_length=500, blank=True, help_text="Path to ML model file") ml_confidence_threshold = models.FloatField(default=0.8, help_text="ML confidence threshold") # Fallback options fallback_auth_methods = models.JSONField(default=list, help_text="Fallback auth methods") max_auth_attempts = models.IntegerField(default=3) lockout_duration = models.IntegerField(default=15, help_text="Lockout duration in minutes") # Status is_active = models.BooleanField(default=True) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) class Meta: ordering = ['name'] def __str__(self): return f"{self.name} (Adaptive Auth)" def get_required_auth_methods(self, risk_score: int) -> list: """Get required authentication methods based on risk score""" if risk_score <= self.low_risk_threshold: return self.low_risk_auth_methods elif risk_score <= self.medium_risk_threshold: return self.medium_risk_auth_methods elif risk_score <= self.high_risk_threshold: return self.high_risk_auth_methods else: return self.critical_risk_auth_methods def calculate_adjusted_risk(self, base_risk: int, context: dict) -> int: """Calculate adjusted risk score based on context""" adjusted_risk = base_risk # Apply device trust multiplier if context.get('device_trust_level') == 'HIGH': adjusted_risk *= self.device_trust_multiplier elif context.get('device_trust_level') == 'LOW': adjusted_risk *= (2.0 - self.device_trust_multiplier) # Apply location trust multiplier if context.get('location_trust') == 'TRUSTED': adjusted_risk *= self.location_trust_multiplier elif context.get('location_trust') == 'UNTRUSTED': adjusted_risk *= (2.0 - self.location_trust_multiplier) # Apply time trust multiplier if context.get('time_trust') == 'NORMAL_HOURS': adjusted_risk *= self.time_trust_multiplier elif context.get('time_trust') == 'UNUSUAL_HOURS': adjusted_risk *= (2.0 - self.time_trust_multiplier) return min(int(adjusted_risk), 100) class UserBehaviorProfile(models.Model): """User behavior profile for anomaly detection""" id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='behavior_profile') # Login patterns typical_login_times = models.JSONField(default=list, help_text="Typical login times") typical_login_locations = models.JSONField(default=list, help_text="Typical login locations") typical_login_devices = models.JSONField(default=list, help_text="Typical login devices") # Access patterns typical_access_times = models.JSONField(default=list, help_text="Typical resource access times") typical_access_patterns = models.JSONField(default=list, help_text="Typical access patterns") typical_session_duration = models.FloatField(default=0.0, help_text="Typical session duration in hours") # Network patterns typical_ip_ranges = models.JSONField(default=list, help_text="Typical IP address ranges") typical_user_agents = models.JSONField(default=list, help_text="Typical user agents") # Behavioral metrics login_frequency = models.FloatField(default=0.0, help_text="Average logins per day") access_frequency = models.FloatField(default=0.0, help_text="Average resource accesses per day") anomaly_score = models.FloatField(default=0.0, help_text="Current anomaly score") # Learning status is_learning = models.BooleanField(default=True) learning_start_date = models.DateTimeField(auto_now_add=True) learning_complete_date = models.DateTimeField(null=True, blank=True) sample_count = models.IntegerField(default=0, help_text="Number of samples used for learning") # Timestamps last_updated = models.DateTimeField(auto_now=True) created_at = models.DateTimeField(auto_now_add=True) class Meta: ordering = ['-last_updated'] indexes = [ models.Index(fields=['user', 'is_learning']), models.Index(fields=['anomaly_score']), ] def __str__(self): return f"Behavior Profile for {self.user.username}" def calculate_anomaly_score(self, current_behavior: dict) -> float: """Calculate anomaly score for current behavior""" score = 0.0 # Time anomaly current_time = current_behavior.get('login_time') if current_time and self.typical_login_times: time_anomaly = self._calculate_time_anomaly(current_time, self.typical_login_times) score += time_anomaly * 0.3 # Location anomaly current_location = current_behavior.get('location') if current_location and self.typical_login_locations: location_anomaly = self._calculate_location_anomaly(current_location, self.typical_login_locations) score += location_anomaly * 0.3 # Device anomaly current_device = current_behavior.get('device_id') if current_device and self.typical_login_devices: device_anomaly = self._calculate_device_anomaly(current_device, self.typical_login_devices) score += device_anomaly * 0.2 # IP anomaly current_ip = current_behavior.get('ip_address') if current_ip and self.typical_ip_ranges: ip_anomaly = self._calculate_ip_anomaly(current_ip, self.typical_ip_ranges) score += ip_anomaly * 0.2 self.anomaly_score = min(score, 1.0) return self.anomaly_score def _calculate_time_anomaly(self, current_time, typical_times) -> float: """Calculate time-based anomaly score""" # Simplified time anomaly calculation # In production, this would use more sophisticated time series analysis return 0.0 # Placeholder def _calculate_location_anomaly(self, current_location, typical_locations) -> float: """Calculate location-based anomaly score""" # Simplified location anomaly calculation return 0.0 # Placeholder def _calculate_device_anomaly(self, current_device, typical_devices) -> float: """Calculate device-based anomaly score""" if current_device in typical_devices: return 0.0 return 1.0 def _calculate_ip_anomaly(self, current_ip, typical_ranges) -> float: """Calculate IP-based anomaly score""" # Simplified IP anomaly calculation return 0.0 # Placeholder