Files
Iliyan Angelov 6b247e5b9f Updates
2025-09-19 11:58:53 +03:00

708 lines
27 KiB
Python

"""
Compliance & Governance models for Enterprise Incident Management API
Implements regulatory workflows, evidence collection, retention policies, and export APIs
"""
import uuid
import json
from datetime import datetime, timedelta
from typing import Dict, Any, Optional, List
from django.db import models
from django.contrib.auth import get_user_model
from django.core.validators import MinValueValidator, MaxValueValidator
from django.utils import timezone
from django.core.exceptions import ValidationError
User = get_user_model()
class RegulatoryFramework(models.Model):
"""Regulatory frameworks and standards (GDPR, HIPAA, SOX, ISO27001)"""
FRAMEWORK_TYPES = [
('GDPR', 'General Data Protection Regulation'),
('HIPAA', 'Health Insurance Portability and Accountability Act'),
('SOX', 'Sarbanes-Oxley Act'),
('ISO27001', 'ISO/IEC 27001'),
('PCI_DSS', 'Payment Card Industry Data Security Standard'),
('NIST', 'NIST Cybersecurity Framework'),
('CUSTOM', 'Custom Framework'),
]
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
name = models.CharField(max_length=100, unique=True)
framework_type = models.CharField(max_length=20, choices=FRAMEWORK_TYPES)
version = models.CharField(max_length=20, default='1.0')
description = models.TextField()
# Framework details
applicable_regions = models.JSONField(default=list, help_text="List of applicable regions/countries")
industry_sectors = models.JSONField(default=list, help_text="List of applicable industry sectors")
compliance_requirements = models.JSONField(default=list, help_text="List of compliance requirements")
# Status and dates
is_active = models.BooleanField(default=True)
effective_date = models.DateField()
review_date = models.DateField(null=True, blank=True)
# Metadata
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True)
class Meta:
ordering = ['name', 'version']
indexes = [
models.Index(fields=['framework_type', 'is_active']),
models.Index(fields=['effective_date']),
]
def __str__(self):
return f"{self.name} v{self.version}"
class ComplianceRequirement(models.Model):
"""Individual compliance requirements within frameworks"""
REQUIREMENT_TYPES = [
('TECHNICAL', 'Technical Control'),
('ADMINISTRATIVE', 'Administrative Control'),
('PHYSICAL', 'Physical Control'),
('PROCEDURAL', 'Procedural Control'),
('DOCUMENTATION', 'Documentation Requirement'),
]
PRIORITY_LEVELS = [
('CRITICAL', 'Critical'),
('HIGH', 'High'),
('MEDIUM', 'Medium'),
('LOW', 'Low'),
]
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
framework = models.ForeignKey(RegulatoryFramework, on_delete=models.CASCADE, related_name='requirements')
# Requirement details
requirement_id = models.CharField(max_length=50, help_text="Unique identifier within framework")
title = models.CharField(max_length=200)
description = models.TextField()
requirement_type = models.CharField(max_length=20, choices=REQUIREMENT_TYPES)
priority = models.CharField(max_length=10, choices=PRIORITY_LEVELS, default='MEDIUM')
# Implementation details
implementation_guidance = models.TextField(blank=True, null=True)
evidence_requirements = models.JSONField(default=list, help_text="Types of evidence required")
testing_procedures = models.TextField(blank=True, null=True)
# Compliance tracking
is_implemented = models.BooleanField(default=False)
implementation_date = models.DateField(null=True, blank=True)
last_assessment_date = models.DateField(null=True, blank=True)
next_assessment_date = models.DateField(null=True, blank=True)
compliance_status = models.CharField(
max_length=20,
choices=[
('COMPLIANT', 'Compliant'),
('PARTIALLY_COMPLIANT', 'Partially Compliant'),
('NON_COMPLIANT', 'Non-Compliant'),
('NOT_ASSESSED', 'Not Assessed'),
],
default='NOT_ASSESSED'
)
# Relationships
responsible_team = models.CharField(max_length=100, blank=True, null=True)
assigned_to = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True)
# Metadata
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
ordering = ['framework', 'requirement_id']
unique_together = ['framework', 'requirement_id']
indexes = [
models.Index(fields=['framework', 'compliance_status']),
models.Index(fields=['requirement_type', 'priority']),
models.Index(fields=['next_assessment_date']),
]
def __str__(self):
return f"{self.framework.name} - {self.requirement_id}: {self.title}"
class RegulatoryWorkflow(models.Model):
"""Workflow definitions for regulatory compliance processes"""
WORKFLOW_TYPES = [
('INCIDENT_RESPONSE', 'Incident Response'),
('DATA_BREACH', 'Data Breach Response'),
('AUDIT', 'Audit Process'),
('ASSESSMENT', 'Compliance Assessment'),
('REMEDIATION', 'Remediation Process'),
('DOCUMENTATION', 'Documentation Process'),
]
STATUS_CHOICES = [
('DRAFT', 'Draft'),
('ACTIVE', 'Active'),
('SUSPENDED', 'Suspended'),
('ARCHIVED', 'Archived'),
]
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
name = models.CharField(max_length=200)
workflow_type = models.CharField(max_length=30, choices=WORKFLOW_TYPES)
description = models.TextField()
# Framework association
applicable_frameworks = models.ManyToManyField(RegulatoryFramework, blank=True)
# Workflow definition
workflow_definition = models.JSONField(help_text="JSON definition of workflow steps and transitions")
triggers = models.JSONField(default=list, help_text="Events that trigger this workflow")
conditions = models.JSONField(default=dict, help_text="Conditions for workflow execution")
# Status and lifecycle
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='DRAFT')
version = models.CharField(max_length=20, default='1.0')
is_template = models.BooleanField(default=False)
# Notifications and escalations
notification_rules = models.JSONField(default=list, help_text="Notification rules for workflow events")
escalation_rules = models.JSONField(default=list, help_text="Escalation rules for workflow delays")
# Metadata
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True)
class Meta:
ordering = ['name', 'version']
indexes = [
models.Index(fields=['workflow_type', 'status']),
models.Index(fields=['is_template']),
]
def __str__(self):
return f"{self.name} v{self.version}"
class WorkflowInstance(models.Model):
"""Instance of a regulatory workflow execution"""
STATUS_CHOICES = [
('PENDING', 'Pending'),
('IN_PROGRESS', 'In Progress'),
('COMPLETED', 'Completed'),
('SUSPENDED', 'Suspended'),
('CANCELLED', 'Cancelled'),
('FAILED', 'Failed'),
]
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
workflow = models.ForeignKey(RegulatoryWorkflow, on_delete=models.CASCADE, related_name='instances')
# Instance details
title = models.CharField(max_length=200)
description = models.TextField(blank=True, null=True)
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='PENDING')
# Related objects
related_incident = models.ForeignKey(
'incident_intelligence.Incident',
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='regulatory_workflows'
)
related_requirement = models.ForeignKey(
ComplianceRequirement,
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='workflow_instances'
)
# Execution tracking
current_step = models.CharField(max_length=100, blank=True, null=True)
execution_data = models.JSONField(default=dict, help_text="Runtime data for workflow execution")
completed_steps = models.JSONField(default=list, help_text="List of completed workflow steps")
# Assignments and responsibilities
assigned_to = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True)
stakeholders = models.ManyToManyField(User, blank=True, related_name='workflow_stakeholders')
# Timestamps
started_at = models.DateTimeField(auto_now_add=True)
completed_at = models.DateTimeField(null=True, blank=True)
due_date = models.DateTimeField(null=True, blank=True)
# Metadata
created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True, related_name='created_workflows')
updated_at = models.DateTimeField(auto_now=True)
class Meta:
ordering = ['-started_at']
indexes = [
models.Index(fields=['workflow', 'status']),
models.Index(fields=['status', 'due_date']),
models.Index(fields=['assigned_to', 'status']),
]
def __str__(self):
return f"{self.workflow.name} - {self.title} ({self.status})"
class EvidenceCollection(models.Model):
"""Evidence collection and linking to incidents for compliance"""
EVIDENCE_TYPES = [
('LOG_FILE', 'Log File'),
('SCREENSHOT', 'Screenshot'),
('DOCUMENT', 'Document'),
('EMAIL', 'Email'),
('DATABASE_RECORD', 'Database Record'),
('CONFIGURATION', 'Configuration File'),
('AUDIT_TRAIL', 'Audit Trail'),
('TESTIMONY', 'Testimony/Statement'),
('PHYSICAL_EVIDENCE', 'Physical Evidence'),
('DIGITAL_FORENSICS', 'Digital Forensics'),
]
STATUS_CHOICES = [
('COLLECTED', 'Collected'),
('VERIFIED', 'Verified'),
('ANALYZED', 'Analyzed'),
('ARCHIVED', 'Archived'),
('DESTROYED', 'Destroyed'),
]
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
# Evidence details
title = models.CharField(max_length=200)
description = models.TextField()
evidence_type = models.CharField(max_length=30, choices=EVIDENCE_TYPES)
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='COLLECTED')
# Related objects
incident = models.ForeignKey(
'incident_intelligence.Incident',
on_delete=models.CASCADE,
related_name='evidence_collection'
)
workflow_instance = models.ForeignKey(
WorkflowInstance,
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='evidence_collection'
)
compliance_requirement = models.ForeignKey(
ComplianceRequirement,
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='evidence_collection'
)
# Evidence metadata
file_path = models.CharField(max_length=500, blank=True, null=True)
file_hash = models.CharField(max_length=64, blank=True, null=True, help_text="SHA-256 hash for integrity")
file_size = models.BigIntegerField(null=True, blank=True)
mime_type = models.CharField(max_length=100, blank=True, null=True)
# Collection details
collection_method = models.CharField(max_length=100, blank=True, null=True)
collection_timestamp = models.DateTimeField(auto_now_add=True)
collection_location = models.CharField(max_length=200, blank=True, null=True)
collection_notes = models.TextField(blank=True, null=True)
# Chain of custody
collected_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True, related_name='collected_evidence')
verified_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True, related_name='verified_evidence')
custody_chain = models.JSONField(default=list, help_text="Chain of custody records")
# Retention and disposal
retention_period = models.DurationField(null=True, blank=True)
disposal_date = models.DateTimeField(null=True, blank=True)
disposal_method = models.CharField(max_length=100, blank=True, null=True)
# Metadata
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
ordering = ['-collection_timestamp']
indexes = [
models.Index(fields=['incident', 'evidence_type']),
models.Index(fields=['status', 'collection_timestamp']),
models.Index(fields=['compliance_requirement']),
]
def __str__(self):
return f"{self.title} ({self.evidence_type})"
def add_custody_record(self, user: User, action: str, notes: str = None):
"""Add a custody record to the chain of custody"""
custody_record = {
'timestamp': timezone.now().isoformat(),
'user_id': str(user.id),
'user_name': user.get_full_name() or user.username,
'action': action,
'notes': notes or ''
}
self.custody_chain.append(custody_record)
self.save()
class RetentionPolicy(models.Model):
"""Data retention policies for compliance and governance"""
POLICY_TYPES = [
('INCIDENT_DATA', 'Incident Data'),
('AUDIT_LOGS', 'Audit Logs'),
('EVIDENCE', 'Evidence'),
('USER_DATA', 'User Data'),
('SYSTEM_LOGS', 'System Logs'),
('BACKUP_DATA', 'Backup Data'),
('DOCUMENTATION', 'Documentation'),
]
RETENTION_UNITS = [
('DAYS', 'Days'),
('WEEKS', 'Weeks'),
('MONTHS', 'Months'),
('YEARS', 'Years'),
]
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
name = models.CharField(max_length=200, unique=True)
description = models.TextField()
policy_type = models.CharField(max_length=30, choices=POLICY_TYPES)
# Framework association
applicable_frameworks = models.ManyToManyField(RegulatoryFramework, blank=True)
# Retention rules
retention_period = models.PositiveIntegerField(help_text="Retention period value")
retention_unit = models.CharField(max_length=10, choices=RETENTION_UNITS, default='YEARS')
auto_archive = models.BooleanField(default=True, help_text="Automatically archive after retention period")
auto_delete = models.BooleanField(default=False, help_text="Automatically delete after retention period")
# Conditions and filters
data_classification_levels = models.JSONField(
default=list,
help_text="Data classification levels this policy applies to"
)
incident_categories = models.JSONField(
default=list,
help_text="Incident categories this policy applies to"
)
custom_filters = models.JSONField(
default=dict,
help_text="Custom filters for policy application"
)
# Legal holds and exceptions
legal_hold_override = models.BooleanField(
default=True,
help_text="Whether legal holds can override this policy"
)
exception_conditions = models.JSONField(
default=list,
help_text="Conditions that create exceptions to this policy"
)
# Status and lifecycle
is_active = models.BooleanField(default=True)
effective_date = models.DateField()
review_date = models.DateField(null=True, blank=True)
# Metadata
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True)
class Meta:
ordering = ['name']
indexes = [
models.Index(fields=['policy_type', 'is_active']),
models.Index(fields=['effective_date']),
]
def __str__(self):
return f"{self.name} ({self.retention_period} {self.retention_unit})"
def get_retention_duration(self):
"""Get retention duration as timedelta"""
unit_multipliers = {
'DAYS': 1,
'WEEKS': 7,
'MONTHS': 30,
'YEARS': 365
}
days = self.retention_period * unit_multipliers.get(self.retention_unit, 365)
return timedelta(days=days)
def applies_to_data(self, data_classification: str = None, incident_category: str = None) -> bool:
"""Check if this policy applies to given data"""
if not self.is_active:
return False
# Check data classification
if data_classification and self.data_classification_levels:
if data_classification not in self.data_classification_levels:
return False
# Check incident category
if incident_category and self.incident_categories:
if incident_category not in self.incident_categories:
return False
return True
class ExportRequest(models.Model):
"""Export requests for regulators and auditors"""
REQUEST_TYPES = [
('REGULATORY', 'Regulatory Request'),
('AUDIT', 'Audit Request'),
('LEGAL', 'Legal Request'),
('INTERNAL', 'Internal Request'),
]
STATUS_CHOICES = [
('PENDING', 'Pending'),
('APPROVED', 'Approved'),
('IN_PROGRESS', 'In Progress'),
('COMPLETED', 'Completed'),
('REJECTED', 'Rejected'),
('CANCELLED', 'Cancelled'),
]
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
# Request details
title = models.CharField(max_length=200)
description = models.TextField()
request_type = models.CharField(max_length=20, choices=REQUEST_TYPES)
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='PENDING')
# Requester information
requester_name = models.CharField(max_length=200)
requester_organization = models.CharField(max_length=200, blank=True, null=True)
requester_email = models.EmailField()
requester_phone = models.CharField(max_length=20, blank=True, null=True)
legal_authority = models.CharField(max_length=200, blank=True, null=True)
# Request parameters
data_scope = models.JSONField(help_text="Scope of data to be exported")
date_range_start = models.DateTimeField(null=True, blank=True)
date_range_end = models.DateTimeField(null=True, blank=True)
data_classification_levels = models.JSONField(default=list)
incident_categories = models.JSONField(default=list)
# Framework association
applicable_frameworks = models.ManyToManyField(RegulatoryFramework, blank=True)
# Export details
export_format = models.CharField(max_length=20, default='JSON', help_text="Export format (JSON, CSV, XML, PDF)")
include_evidence = models.BooleanField(default=True)
include_audit_trails = models.BooleanField(default=True)
redaction_required = models.BooleanField(default=False)
# Approval workflow
approval_required = models.BooleanField(default=True)
approved_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True, related_name='approved_exports')
approved_at = models.DateTimeField(null=True, blank=True)
approval_notes = models.TextField(blank=True, null=True)
# Execution details
executed_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True, related_name='executed_exports')
export_file_path = models.CharField(max_length=500, blank=True, null=True)
export_file_hash = models.CharField(max_length=64, blank=True, null=True)
export_file_size = models.BigIntegerField(null=True, blank=True)
# Timestamps
requested_at = models.DateTimeField(auto_now_add=True)
due_date = models.DateTimeField(null=True, blank=True)
completed_at = models.DateTimeField(null=True, blank=True)
# Metadata
created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True, related_name='created_exports')
updated_at = models.DateTimeField(auto_now=True)
class Meta:
ordering = ['-requested_at']
indexes = [
models.Index(fields=['request_type', 'status']),
models.Index(fields=['status', 'due_date']),
models.Index(fields=['requester_email']),
]
def __str__(self):
return f"{self.title} - {self.requester_organization or 'Internal'}"
def is_overdue(self) -> bool:
"""Check if the export request is overdue"""
if not self.due_date:
return False
return timezone.now() > self.due_date and self.status not in ['COMPLETED', 'CANCELLED']
class ComplianceReport(models.Model):
"""Compliance reports and assessments"""
REPORT_TYPES = [
('ASSESSMENT', 'Compliance Assessment'),
('AUDIT', 'Audit Report'),
('REMEDIATION', 'Remediation Report'),
('STATUS', 'Status Report'),
('EXCEPTION', 'Exception Report'),
]
STATUS_CHOICES = [
('DRAFT', 'Draft'),
('REVIEW', 'Under Review'),
('APPROVED', 'Approved'),
('PUBLISHED', 'Published'),
('ARCHIVED', 'Archived'),
]
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
# Report details
title = models.CharField(max_length=200)
report_type = models.CharField(max_length=20, choices=REPORT_TYPES)
description = models.TextField()
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='DRAFT')
# Framework association
framework = models.ForeignKey(RegulatoryFramework, on_delete=models.CASCADE, related_name='reports')
applicable_requirements = models.ManyToManyField(ComplianceRequirement, blank=True)
# Report content
executive_summary = models.TextField(blank=True, null=True)
findings = models.JSONField(default=list, help_text="List of findings and observations")
recommendations = models.JSONField(default=list, help_text="List of recommendations")
action_items = models.JSONField(default=list, help_text="List of action items")
# Assessment details
overall_compliance_score = models.FloatField(
null=True, blank=True,
validators=[MinValueValidator(0.0), MaxValueValidator(100.0)],
help_text="Overall compliance score (0-100)"
)
compliance_gaps = models.JSONField(default=list, help_text="List of compliance gaps")
risk_assessment = models.JSONField(default=dict, help_text="Risk assessment details")
# Report metadata
report_period_start = models.DateField()
report_period_end = models.DateField()
report_file_path = models.CharField(max_length=500, blank=True, null=True)
# Approval and review
prepared_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True, related_name='prepared_reports')
reviewed_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True, related_name='reviewed_reports')
approved_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True, related_name='approved_reports')
# Timestamps
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
published_at = models.DateTimeField(null=True, blank=True)
class Meta:
ordering = ['-created_at']
indexes = [
models.Index(fields=['framework', 'report_type']),
models.Index(fields=['status', 'report_period_end']),
]
def __str__(self):
return f"{self.title} - {self.framework.name}"
class LegalHold(models.Model):
"""Legal holds to prevent data deletion during litigation or investigations"""
STATUS_CHOICES = [
('ACTIVE', 'Active'),
('SUSPENDED', 'Suspended'),
('RELEASED', 'Released'),
('EXPIRED', 'Expired'),
]
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
# Hold details
case_name = models.CharField(max_length=200)
case_number = models.CharField(max_length=100, blank=True, null=True)
description = models.TextField()
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='ACTIVE')
# Legal information
legal_counsel = models.CharField(max_length=200, blank=True, null=True)
law_firm = models.CharField(max_length=200, blank=True, null=True)
court_jurisdiction = models.CharField(max_length=200, blank=True, null=True)
# Scope and criteria
data_scope = models.JSONField(help_text="Scope of data covered by this hold")
custodian_list = models.JSONField(default=list, help_text="List of data custodians")
search_criteria = models.JSONField(default=dict, help_text="Search criteria for data collection")
# Dates
hold_date = models.DateField(help_text="Date the legal hold was issued")
release_date = models.DateField(null=True, blank=True)
expiration_date = models.DateField(null=True, blank=True)
# Related objects
related_incidents = models.ManyToManyField(
'incident_intelligence.Incident',
blank=True,
related_name='legal_holds'
)
related_evidence = models.ManyToManyField(
EvidenceCollection,
blank=True,
related_name='legal_holds'
)
# Notifications
notification_sent = models.BooleanField(default=False)
notification_date = models.DateTimeField(null=True, blank=True)
reminder_sent = models.BooleanField(default=False)
reminder_date = models.DateTimeField(null=True, blank=True)
# Metadata
created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
ordering = ['-hold_date']
indexes = [
models.Index(fields=['status', 'hold_date']),
models.Index(fields=['expiration_date']),
]
def __str__(self):
return f"{self.case_name} - {self.case_number or 'No Case Number'}"
def is_active(self) -> bool:
"""Check if the legal hold is currently active"""
if self.status != 'ACTIVE':
return False
if self.expiration_date and timezone.now().date() > self.expiration_date:
return False
return True
def get_affected_data_count(self) -> int:
"""Get count of data items affected by this legal hold"""
incident_count = self.related_incidents.count()
evidence_count = self.related_evidence.count()
return incident_count + evidence_count