Files
ETB/ETB-API/security/models.py
Iliyan Angelov 6b247e5b9f Updates
2025-09-19 11:58:53 +03:00

1078 lines
44 KiB
Python

"""
Security models for Enterprise Incident Management API
Implements RBAC/ABAC, MFA, Audit Trails, and Data Classification
"""
import uuid
import hashlib
import json
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
from django.db import models
from django.contrib.auth.models import AbstractUser, Permission
from django.contrib.contenttypes.models import ContentType
from django.contrib.auth.hashers import make_password, check_password
from django.core.exceptions import ValidationError
from django.core.validators import MinValueValidator, MaxValueValidator
from django.conf import settings
from django.utils import timezone
from rest_framework.authtoken.models import Token
class DataClassification(models.Model):
    """Sensitivity tier that can be attached to incidents and other data.

    Each row pairs a symbolic ``name`` with a numeric ``level``; per the
    field help text a larger ``level`` means more sensitive data. Rows are
    ordered by ascending ``level`` by default.
    """

    CLASSIFICATION_CHOICES = [
        ('PUBLIC', 'Public'),
        ('INTERNAL', 'Internal'),
        ('CONFIDENTIAL', 'Confidential'),
        ('RESTRICTED', 'Restricted'),
        ('TOP_SECRET', 'Top Secret'),
    ]

    name = models.CharField(
        max_length=20,
        choices=CLASSIFICATION_CHOICES,
        unique=True,
    )
    level = models.IntegerField(
        unique=True,
        help_text="Numeric level for comparison (higher = more sensitive)",
    )
    description = models.TextField()
    color_code = models.CharField(
        max_length=7,
        default="#000000",
        help_text="Hex color code for UI",
    )
    requires_clearance = models.BooleanField(default=False)

    # Bookkeeping timestamps maintained by Django on insert / every save.
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        ordering = ['level']
        verbose_name = "Data Classification"
        verbose_name_plural = "Data Classifications"

    def __str__(self):
        """Return the name followed by the numeric level."""
        return "{} (Level {})".format(self.name, self.level)
class Role(models.Model):
    """Named RBAC role bundling Django permissions and data classifications.

    Roles are ordered alphabetically by ``name`` by default.
    """

    name = models.CharField(max_length=100, unique=True)
    description = models.TextField()

    # Django auth permissions granted through this role.
    permissions = models.ManyToManyField(Permission, blank=True)

    # Classifications associated with this role.
    data_classification_access = models.ManyToManyField(
        DataClassification,
        blank=True,
        help_text="Maximum data classification level this role can access",
    )

    is_active = models.BooleanField(default=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        ordering = ['name']

    def __str__(self):
        """Return the role name."""
        return self.name
class User(AbstractUser):
    """Custom user model carrying clearance, role, MFA and lockout state.

    Extends Django's ``AbstractUser`` with a UUID primary key, a clearance
    level, role membership, free-form attributes, MFA/lockout bookkeeping,
    SSO linkage and on-call contact details.
    """

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    employee_id = models.CharField(
        max_length=50,
        unique=True,
        null=True,
        blank=True,
    )
    department = models.CharField(max_length=100, blank=True)
    clearance_level = models.ForeignKey(
        DataClassification,
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        help_text="User's security clearance level",
    )
    roles = models.ManyToManyField(Role, blank=True)
    attributes = models.JSONField(default=dict, blank=True)

    # MFA and login-protection state.
    mfa_enabled = models.BooleanField(default=False)
    mfa_secret = models.CharField(max_length=32, blank=True)
    last_login_ip = models.GenericIPAddressField(null=True, blank=True)
    failed_login_attempts = models.IntegerField(default=0)
    account_locked_until = models.DateTimeField(null=True, blank=True)

    # SSO linkage.
    sso_provider = models.CharField(max_length=50, blank=True, null=True)
    sso_identifier = models.CharField(max_length=255, blank=True, null=True)

    # On-call integration.
    phone_number = models.CharField(
        max_length=20,
        blank=True,
        null=True,
        help_text="Phone number for on-call notifications",
    )
    emergency_contact = models.CharField(
        max_length=100,
        blank=True,
        null=True,
        help_text="Emergency contact information",
    )
    oncall_preferences = models.JSONField(
        default=dict,
        blank=True,
        help_text="On-call notification preferences",
    )

    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        ordering = ['username']

    def __str__(self):
        """Return the username plus the full name (or e-mail when no name)."""
        display = self.get_full_name() or self.email
        return "{} ({})".format(self.username, display)

    def has_data_access(self, classification_level: int) -> bool:
        """Tell whether the user may see data at ``classification_level``.

        The requested level is compared against the ``level`` of the user's
        clearance. A user without a clearance is capped at level 1.
        """
        clearance = self.clearance_level
        # NOTE(review): level 1 is presumably PUBLIC/INTERNAL; the numeric
        # values assigned to each classification are not visible here.
        ceiling = clearance.level if clearance else 1
        return classification_level <= ceiling

    def get_effective_permissions(self) -> set:
        """Collect direct permissions plus those of every active role."""
        granted = set(self.user_permissions.all())
        active_roles = self.roles.filter(is_active=True)
        for role in active_roles:
            granted |= set(role.permissions.all())
        return granted

    def is_account_locked(self) -> bool:
        """Return True while ``account_locked_until`` lies in the future."""
        locked_until = self.account_locked_until
        if not locked_until:
            return False
        return timezone.now() < locked_until
class MFADevice(models.Model):
    """A second-factor device registered to a user.

    Device names are unique per user, and the default ordering lists
    primary devices first, then by name.
    """

    DEVICE_TYPES = [
        ('TOTP', 'Time-based One-Time Password'),
        ('HOTP', 'HMAC-based One-Time Password'),
        ('SMS', 'SMS'),
        ('EMAIL', 'Email'),
        ('HARDWARE', 'Hardware Token'),
    ]

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    user = models.ForeignKey(
        User,
        on_delete=models.CASCADE,
        related_name='mfa_devices',
    )
    device_type = models.CharField(max_length=20, choices=DEVICE_TYPES)
    name = models.CharField(
        max_length=100,
        help_text="User-friendly name for the device",
    )
    # NOTE(review): the help text says the key is encrypted; no encryption
    # happens in this model, so it presumably occurs in the caller - confirm.
    secret_key = models.CharField(
        max_length=255,
        help_text="Encrypted secret key",
    )
    is_active = models.BooleanField(default=True)
    is_primary = models.BooleanField(default=False)
    last_used = models.DateTimeField(null=True, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        ordering = ['-is_primary', 'name']
        unique_together = ['user', 'name']

    def __str__(self):
        """Return ``<username> - <device name> (<device type>)``."""
        return "{} - {} ({})".format(
            self.user.username, self.name, self.device_type
        )
class AuditLog(models.Model):
    """Audit trail entry for a security-relevant action.

    Every entry carries a SHA-256 ``hash_value`` computed once, on first
    save, from the entry's own content. The hash is deterministic, so
    ``verify_integrity()`` can later recompute it and detect tampering.

    Note: ``save()`` only fills in a missing hash; it does not reject later
    updates. Entries written before the hash became deterministic contain a
    random, unrecorded salt and therefore cannot be verified.
    """

    ACTION_TYPES = [
        ('LOGIN', 'User Login'),
        ('LOGOUT', 'User Logout'),
        ('LOGIN_FAILED', 'Failed Login Attempt'),
        ('PASSWORD_CHANGE', 'Password Changed'),
        ('MFA_ENABLED', 'MFA Enabled'),
        ('MFA_DISABLED', 'MFA Disabled'),
        ('ROLE_ASSIGNED', 'Role Assigned'),
        ('ROLE_REMOVED', 'Role Removed'),
        ('DATA_ACCESS', 'Data Accessed'),
        ('DATA_MODIFIED', 'Data Modified'),
        ('DATA_DELETED', 'Data Deleted'),
        ('ACCOUNT_LOCKED', 'Account Locked'),
        ('ACCOUNT_UNLOCKED', 'Account Unlocked'),
        ('SSO_LOGIN', 'SSO Login'),
    ]

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    timestamp = models.DateTimeField(auto_now_add=True, db_index=True)
    user = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True)
    action_type = models.CharField(max_length=50, choices=ACTION_TYPES)
    resource_type = models.CharField(max_length=100, blank=True)
    resource_id = models.CharField(max_length=255, blank=True)
    ip_address = models.GenericIPAddressField(null=True, blank=True)
    user_agent = models.TextField(blank=True)
    details = models.JSONField(default=dict)
    severity = models.CharField(
        max_length=20,
        choices=[
            ('LOW', 'Low'),
            ('MEDIUM', 'Medium'),
            ('HIGH', 'High'),
            ('CRITICAL', 'Critical'),
        ],
        default='LOW'
    )
    hash_value = models.CharField(max_length=64, unique=True, help_text="SHA-256 hash for integrity verification")

    class Meta:
        ordering = ['-timestamp']
        indexes = [
            models.Index(fields=['timestamp', 'action_type']),
            models.Index(fields=['user', 'timestamp']),
            models.Index(fields=['severity', 'timestamp']),
        ]

    def __str__(self):
        return f"{self.timestamp} - {self.action_type} by {self.user or 'System'}"

    def save(self, *args, **kwargs):
        """Persist the entry, computing ``hash_value`` if it is still empty."""
        if not self.hash_value:
            self.hash_value = self._generate_hash()
        super().save(*args, **kwargs)

    def _generate_hash(self) -> str:
        """Return the SHA-256 hex digest of this entry's content.

        The digest covers the primary key and every descriptive field, so it
        is unique per entry (``hash_value`` is a unique column) yet can be
        recomputed from the stored row.

        ``timestamp`` is deliberately left out: it is an ``auto_now_add``
        field that Django assigns during the first save, i.e. after this
        hash has been computed, so including it would make the stored hash
        impossible to reproduce.
        """
        data = {
            # The stored primary key replaces a throw-away random UUID as the
            # uniqueness component, keeping the hash reproducible.
            'record_id': str(self.pk),
            # ``user_id`` reads the raw foreign-key value without fetching
            # the related user.
            'user_id': str(self.user_id) if self.user_id else '',
            'action_type': self.action_type,
            'resource_type': self.resource_type,
            'resource_id': self.resource_id,
            'ip_address': str(self.ip_address) if self.ip_address else '',
            'user_agent': self.user_agent,
            'details': json.dumps(self.details, sort_keys=True),
            'severity': self.severity,
        }
        data_string = json.dumps(data, sort_keys=True)
        return hashlib.sha256(data_string.encode()).hexdigest()

    def verify_integrity(self) -> bool:
        """Return True when the stored hash matches the current content."""
        return bool(self.hash_value) and self.hash_value == self._generate_hash()
class SecurityEvent(models.Model):
    """A single security-related occurrence recorded by the system.

    Events are ordered newest-first and indexed by event type, severity and
    user, each combined with the timestamp.
    """

    EVENT_TYPES = [
        ('user_login', 'User Login'),
        ('user_logout', 'User Logout'),
        ('login_failure', 'Login Failure'),
        ('suspicious_request', 'Suspicious Request'),
        ('threat_detected', 'Threat Detected'),
        ('access_denied', 'Access Denied'),
        ('privilege_escalation', 'Privilege Escalation'),
        ('data_breach', 'Data Breach'),
        ('malware_detected', 'Malware Detected'),
        ('anomalous_behavior', 'Anomalous Behavior'),
    ]

    SEVERITY_CHOICES = [
        ('info', 'Info'),
        ('warning', 'Warning'),
        ('error', 'Error'),
        ('critical', 'Critical'),
    ]

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    event_type = models.CharField(max_length=50, choices=EVENT_TYPES)
    severity = models.CharField(max_length=20, choices=SEVERITY_CHOICES)
    description = models.TextField()

    # Who / where the event came from; all optional.
    user = models.ForeignKey(
        User,
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
    )
    ip_address = models.GenericIPAddressField(null=True, blank=True)
    user_agent = models.TextField(blank=True)

    metadata = models.JSONField(default=dict, blank=True)
    timestamp = models.DateTimeField(auto_now_add=True)

    class Meta:
        ordering = ['-timestamp']
        indexes = [
            models.Index(fields=['event_type', 'timestamp']),
            models.Index(fields=['severity', 'timestamp']),
            models.Index(fields=['user', 'timestamp']),
        ]

    def __str__(self):
        """Return ``<event type> - <severity> at <timestamp>``."""
        return "{} - {} at {}".format(
            self.event_type, self.severity, self.timestamp
        )
class SSOProvider(models.Model):
    """Configuration record for one single-sign-on provider.

    Providers are ordered alphabetically by ``name`` by default.
    """

    PROVIDER_TYPES = [
        ('SAML', 'SAML'),
        ('OAUTH2', 'OAuth 2.0'),
        ('OIDC', 'OpenID Connect'),
        ('LDAP', 'LDAP'),
    ]

    name = models.CharField(max_length=100, unique=True)
    provider_type = models.CharField(max_length=20, choices=PROVIDER_TYPES)
    is_active = models.BooleanField(default=True)

    # Provider-specific settings; the expected keys are not defined here.
    configuration = models.JSONField(default=dict)
    attribute_mapping = models.JSONField(
        default=dict,
        help_text="Mapping of SSO attributes to user fields",
    )

    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        ordering = ['name']

    def __str__(self):
        """Return the provider name followed by its type."""
        return "{} ({})".format(self.name, self.provider_type)
class DevicePosture(models.Model):
    """Security posture snapshot of a device belonging to a user.

    Stores identification, security controls, network context, a 0-100 risk
    score, compliance state and a derived trust level. Rows are ordered by
    most recently seen first.
    """

    DEVICE_TYPES = [
        ('DESKTOP', 'Desktop Computer'),
        ('LAPTOP', 'Laptop Computer'),
        ('MOBILE', 'Mobile Device'),
        ('TABLET', 'Tablet'),
        ('SERVER', 'Server'),
        ('IOT', 'IoT Device'),
        ('UNKNOWN', 'Unknown Device'),
    ]

    OS_TYPES = [
        ('WINDOWS', 'Windows'),
        ('MACOS', 'macOS'),
        ('LINUX', 'Linux'),
        ('ANDROID', 'Android'),
        ('IOS', 'iOS'),
        ('UNKNOWN', 'Unknown OS'),
    ]

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    user = models.ForeignKey(
        User,
        on_delete=models.CASCADE,
        related_name='device_postures',
    )

    # --- Device identification -------------------------------------------
    device_id = models.CharField(
        max_length=255,
        unique=True,
        help_text="Unique device identifier",
    )
    device_name = models.CharField(max_length=200, blank=True)
    device_type = models.CharField(
        max_length=20,
        choices=DEVICE_TYPES,
        default='UNKNOWN',
    )
    os_type = models.CharField(
        max_length=20,
        choices=OS_TYPES,
        default='UNKNOWN',
    )
    os_version = models.CharField(max_length=100, blank=True)
    browser_info = models.CharField(max_length=200, blank=True)

    # --- Security posture ------------------------------------------------
    is_managed = models.BooleanField(
        default=False,
        help_text="Is device managed by organization",
    )
    has_antivirus = models.BooleanField(default=False)
    antivirus_status = models.CharField(max_length=50, blank=True)
    firewall_enabled = models.BooleanField(default=False)
    encryption_enabled = models.BooleanField(default=False)
    screen_lock_enabled = models.BooleanField(default=False)
    biometric_auth = models.BooleanField(default=False)

    # --- Network information ---------------------------------------------
    ip_address = models.GenericIPAddressField(null=True, blank=True)
    mac_address = models.CharField(max_length=17, blank=True)
    network_type = models.CharField(
        max_length=50,
        blank=True,
        help_text="Corporate, Public, Home, etc.",
    )
    vpn_connected = models.BooleanField(default=False)

    # --- Risk assessment -------------------------------------------------
    risk_score = models.IntegerField(
        default=0,
        validators=[MinValueValidator(0), MaxValueValidator(100)],
        help_text="Device risk score (0-100, higher = more risky)",
    )
    last_assessment = models.DateTimeField(auto_now=True)
    assessment_details = models.JSONField(
        default=dict,
        help_text="Detailed assessment results",
    )

    # --- Compliance status -----------------------------------------------
    is_compliant = models.BooleanField(default=False)
    compliance_issues = models.JSONField(
        default=list,
        help_text="List of compliance issues",
    )

    # --- Status ----------------------------------------------------------
    is_active = models.BooleanField(default=True)
    is_trusted = models.BooleanField(default=False)
    trust_level = models.CharField(
        max_length=20,
        choices=[
            ('HIGH', 'High Trust'),
            ('MEDIUM', 'Medium Trust'),
            ('LOW', 'Low Trust'),
            ('UNTRUSTED', 'Untrusted'),
        ],
        default='LOW',
    )

    # --- Timestamps ------------------------------------------------------
    first_seen = models.DateTimeField(auto_now_add=True)
    last_seen = models.DateTimeField(auto_now=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        ordering = ['-last_seen']
        indexes = [
            models.Index(fields=['user', 'is_active']),
            models.Index(fields=['device_id']),
            models.Index(fields=['risk_score', 'trust_level']),
            models.Index(fields=['is_compliant', 'is_trusted']),
        ]

    def __str__(self):
        """Return the device name (or id) followed by the owner's username."""
        label = self.device_name or self.device_id
        return "{} ({})".format(label, self.user.username)

    def calculate_risk_score(self) -> int:
        """Compute a 0-100 risk score from the device's current attributes.

        Points are added for each missing security control, for risky
        network and device types, and 5 points per compliance issue. The
        total is capped at 100. The result is returned, not stored.
        """
        # (control present?, penalty when it is absent)
        control_penalties = (
            (self.is_managed, 30),
            (self.has_antivirus, 20),
            (self.firewall_enabled, 15),
            (self.encryption_enabled, 15),
            (self.screen_lock_enabled, 10),
            (self.biometric_auth, 5),
        )
        total = 0
        for control_present, penalty in control_penalties:
            if not control_present:
                total += penalty

        # Network risk: public networks weigh more than home networks.
        network = self.network_type
        total += {'Public': 25, 'Home': 10}.get(network, 0)
        if network != 'Corporate' and not self.vpn_connected:
            total += 20

        # Device-type risk.
        total += {'MOBILE': 10, 'TABLET': 10, 'IOT': 30}.get(self.device_type, 0)

        # Each outstanding compliance issue adds a fixed amount.
        total += 5 * len(self.compliance_issues)

        return min(total, 100)

    def update_trust_level(self):
        """Derive ``trust_level`` and ``is_trusted`` from risk and compliance.

        HIGH and MEDIUM require a compliant device (risk <= 20 and <= 40
        respectively); otherwise risk <= 70 yields LOW and anything above
        yields UNTRUSTED. Only HIGH and MEDIUM count as trusted. The model
        is not saved.
        """
        risk = self.risk_score
        if self.is_compliant and risk <= 20:
            level = 'HIGH'
        elif self.is_compliant and risk <= 40:
            level = 'MEDIUM'
        elif risk <= 70:
            level = 'LOW'
        else:
            level = 'UNTRUSTED'
        self.trust_level = level
        self.is_trusted = level in ('HIGH', 'MEDIUM')
class GeolocationRule(models.Model):
    """Geolocation-based access control rule.

    A rule holds allow/block lists for countries, regions, cities and IP
    ranges plus an optional maximum distance from an office location.
    ``evaluate_location`` reports whether a given location violates any of
    those constraints. The working-hours and time-zone fields are stored but
    are not evaluated by this model.
    """

    RULE_TYPES = [
        ('ALLOW', 'Allow'),
        ('DENY', 'Deny'),
        ('REQUIRE_MFA', 'Require Additional MFA'),
        ('RESTRICT', 'Restrict Access'),
    ]

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    name = models.CharField(max_length=100, unique=True)
    description = models.TextField()
    rule_type = models.CharField(max_length=20, choices=RULE_TYPES)

    # Geographic conditions
    allowed_countries = models.JSONField(default=list, help_text="List of allowed country codes")
    blocked_countries = models.JSONField(default=list, help_text="List of blocked country codes")
    allowed_regions = models.JSONField(default=list, help_text="List of allowed regions/states")
    blocked_regions = models.JSONField(default=list, help_text="List of blocked regions/states")
    allowed_cities = models.JSONField(default=list, help_text="List of allowed cities")
    blocked_cities = models.JSONField(default=list, help_text="List of blocked cities")

    # IP-based rules
    allowed_ip_ranges = models.JSONField(default=list, help_text="List of allowed IP ranges (CIDR)")
    blocked_ip_ranges = models.JSONField(default=list, help_text="List of blocked IP ranges (CIDR)")

    # Time-based conditions
    allowed_time_zones = models.JSONField(default=list, help_text="List of allowed time zones")
    working_hours_only = models.BooleanField(default=False)
    working_hours_start = models.TimeField(null=True, blank=True)
    working_hours_end = models.TimeField(null=True, blank=True)
    working_days = models.JSONField(default=list, help_text="List of working days (0-6, Monday=0)")

    # Risk factors
    max_distance_from_office = models.FloatField(null=True, blank=True, help_text="Max distance from office in km")
    office_latitude = models.FloatField(null=True, blank=True)
    office_longitude = models.FloatField(null=True, blank=True)

    # Actions and notifications
    notification_message = models.TextField(blank=True, help_text="Message to show when rule triggers")
    log_violation = models.BooleanField(default=True)
    require_manager_approval = models.BooleanField(default=False)

    # Status
    is_active = models.BooleanField(default=True)
    priority = models.IntegerField(default=100, help_text="Lower numbers have higher priority")

    # Metadata
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True)

    class Meta:
        ordering = ['priority', 'name']
        indexes = [
            models.Index(fields=['rule_type', 'is_active']),
            models.Index(fields=['priority']),
        ]

    def __str__(self):
        return f"{self.name} ({self.rule_type})"

    def evaluate_location(self, latitude: float = None, longitude: float = None,
                          country_code: str = None, region: str = None,
                          city: str = None, ip_address: str = None) -> dict:
        """Evaluate a location against this rule's constraints.

        Checks run in order: country, region, city, IP ranges, distance from
        the office. The first violated constraint ends the evaluation. Any
        argument left as ``None`` (or empty) skips the corresponding check.

        Args:
            latitude: Latitude in decimal degrees, or None when unknown.
            longitude: Longitude in decimal degrees, or None when unknown.
            country_code: Country code compared against the country lists.
            region: Region/state compared against the region lists.
            city: City compared against the city lists.
            ip_address: Client IP compared against the CIDR range lists.

        Returns:
            A dict with ``matches`` (True when a constraint was violated),
            ``rule_type``, ``message`` and ``requires_approval``; a
            ``reason`` key is added only when ``matches`` is True.
        """
        result = {
            'matches': False,
            'rule_type': self.rule_type,
            'message': self.notification_message,
            'requires_approval': self.require_manager_approval
        }

        def violation(reason: str) -> dict:
            # Mark the result as matched and attach the explanation.
            result['matches'] = True
            result['reason'] = reason
            return result

        # Country, region and city share the same block-list / allow-list
        # logic; only the label and the lists differ.
        named_checks = (
            ('Country', country_code, self.blocked_countries, self.allowed_countries),
            ('Region', region, self.blocked_regions, self.allowed_regions),
            ('City', city, self.blocked_cities, self.allowed_cities),
        )
        for label, value, blocked, allowed in named_checks:
            if not value:
                continue
            if blocked and value in blocked:
                return violation(f"{label} {value} is blocked")
            if allowed and value not in allowed:
                return violation(f"{label} {value} is not in allowed list")

        # Check IP restrictions
        if ip_address:
            for ip_range in self.blocked_ip_ranges or []:
                if self._ip_in_range(ip_address, ip_range):
                    return violation(f"IP {ip_address} is in blocked range {ip_range}")
            if self.allowed_ip_ranges and not any(
                self._ip_in_range(ip_address, ip_range)
                for ip_range in self.allowed_ip_ranges
            ):
                return violation(f"IP {ip_address} is not in allowed ranges")

        # Check distance from office. Compare against None explicitly:
        # 0.0 is a valid coordinate (equator / prime meridian) and a valid
        # distance limit, so truthiness must not be used here.
        distance_inputs = (
            latitude,
            longitude,
            self.office_latitude,
            self.office_longitude,
            self.max_distance_from_office,
        )
        if all(value is not None for value in distance_inputs):
            distance = self._calculate_distance(
                latitude, longitude, self.office_latitude, self.office_longitude
            )
            if distance > self.max_distance_from_office:
                return violation(
                    f"Distance {distance:.2f}km exceeds limit {self.max_distance_from_office}km"
                )

        return result

    def _ip_in_range(self, ip: str, ip_range: str) -> bool:
        """Return True when ``ip`` lies inside the CIDR block ``ip_range``.

        Malformed addresses or ranges yield False instead of raising.
        Ranges written with host bits set (e.g. ``10.1.2.3/8``) are accepted
        and treated as the enclosing network, so a sloppily written block
        entry is not silently ignored.
        """
        import ipaddress
        try:
            network = ipaddress.ip_network(ip_range, strict=False)
            return ipaddress.ip_address(ip) in network
        except (ValueError, TypeError):
            # Unparseable input: treat as "not in range".
            return False

    def _calculate_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float:
        """Return the great-circle (haversine) distance in kilometres.

        All four arguments are coordinates in decimal degrees.
        """
        import math
        R = 6371  # Earth's radius in kilometers
        dlat = math.radians(lat2 - lat1)
        dlon = math.radians(lon2 - lon1)
        a = (math.sin(dlat / 2) ** 2 +
             math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) *
             math.sin(dlon / 2) ** 2)
        # Rounding can push ``a`` marginally outside [0, 1] for (near-)
        # antipodal points, which would make sqrt(1 - a) fail.
        a = min(1.0, max(0.0, a))
        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
        return R * c
class AccessPolicy(models.Model):
    """Attribute-based access control policy with Zero Trust context checks.

    ``evaluate`` inspects device trust, geolocation, risk score and device
    compliance taken from a caller-supplied context. The ``conditions``,
    ``actions``, ``time_restrictions`` and adaptive-auth fields are stored
    but not interpreted by this model.
    """

    POLICY_TYPES = [
        ('ALLOW', 'Allow'),
        ('DENY', 'Deny'),
        ('REQUIRE_MFA', 'Require Additional MFA'),
        ('STEP_UP_AUTH', 'Step-up Authentication'),
        ('RISK_BASED', 'Risk-based Decision'),
    ]

    name = models.CharField(max_length=100, unique=True)
    description = models.TextField()
    policy_type = models.CharField(max_length=20, choices=POLICY_TYPES)
    conditions = models.JSONField(help_text="JSON conditions for policy evaluation")
    resource_type = models.CharField(max_length=100, blank=True)
    actions = models.JSONField(default=list, help_text="List of actions this policy applies to")
    priority = models.IntegerField(default=100, help_text="Lower numbers have higher priority")

    # Zero Trust context
    requires_device_trust = models.BooleanField(default=False)
    min_device_trust_level = models.CharField(
        max_length=20,
        choices=[
            ('HIGH', 'High Trust'),
            ('MEDIUM', 'Medium Trust'),
            ('LOW', 'Low Trust'),
        ],
        default='LOW'
    )
    requires_geolocation_check = models.BooleanField(default=False)
    geolocation_rule = models.ForeignKey(GeolocationRule, on_delete=models.SET_NULL, null=True, blank=True)

    # Risk-based conditions
    max_risk_score = models.IntegerField(
        default=100,
        validators=[MinValueValidator(0), MaxValueValidator(100)],
        help_text="Maximum allowed risk score"
    )
    requires_compliant_device = models.BooleanField(default=False)

    # Time-based conditions
    time_restrictions = models.JSONField(default=dict, help_text="Time-based access restrictions")

    # Adaptive authentication
    adaptive_auth_enabled = models.BooleanField(default=False)
    auth_factors_required = models.JSONField(default=list, help_text="Required authentication factors")

    is_active = models.BooleanField(default=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        ordering = ['priority', 'name']
        verbose_name_plural = "Access Policies"

    def __str__(self):
        return f"{self.name} ({self.policy_type})"

    def evaluate(self, user: User, resource: Any = None, action: str = None,
                 context: dict = None) -> dict:
        """Evaluate this policy against the supplied Zero Trust context.

        Args:
            user: The requesting user (currently not inspected).
            resource: The target resource (currently not inspected).
            action: The requested action (currently not inspected).
            context: Optional dict that may contain ``device_posture`` (an
                object exposing ``is_trusted``, ``trust_level`` and
                ``is_compliant``), ``location`` (a dict with any of
                ``latitude``, ``longitude``, ``country_code``, ``region``,
                ``city``, ``ip_address``) and ``risk_score``.

        Returns:
            A dict with ``allowed``, ``policy_type``, ``reason``,
            ``required_actions`` and ``risk_score``. Access is allowed only
            when no check produced a required action or a reason.

        A missing context is treated as an empty one, so a policy that
        requires a trusted or compliant device fails closed when no device
        information is supplied.
        """
        result = {
            'allowed': False,
            'policy_type': self.policy_type,
            'reason': '',
            'required_actions': [],
            'risk_score': 0
        }

        if not self.is_active:
            return result

        # Basic policy evaluation
        if self.policy_type == 'DENY':
            result['reason'] = 'Policy explicitly denies access'
            return result

        context = context or {}
        device_posture = context.get('device_posture')

        # Check device trust requirements
        if self.requires_device_trust:
            if not device_posture or not device_posture.is_trusted:
                result['required_actions'].append('DEVICE_REGISTRATION')
                result['reason'] = 'Untrusted device requires registration'
            if device_posture:
                # Lower rank = more trusted. UNTRUSTED is ranked explicitly
                # so it compares as worse than every permitted minimum
                # instead of breaking the lookup.
                trust_rank = {'HIGH': 0, 'MEDIUM': 1, 'LOW': 2, 'UNTRUSTED': 3}
                # An unrecognised minimum falls back to the strictest level;
                # an unrecognised device level is treated as the worst one.
                required_rank = trust_rank.get(self.min_device_trust_level, 0)
                device_rank = trust_rank.get(device_posture.trust_level, len(trust_rank))
                if device_rank > required_rank:
                    result['required_actions'].append('STEP_UP_AUTH')
                    result['reason'] = f'Device trust level {device_posture.trust_level} below required {self.min_device_trust_level}'

        # Check geolocation requirements
        if self.requires_geolocation_check and self.geolocation_rule:
            location_data = context.get('location') or {}
            # Pass only the recognised keys, explicitly, so a partial or
            # extended location dict cannot raise a TypeError.
            # NOTE: location fields that are absent are simply not checked.
            geo_result = self.geolocation_rule.evaluate_location(
                latitude=location_data.get('latitude'),
                longitude=location_data.get('longitude'),
                country_code=location_data.get('country_code'),
                region=location_data.get('region'),
                city=location_data.get('city'),
                ip_address=location_data.get('ip_address'),
            )
            if geo_result['matches']:
                if self.geolocation_rule.rule_type == 'DENY':
                    result['allowed'] = False
                    result['reason'] = geo_result['reason']
                    return result
                elif self.geolocation_rule.rule_type == 'REQUIRE_MFA':
                    result['required_actions'].append('ADDITIONAL_MFA')
                    result['reason'] = geo_result['reason']

        # Check risk score (absent or None counts as 0)
        risk_score = context.get('risk_score') or 0
        result['risk_score'] = risk_score
        if risk_score > self.max_risk_score:
            result['required_actions'].append('RISK_MITIGATION')
            result['reason'] = f'Risk score {risk_score} exceeds maximum {self.max_risk_score}'

        # Check device compliance
        if self.requires_compliant_device:
            if not device_posture or not device_posture.is_compliant:
                result['required_actions'].append('DEVICE_COMPLIANCE')
                result['reason'] = 'Device compliance required'

        # If no blocking conditions, allow access
        if not result['required_actions'] and not result['reason']:
            result['allowed'] = True
            result['reason'] = 'Access granted'

        return result
class RiskAssessment(models.Model):
    """Recorded risk evaluation for a user, used for Zero Trust decisions.

    Holds six per-factor scores (0-100 each), the combined score and level,
    the context the scores were derived from, and the resulting access
    decision. Rows are ordered by most recent assessment first.
    """

    RISK_FACTORS = [
        ('DEVICE_RISK', 'Device Risk'),
        ('LOCATION_RISK', 'Location Risk'),
        ('BEHAVIOR_RISK', 'Behavior Risk'),
        ('NETWORK_RISK', 'Network Risk'),
        ('TIME_RISK', 'Time Risk'),
        ('USER_RISK', 'User Risk'),
    ]

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    user = models.ForeignKey(
        User,
        on_delete=models.CASCADE,
        related_name='risk_assessments',
    )

    # --- Assessment context ----------------------------------------------
    assessment_type = models.CharField(
        max_length=50,
        default='LOGIN',
        help_text="Type of assessment (LOGIN, ACCESS, TRANSACTION)",
    )
    resource_type = models.CharField(
        max_length=100,
        blank=True,
        help_text="Type of resource being accessed",
    )
    resource_id = models.CharField(
        max_length=255,
        blank=True,
        help_text="ID of resource being accessed",
    )

    # --- Per-factor scores -----------------------------------------------
    device_risk_score = models.IntegerField(
        default=0,
        validators=[MinValueValidator(0), MaxValueValidator(100)],
    )
    location_risk_score = models.IntegerField(
        default=0,
        validators=[MinValueValidator(0), MaxValueValidator(100)],
    )
    behavior_risk_score = models.IntegerField(
        default=0,
        validators=[MinValueValidator(0), MaxValueValidator(100)],
    )
    network_risk_score = models.IntegerField(
        default=0,
        validators=[MinValueValidator(0), MaxValueValidator(100)],
    )
    time_risk_score = models.IntegerField(
        default=0,
        validators=[MinValueValidator(0), MaxValueValidator(100)],
    )
    user_risk_score = models.IntegerField(
        default=0,
        validators=[MinValueValidator(0), MaxValueValidator(100)],
    )

    # --- Overall risk ----------------------------------------------------
    overall_risk_score = models.IntegerField(
        default=0,
        validators=[MinValueValidator(0), MaxValueValidator(100)],
    )
    risk_level = models.CharField(
        max_length=20,
        choices=[
            ('LOW', 'Low Risk'),
            ('MEDIUM', 'Medium Risk'),
            ('HIGH', 'High Risk'),
            ('CRITICAL', 'Critical Risk'),
        ],
        default='LOW',
    )

    # --- Context data ----------------------------------------------------
    ip_address = models.GenericIPAddressField(null=True, blank=True)
    user_agent = models.TextField(blank=True)
    location_data = models.JSONField(
        default=dict,
        help_text="Geolocation and network data",
    )
    device_data = models.JSONField(default=dict, help_text="Device information")
    behavior_data = models.JSONField(
        default=dict,
        help_text="User behavior patterns",
    )

    # --- Assessment details ----------------------------------------------
    risk_factors = models.JSONField(
        default=list,
        help_text="List of identified risk factors",
    )
    mitigation_actions = models.JSONField(
        default=list,
        help_text="Recommended mitigation actions",
    )
    assessment_details = models.JSONField(
        default=dict,
        help_text="Detailed assessment results",
    )

    # --- Decision --------------------------------------------------------
    access_decision = models.CharField(
        max_length=20,
        choices=[
            ('ALLOW', 'Allow Access'),
            ('DENY', 'Deny Access'),
            ('STEP_UP', 'Step-up Authentication'),
            ('REVIEW', 'Manual Review Required'),
        ],
        default='ALLOW',
    )
    decision_reason = models.TextField(blank=True)

    # --- Timestamps ------------------------------------------------------
    assessed_at = models.DateTimeField(auto_now_add=True)
    expires_at = models.DateTimeField(null=True, blank=True)

    class Meta:
        ordering = ['-assessed_at']
        indexes = [
            models.Index(fields=['user', 'assessed_at']),
            models.Index(fields=['overall_risk_score', 'risk_level']),
            models.Index(fields=['access_decision']),
        ]

    def __str__(self):
        """Return a summary naming the user, risk level and overall score."""
        return "Risk Assessment for {} - {} ({})".format(
            self.user.username, self.risk_level, self.overall_risk_score
        )

    def calculate_overall_risk(self):
        """Combine the factor scores into ``overall_risk_score`` and a level.

        The overall score is the weighted sum of the six factor scores
        (device 25%, location 20%, behavior 20%, network 15%, time 10%,
        user 10%) truncated to an integer. Scores up to 25 map to LOW, up
        to 50 to MEDIUM, up to 75 to HIGH, and anything higher to CRITICAL.
        The model is not saved.
        """
        weighted_total = (
            self.device_risk_score * 0.25
            + self.location_risk_score * 0.20
            + self.behavior_risk_score * 0.20
            + self.network_risk_score * 0.15
            + self.time_risk_score * 0.10
            + self.user_risk_score * 0.10
        )
        overall = int(weighted_total)
        self.overall_risk_score = overall

        # Walk the upper bounds in ascending order; the first bound that
        # contains the score decides the level.
        for upper_bound, level in ((25, 'LOW'), (50, 'MEDIUM'), (75, 'HIGH')):
            if overall <= upper_bound:
                self.risk_level = level
                break
        else:
            self.risk_level = 'CRITICAL'

    def determine_access_decision(self):
        """Set ``access_decision`` and ``decision_reason`` from ``risk_level``.

        LOW allows, MEDIUM requires step-up authentication, HIGH requires
        manual review, and any other level results in denial. The model is
        not saved.
        """
        outcomes = {
            'LOW': ('ALLOW', 'Low risk - access allowed'),
            'MEDIUM': ('STEP_UP', 'Medium risk - step-up authentication required'),
            'HIGH': ('REVIEW', 'High risk - manual review required'),
        }
        # Anything not listed (i.e. CRITICAL) is denied.
        denied = ('DENY', 'Critical risk - access denied')
        decision, reason = outcomes.get(self.risk_level, denied)
        self.access_decision = decision
        self.decision_reason = reason
class AdaptiveAuthentication(models.Model):
"""Adaptive authentication rules and configurations"""
AUTH_METHODS = [
('PASSWORD', 'Password'),
('MFA_TOTP', 'MFA TOTP'),
('MFA_SMS', 'MFA SMS'),
('MFA_EMAIL', 'MFA Email'),
('BIOMETRIC', 'Biometric'),
('HARDWARE_TOKEN', 'Hardware Token'),
('SSO', 'Single Sign-On'),
('CERTIFICATE', 'Certificate'),
]
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
name = models.CharField(max_length=100, unique=True)
description = models.TextField()
# Risk thresholds
low_risk_threshold = models.IntegerField(default=25, validators=[MinValueValidator(0), MaxValueValidator(100)])
medium_risk_threshold = models.IntegerField(default=50, validators=[MinValueValidator(0), MaxValueValidator(100)])
high_risk_threshold = models.IntegerField(default=75, validators=[MinValueValidator(0), MaxValueValidator(100)])
# Authentication requirements by risk level
low_risk_auth_methods = models.JSONField(default=list, help_text="Auth methods for low risk")
medium_risk_auth_methods = models.JSONField(default=list, help_text="Auth methods for medium risk")
high_risk_auth_methods = models.JSONField(default=list, help_text="Auth methods for high risk")
critical_risk_auth_methods = models.JSONField(default=list, help_text="Auth methods for critical risk")
# Context-based adjustments
device_trust_multiplier = models.FloatField(default=1.0, help_text="Multiplier for device trust")
location_trust_multiplier = models.FloatField(default=1.0, help_text="Multiplier for location trust")
time_trust_multiplier = models.FloatField(default=1.0, help_text="Multiplier for time trust")
# Behavioral analysis
enable_behavioral_analysis = models.BooleanField(default=True)
behavior_learning_period = models.IntegerField(default=30, help_text="Days to learn user behavior")
anomaly_threshold = models.FloatField(default=0.7, help_text="Threshold for behavioral anomalies")
# Machine learning features
ml_enabled = models.BooleanField(default=False)
ml_model_path = models.CharField(max_length=500, blank=True, help_text="Path to ML model file")
ml_confidence_threshold = models.FloatField(default=0.8, help_text="ML confidence threshold")
# Fallback options
fallback_auth_methods = models.JSONField(default=list, help_text="Fallback auth methods")
max_auth_attempts = models.IntegerField(default=3)
lockout_duration = models.IntegerField(default=15, help_text="Lockout duration in minutes")
# Status
is_active = models.BooleanField(default=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
# Model options: default ordering is ascending by the ``name`` field.
class Meta:
ordering = ['name']
def __str__(self):
"""Return this record's name followed by the suffix " (Adaptive Auth)"."""
return f"{self.name} (Adaptive Auth)"
def get_required_auth_methods(self, risk_score: int) -> list:
    """Return the auth-method list configured for the tier of ``risk_score``.

    Tiers are checked from lowest to highest and each threshold is an
    inclusive upper bound. A score above ``high_risk_threshold`` maps to
    the critical tier.

    Args:
        risk_score: Risk score to classify.

    Returns:
        The stored list for the first tier whose threshold is not exceeded,
        otherwise ``critical_risk_auth_methods``.
    """
    # (threshold attribute, auth-methods attribute), ordered low -> high.
    tiers = (
        ('low_risk_threshold', 'low_risk_auth_methods'),
        ('medium_risk_threshold', 'medium_risk_auth_methods'),
        ('high_risk_threshold', 'high_risk_auth_methods'),
    )
    for threshold_attr, methods_attr in tiers:
        if risk_score <= getattr(self, threshold_attr):
            return getattr(self, methods_attr)
    return self.critical_risk_auth_methods
def calculate_adjusted_risk(self, base_risk: int, context: dict) -> int:
    """Scale a base risk score by device, location and time trust signals.

    For each signal, the "trusted" context value multiplies the running
    score by the configured multiplier ``m``, and the opposite "untrusted"
    value multiplies it by the mirrored factor ``2.0 - m``. Any other or
    missing value leaves the score untouched.

    Args:
        base_risk: Starting risk score.
        context: Mapping that may contain ``device_trust_level``
            (``'HIGH'`` / ``'LOW'``), ``location_trust``
            (``'TRUSTED'`` / ``'UNTRUSTED'``) and ``time_trust``
            (``'NORMAL_HOURS'`` / ``'UNUSUAL_HOURS'``). ``None`` is treated
            as an empty mapping.

    Returns:
        The adjusted score truncated to an int and clamped to 0-100.
    """
    context = context or {}
    # (context key, trusted value, untrusted value, configured multiplier)
    signals = (
        ('device_trust_level', 'HIGH', 'LOW', self.device_trust_multiplier),
        ('location_trust', 'TRUSTED', 'UNTRUSTED', self.location_trust_multiplier),
        ('time_trust', 'NORMAL_HOURS', 'UNUSUAL_HOURS', self.time_trust_multiplier),
    )
    adjusted_risk = base_risk
    for key, trusted, untrusted, multiplier in signals:
        level = context.get(key)
        if level == trusted:
            adjusted_risk *= multiplier
        elif level == untrusted:
            adjusted_risk *= (2.0 - multiplier)
    # Clamp on both sides: a multiplier above 2.0 makes the mirrored factor
    # negative, and a negative score would otherwise be classified as the
    # lowest risk tier by the threshold comparison.
    return max(0, min(int(adjusted_risk), 100))
class UserBehaviorProfile(models.Model):
"""User behavior profile for anomaly detection"""
# Primary key: random UUID generated on creation, not editable.
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
# Owning user; profiles are deleted together with the user (CASCADE).
# NOTE(review): a ForeignKey allows several profiles per user although the
# related_name is singular - confirm whether a one-to-one link was intended.
user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='behavior_profile')
# Login patterns (JSON values, empty list by default). These are the
# baselines calculate_anomaly_score() compares the current login against.
typical_login_times = models.JSONField(default=list, help_text="Typical login times")
typical_login_locations = models.JSONField(default=list, help_text="Typical login locations")
typical_login_devices = models.JSONField(default=list, help_text="Typical login devices")
# Access patterns
typical_access_times = models.JSONField(default=list, help_text="Typical resource access times")
typical_access_patterns = models.JSONField(default=list, help_text="Typical access patterns")
typical_session_duration = models.FloatField(default=0.0, help_text="Typical session duration in hours")
# Network patterns (typical_ip_ranges is the baseline for the IP signal in
# calculate_anomaly_score()).
typical_ip_ranges = models.JSONField(default=list, help_text="Typical IP address ranges")
typical_user_agents = models.JSONField(default=list, help_text="Typical user agents")
# Behavioral metrics. anomaly_score is overwritten in memory by
# calculate_anomaly_score(), which caps it at 1.0.
login_frequency = models.FloatField(default=0.0, help_text="Average logins per day")
access_frequency = models.FloatField(default=0.0, help_text="Average resource accesses per day")
anomaly_score = models.FloatField(default=0.0, help_text="Current anomaly score")
# Learning status: a new profile starts in learning mode, with the start
# date stamped on insert (auto_now_add) and no completion date.
is_learning = models.BooleanField(default=True)
learning_start_date = models.DateTimeField(auto_now_add=True)
learning_complete_date = models.DateTimeField(null=True, blank=True)
sample_count = models.IntegerField(default=0, help_text="Number of samples used for learning")
# Timestamps: last_updated refreshes on every save, created_at is set once.
last_updated = models.DateTimeField(auto_now=True)
created_at = models.DateTimeField(auto_now_add=True)
# Model options: default ordering is most recently updated first; indexes
# cover the (user, is_learning) pair and the anomaly_score column.
class Meta:
ordering = ['-last_updated']
indexes = [
models.Index(fields=['user', 'is_learning']),
models.Index(fields=['anomaly_score']),
]
def __str__(self):
"""Return "Behavior Profile for <username>" using the related user's username."""
return f"Behavior Profile for {self.user.username}"
def calculate_anomaly_score(self, current_behavior: dict) -> float:
    """Combine per-signal anomaly scores into one weighted score.

    Four signals are considered in order: login time (weight 0.3),
    location (0.3), device (0.2) and IP address (0.2). A signal only
    contributes when ``current_behavior`` holds a truthy value for it and
    the profile's matching baseline is non-empty.

    The capped result is stored on ``self.anomaly_score``; this method does
    not save the instance.

    Args:
        current_behavior: Mapping that may contain ``login_time``,
            ``location``, ``device_id`` and ``ip_address``.

    Returns:
        The weighted sum of the contributing signals, capped at 1.0.
    """
    # (behavior key, baseline attribute, scoring helper, weight)
    signal_table = (
        ('login_time', 'typical_login_times', '_calculate_time_anomaly', 0.3),
        ('location', 'typical_login_locations', '_calculate_location_anomaly', 0.3),
        ('device_id', 'typical_login_devices', '_calculate_device_anomaly', 0.2),
        ('ip_address', 'typical_ip_ranges', '_calculate_ip_anomaly', 0.2),
    )
    total = 0.0
    for behavior_key, baseline_attr, scorer_name, weight in signal_table:
        observed = current_behavior.get(behavior_key)
        if not observed:
            continue
        baseline = getattr(self, baseline_attr)
        if not baseline:
            continue
        scorer = getattr(self, scorer_name)
        total += scorer(observed, baseline) * weight
    self.anomaly_score = min(total, 1.0)
    return self.anomaly_score
def _calculate_time_anomaly(self, current_time, typical_times) -> float:
"""Calculate time-based anomaly score"""
# Simplified time anomaly calculation
# In production, this would use more sophisticated time series analysis
return 0.0 # Placeholder
def _calculate_location_anomaly(self, current_location, typical_locations) -> float:
"""Calculate location-based anomaly score"""
# Simplified location anomaly calculation
return 0.0 # Placeholder
def _calculate_device_anomaly(self, current_device, typical_devices) -> float:
"""Calculate device-based anomaly score"""
if current_device in typical_devices:
return 0.0
return 1.0
def _calculate_ip_anomaly(self, current_ip, typical_ranges) -> float:
"""Calculate IP-based anomaly score"""
# Simplified IP anomaly calculation
return 0.0 # Placeholder