""" Compliance & Governance models for Enterprise Incident Management API Implements regulatory workflows, evidence collection, retention policies, and export APIs """ import uuid import json from datetime import datetime, timedelta from typing import Dict, Any, Optional, List from django.db import models from django.contrib.auth import get_user_model from django.core.validators import MinValueValidator, MaxValueValidator from django.utils import timezone from django.core.exceptions import ValidationError User = get_user_model() class RegulatoryFramework(models.Model): """Regulatory frameworks and standards (GDPR, HIPAA, SOX, ISO27001)""" FRAMEWORK_TYPES = [ ('GDPR', 'General Data Protection Regulation'), ('HIPAA', 'Health Insurance Portability and Accountability Act'), ('SOX', 'Sarbanes-Oxley Act'), ('ISO27001', 'ISO/IEC 27001'), ('PCI_DSS', 'Payment Card Industry Data Security Standard'), ('NIST', 'NIST Cybersecurity Framework'), ('CUSTOM', 'Custom Framework'), ] id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) name = models.CharField(max_length=100, unique=True) framework_type = models.CharField(max_length=20, choices=FRAMEWORK_TYPES) version = models.CharField(max_length=20, default='1.0') description = models.TextField() # Framework details applicable_regions = models.JSONField(default=list, help_text="List of applicable regions/countries") industry_sectors = models.JSONField(default=list, help_text="List of applicable industry sectors") compliance_requirements = models.JSONField(default=list, help_text="List of compliance requirements") # Status and dates is_active = models.BooleanField(default=True) effective_date = models.DateField() review_date = models.DateField(null=True, blank=True) # Metadata created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True) class Meta: ordering = ['name', 'version'] indexes = [ models.Index(fields=['framework_type', 'is_active']), models.Index(fields=['effective_date']), ] def __str__(self): return f"{self.name} v{self.version}" class ComplianceRequirement(models.Model): """Individual compliance requirements within frameworks""" REQUIREMENT_TYPES = [ ('TECHNICAL', 'Technical Control'), ('ADMINISTRATIVE', 'Administrative Control'), ('PHYSICAL', 'Physical Control'), ('PROCEDURAL', 'Procedural Control'), ('DOCUMENTATION', 'Documentation Requirement'), ] PRIORITY_LEVELS = [ ('CRITICAL', 'Critical'), ('HIGH', 'High'), ('MEDIUM', 'Medium'), ('LOW', 'Low'), ] id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) framework = models.ForeignKey(RegulatoryFramework, on_delete=models.CASCADE, related_name='requirements') # Requirement details requirement_id = models.CharField(max_length=50, help_text="Unique identifier within framework") title = models.CharField(max_length=200) description = models.TextField() requirement_type = models.CharField(max_length=20, choices=REQUIREMENT_TYPES) priority = models.CharField(max_length=10, choices=PRIORITY_LEVELS, default='MEDIUM') # Implementation details implementation_guidance = models.TextField(blank=True, null=True) evidence_requirements = models.JSONField(default=list, help_text="Types of evidence required") testing_procedures = models.TextField(blank=True, null=True) # Compliance tracking is_implemented = models.BooleanField(default=False) implementation_date = models.DateField(null=True, blank=True) last_assessment_date = models.DateField(null=True, blank=True) next_assessment_date = models.DateField(null=True, blank=True) compliance_status = models.CharField( max_length=20, choices=[ ('COMPLIANT', 'Compliant'), ('PARTIALLY_COMPLIANT', 'Partially Compliant'), ('NON_COMPLIANT', 'Non-Compliant'), ('NOT_ASSESSED', 'Not Assessed'), ], default='NOT_ASSESSED' ) # Relationships responsible_team = models.CharField(max_length=100, blank=True, null=True) assigned_to = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True) # Metadata created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) class Meta: ordering = ['framework', 'requirement_id'] unique_together = ['framework', 'requirement_id'] indexes = [ models.Index(fields=['framework', 'compliance_status']), models.Index(fields=['requirement_type', 'priority']), models.Index(fields=['next_assessment_date']), ] def __str__(self): return f"{self.framework.name} - {self.requirement_id}: {self.title}" class RegulatoryWorkflow(models.Model): """Workflow definitions for regulatory compliance processes""" WORKFLOW_TYPES = [ ('INCIDENT_RESPONSE', 'Incident Response'), ('DATA_BREACH', 'Data Breach Response'), ('AUDIT', 'Audit Process'), ('ASSESSMENT', 'Compliance Assessment'), ('REMEDIATION', 'Remediation Process'), ('DOCUMENTATION', 'Documentation Process'), ] STATUS_CHOICES = [ ('DRAFT', 'Draft'), ('ACTIVE', 'Active'), ('SUSPENDED', 'Suspended'), ('ARCHIVED', 'Archived'), ] id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) name = models.CharField(max_length=200) workflow_type = models.CharField(max_length=30, choices=WORKFLOW_TYPES) description = models.TextField() # Framework association applicable_frameworks = models.ManyToManyField(RegulatoryFramework, blank=True) # Workflow definition workflow_definition = models.JSONField(help_text="JSON definition of workflow steps and transitions") triggers = models.JSONField(default=list, help_text="Events that trigger this workflow") conditions = models.JSONField(default=dict, help_text="Conditions for workflow execution") # Status and lifecycle status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='DRAFT') version = models.CharField(max_length=20, default='1.0') is_template = models.BooleanField(default=False) # Notifications and escalations notification_rules = models.JSONField(default=list, help_text="Notification rules for workflow events") escalation_rules = models.JSONField(default=list, help_text="Escalation rules for workflow delays") # Metadata created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True) class Meta: ordering = ['name', 'version'] indexes = [ models.Index(fields=['workflow_type', 'status']), models.Index(fields=['is_template']), ] def __str__(self): return f"{self.name} v{self.version}" class WorkflowInstance(models.Model): """Instance of a regulatory workflow execution""" STATUS_CHOICES = [ ('PENDING', 'Pending'), ('IN_PROGRESS', 'In Progress'), ('COMPLETED', 'Completed'), ('SUSPENDED', 'Suspended'), ('CANCELLED', 'Cancelled'), ('FAILED', 'Failed'), ] id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) workflow = models.ForeignKey(RegulatoryWorkflow, on_delete=models.CASCADE, related_name='instances') # Instance details title = models.CharField(max_length=200) description = models.TextField(blank=True, null=True) status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='PENDING') # Related objects related_incident = models.ForeignKey( 'incident_intelligence.Incident', on_delete=models.SET_NULL, null=True, blank=True, related_name='regulatory_workflows' ) related_requirement = models.ForeignKey( ComplianceRequirement, on_delete=models.SET_NULL, null=True, blank=True, related_name='workflow_instances' ) # Execution tracking current_step = models.CharField(max_length=100, blank=True, null=True) execution_data = models.JSONField(default=dict, help_text="Runtime data for workflow execution") completed_steps = models.JSONField(default=list, help_text="List of completed workflow steps") # Assignments and responsibilities assigned_to = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True) stakeholders = models.ManyToManyField(User, blank=True, related_name='workflow_stakeholders') # Timestamps started_at = models.DateTimeField(auto_now_add=True) completed_at = models.DateTimeField(null=True, blank=True) due_date = models.DateTimeField(null=True, blank=True) # Metadata created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True, related_name='created_workflows') updated_at = models.DateTimeField(auto_now=True) class Meta: ordering = ['-started_at'] indexes = [ models.Index(fields=['workflow', 'status']), models.Index(fields=['status', 'due_date']), models.Index(fields=['assigned_to', 'status']), ] def __str__(self): return f"{self.workflow.name} - {self.title} ({self.status})" class EvidenceCollection(models.Model): """Evidence collection and linking to incidents for compliance""" EVIDENCE_TYPES = [ ('LOG_FILE', 'Log File'), ('SCREENSHOT', 'Screenshot'), ('DOCUMENT', 'Document'), ('EMAIL', 'Email'), ('DATABASE_RECORD', 'Database Record'), ('CONFIGURATION', 'Configuration File'), ('AUDIT_TRAIL', 'Audit Trail'), ('TESTIMONY', 'Testimony/Statement'), ('PHYSICAL_EVIDENCE', 'Physical Evidence'), ('DIGITAL_FORENSICS', 'Digital Forensics'), ] STATUS_CHOICES = [ ('COLLECTED', 'Collected'), ('VERIFIED', 'Verified'), ('ANALYZED', 'Analyzed'), ('ARCHIVED', 'Archived'), ('DESTROYED', 'Destroyed'), ] id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) # Evidence details title = models.CharField(max_length=200) description = models.TextField() evidence_type = models.CharField(max_length=30, choices=EVIDENCE_TYPES) status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='COLLECTED') # Related objects incident = models.ForeignKey( 'incident_intelligence.Incident', on_delete=models.CASCADE, related_name='evidence_collection' ) workflow_instance = models.ForeignKey( WorkflowInstance, on_delete=models.SET_NULL, null=True, blank=True, related_name='evidence_collection' ) compliance_requirement = models.ForeignKey( ComplianceRequirement, on_delete=models.SET_NULL, null=True, blank=True, related_name='evidence_collection' ) # Evidence metadata file_path = models.CharField(max_length=500, blank=True, null=True) file_hash = models.CharField(max_length=64, blank=True, null=True, help_text="SHA-256 hash for integrity") file_size = models.BigIntegerField(null=True, blank=True) mime_type = models.CharField(max_length=100, blank=True, null=True) # Collection details collection_method = models.CharField(max_length=100, blank=True, null=True) collection_timestamp = models.DateTimeField(auto_now_add=True) collection_location = models.CharField(max_length=200, blank=True, null=True) collection_notes = models.TextField(blank=True, null=True) # Chain of custody collected_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True, related_name='collected_evidence') verified_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True, related_name='verified_evidence') custody_chain = models.JSONField(default=list, help_text="Chain of custody records") # Retention and disposal retention_period = models.DurationField(null=True, blank=True) disposal_date = models.DateTimeField(null=True, blank=True) disposal_method = models.CharField(max_length=100, blank=True, null=True) # Metadata created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) class Meta: ordering = ['-collection_timestamp'] indexes = [ models.Index(fields=['incident', 'evidence_type']), models.Index(fields=['status', 'collection_timestamp']), models.Index(fields=['compliance_requirement']), ] def __str__(self): return f"{self.title} ({self.evidence_type})" def add_custody_record(self, user: User, action: str, notes: str = None): """Add a custody record to the chain of custody""" custody_record = { 'timestamp': timezone.now().isoformat(), 'user_id': str(user.id), 'user_name': user.get_full_name() or user.username, 'action': action, 'notes': notes or '' } self.custody_chain.append(custody_record) self.save() class RetentionPolicy(models.Model): """Data retention policies for compliance and governance""" POLICY_TYPES = [ ('INCIDENT_DATA', 'Incident Data'), ('AUDIT_LOGS', 'Audit Logs'), ('EVIDENCE', 'Evidence'), ('USER_DATA', 'User Data'), ('SYSTEM_LOGS', 'System Logs'), ('BACKUP_DATA', 'Backup Data'), ('DOCUMENTATION', 'Documentation'), ] RETENTION_UNITS = [ ('DAYS', 'Days'), ('WEEKS', 'Weeks'), ('MONTHS', 'Months'), ('YEARS', 'Years'), ] id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) name = models.CharField(max_length=200, unique=True) description = models.TextField() policy_type = models.CharField(max_length=30, choices=POLICY_TYPES) # Framework association applicable_frameworks = models.ManyToManyField(RegulatoryFramework, blank=True) # Retention rules retention_period = models.PositiveIntegerField(help_text="Retention period value") retention_unit = models.CharField(max_length=10, choices=RETENTION_UNITS, default='YEARS') auto_archive = models.BooleanField(default=True, help_text="Automatically archive after retention period") auto_delete = models.BooleanField(default=False, help_text="Automatically delete after retention period") # Conditions and filters data_classification_levels = models.JSONField( default=list, help_text="Data classification levels this policy applies to" ) incident_categories = models.JSONField( default=list, help_text="Incident categories this policy applies to" ) custom_filters = models.JSONField( default=dict, help_text="Custom filters for policy application" ) # Legal holds and exceptions legal_hold_override = models.BooleanField( default=True, help_text="Whether legal holds can override this policy" ) exception_conditions = models.JSONField( default=list, help_text="Conditions that create exceptions to this policy" ) # Status and lifecycle is_active = models.BooleanField(default=True) effective_date = models.DateField() review_date = models.DateField(null=True, blank=True) # Metadata created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True) class Meta: ordering = ['name'] indexes = [ models.Index(fields=['policy_type', 'is_active']), models.Index(fields=['effective_date']), ] def __str__(self): return f"{self.name} ({self.retention_period} {self.retention_unit})" def get_retention_duration(self): """Get retention duration as timedelta""" unit_multipliers = { 'DAYS': 1, 'WEEKS': 7, 'MONTHS': 30, 'YEARS': 365 } days = self.retention_period * unit_multipliers.get(self.retention_unit, 365) return timedelta(days=days) def applies_to_data(self, data_classification: str = None, incident_category: str = None) -> bool: """Check if this policy applies to given data""" if not self.is_active: return False # Check data classification if data_classification and self.data_classification_levels: if data_classification not in self.data_classification_levels: return False # Check incident category if incident_category and self.incident_categories: if incident_category not in self.incident_categories: return False return True class ExportRequest(models.Model): """Export requests for regulators and auditors""" REQUEST_TYPES = [ ('REGULATORY', 'Regulatory Request'), ('AUDIT', 'Audit Request'), ('LEGAL', 'Legal Request'), ('INTERNAL', 'Internal Request'), ] STATUS_CHOICES = [ ('PENDING', 'Pending'), ('APPROVED', 'Approved'), ('IN_PROGRESS', 'In Progress'), ('COMPLETED', 'Completed'), ('REJECTED', 'Rejected'), ('CANCELLED', 'Cancelled'), ] id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) # Request details title = models.CharField(max_length=200) description = models.TextField() request_type = models.CharField(max_length=20, choices=REQUEST_TYPES) status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='PENDING') # Requester information requester_name = models.CharField(max_length=200) requester_organization = models.CharField(max_length=200, blank=True, null=True) requester_email = models.EmailField() requester_phone = models.CharField(max_length=20, blank=True, null=True) legal_authority = models.CharField(max_length=200, blank=True, null=True) # Request parameters data_scope = models.JSONField(help_text="Scope of data to be exported") date_range_start = models.DateTimeField(null=True, blank=True) date_range_end = models.DateTimeField(null=True, blank=True) data_classification_levels = models.JSONField(default=list) incident_categories = models.JSONField(default=list) # Framework association applicable_frameworks = models.ManyToManyField(RegulatoryFramework, blank=True) # Export details export_format = models.CharField(max_length=20, default='JSON', help_text="Export format (JSON, CSV, XML, PDF)") include_evidence = models.BooleanField(default=True) include_audit_trails = models.BooleanField(default=True) redaction_required = models.BooleanField(default=False) # Approval workflow approval_required = models.BooleanField(default=True) approved_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True, related_name='approved_exports') approved_at = models.DateTimeField(null=True, blank=True) approval_notes = models.TextField(blank=True, null=True) # Execution details executed_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True, related_name='executed_exports') export_file_path = models.CharField(max_length=500, blank=True, null=True) export_file_hash = models.CharField(max_length=64, blank=True, null=True) export_file_size = models.BigIntegerField(null=True, blank=True) # Timestamps requested_at = models.DateTimeField(auto_now_add=True) due_date = models.DateTimeField(null=True, blank=True) completed_at = models.DateTimeField(null=True, blank=True) # Metadata created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True, related_name='created_exports') updated_at = models.DateTimeField(auto_now=True) class Meta: ordering = ['-requested_at'] indexes = [ models.Index(fields=['request_type', 'status']), models.Index(fields=['status', 'due_date']), models.Index(fields=['requester_email']), ] def __str__(self): return f"{self.title} - {self.requester_organization or 'Internal'}" def is_overdue(self) -> bool: """Check if the export request is overdue""" if not self.due_date: return False return timezone.now() > self.due_date and self.status not in ['COMPLETED', 'CANCELLED'] class ComplianceReport(models.Model): """Compliance reports and assessments""" REPORT_TYPES = [ ('ASSESSMENT', 'Compliance Assessment'), ('AUDIT', 'Audit Report'), ('REMEDIATION', 'Remediation Report'), ('STATUS', 'Status Report'), ('EXCEPTION', 'Exception Report'), ] STATUS_CHOICES = [ ('DRAFT', 'Draft'), ('REVIEW', 'Under Review'), ('APPROVED', 'Approved'), ('PUBLISHED', 'Published'), ('ARCHIVED', 'Archived'), ] id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) # Report details title = models.CharField(max_length=200) report_type = models.CharField(max_length=20, choices=REPORT_TYPES) description = models.TextField() status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='DRAFT') # Framework association framework = models.ForeignKey(RegulatoryFramework, on_delete=models.CASCADE, related_name='reports') applicable_requirements = models.ManyToManyField(ComplianceRequirement, blank=True) # Report content executive_summary = models.TextField(blank=True, null=True) findings = models.JSONField(default=list, help_text="List of findings and observations") recommendations = models.JSONField(default=list, help_text="List of recommendations") action_items = models.JSONField(default=list, help_text="List of action items") # Assessment details overall_compliance_score = models.FloatField( null=True, blank=True, validators=[MinValueValidator(0.0), MaxValueValidator(100.0)], help_text="Overall compliance score (0-100)" ) compliance_gaps = models.JSONField(default=list, help_text="List of compliance gaps") risk_assessment = models.JSONField(default=dict, help_text="Risk assessment details") # Report metadata report_period_start = models.DateField() report_period_end = models.DateField() report_file_path = models.CharField(max_length=500, blank=True, null=True) # Approval and review prepared_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True, related_name='prepared_reports') reviewed_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True, related_name='reviewed_reports') approved_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True, related_name='approved_reports') # Timestamps created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) published_at = models.DateTimeField(null=True, blank=True) class Meta: ordering = ['-created_at'] indexes = [ models.Index(fields=['framework', 'report_type']), models.Index(fields=['status', 'report_period_end']), ] def __str__(self): return f"{self.title} - {self.framework.name}" class LegalHold(models.Model): """Legal holds to prevent data deletion during litigation or investigations""" STATUS_CHOICES = [ ('ACTIVE', 'Active'), ('SUSPENDED', 'Suspended'), ('RELEASED', 'Released'), ('EXPIRED', 'Expired'), ] id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) # Hold details case_name = models.CharField(max_length=200) case_number = models.CharField(max_length=100, blank=True, null=True) description = models.TextField() status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='ACTIVE') # Legal information legal_counsel = models.CharField(max_length=200, blank=True, null=True) law_firm = models.CharField(max_length=200, blank=True, null=True) court_jurisdiction = models.CharField(max_length=200, blank=True, null=True) # Scope and criteria data_scope = models.JSONField(help_text="Scope of data covered by this hold") custodian_list = models.JSONField(default=list, help_text="List of data custodians") search_criteria = models.JSONField(default=dict, help_text="Search criteria for data collection") # Dates hold_date = models.DateField(help_text="Date the legal hold was issued") release_date = models.DateField(null=True, blank=True) expiration_date = models.DateField(null=True, blank=True) # Related objects related_incidents = models.ManyToManyField( 'incident_intelligence.Incident', blank=True, related_name='legal_holds' ) related_evidence = models.ManyToManyField( EvidenceCollection, blank=True, related_name='legal_holds' ) # Notifications notification_sent = models.BooleanField(default=False) notification_date = models.DateTimeField(null=True, blank=True) reminder_sent = models.BooleanField(default=False) reminder_date = models.DateTimeField(null=True, blank=True) # Metadata created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) class Meta: ordering = ['-hold_date'] indexes = [ models.Index(fields=['status', 'hold_date']), models.Index(fields=['expiration_date']), ] def __str__(self): return f"{self.case_name} - {self.case_number or 'No Case Number'}" def is_active(self) -> bool: """Check if the legal hold is currently active""" if self.status != 'ACTIVE': return False if self.expiration_date and timezone.now().date() > self.expiration_date: return False return True def get_affected_data_count(self) -> int: """Get count of data items affected by this legal hold""" incident_count = self.related_incidents.count() evidence_count = self.related_evidence.count() return incident_count + evidence_count