Files
Iliyan Angelov 6b247e5b9f Updates
2025-09-19 11:58:53 +03:00

223 lines
8.0 KiB
Python

"""
Signals for Compliance & Governance module
Handles integration with other modules
"""
from django.db.models.signals import post_save, pre_delete
from django.dispatch import receiver
from django.utils import timezone
from datetime import timedelta
from .models import (
RegulatoryWorkflow,
WorkflowInstance,
EvidenceCollection,
RetentionPolicy,
LegalHold,
)
@receiver(post_save, sender='incident_intelligence.Incident')
def create_compliance_workflows(sender, instance, created, **kwargs):
"""Create compliance workflows when incidents are created"""
if not created:
return
# Check if incident triggers any compliance workflows
applicable_workflows = RegulatoryWorkflow.objects.filter(
status='ACTIVE',
triggers__contains=['incident_created']
)
for workflow in applicable_workflows:
# Check if workflow conditions are met
conditions = workflow.conditions
# Check framework conditions
if 'framework' in conditions:
# This would need to be implemented based on incident classification
# For now, we'll create workflows for all applicable frameworks
pass
# Check severity conditions
if 'severity' in conditions:
if instance.severity not in conditions['severity']:
continue
# Create workflow instance
WorkflowInstance.objects.create(
workflow=workflow,
title=f"{workflow.name} - {instance.title}",
description=f"Compliance workflow triggered by incident: {instance.title}",
related_incident=instance,
status='PENDING',
created_by=instance.reporter
)
@receiver(post_save, sender='incident_intelligence.Incident')
def check_retention_policies(sender, instance, created, **kwargs):
"""Check if incident data should be subject to retention policies"""
if not created:
return
# Get applicable retention policies
policies = RetentionPolicy.objects.filter(
is_active=True,
policy_type='INCIDENT_DATA'
)
for policy in policies:
# Check if policy applies to this incident
if policy.applies_to_data(
data_classification=instance.data_classification.name if instance.data_classification else None,
incident_category=instance.category
):
# This would trigger retention policy application
# Implementation would depend on specific retention logic
pass
@receiver(post_save, sender='security.AuditLog')
def create_evidence_from_audit_log(sender, instance, created, **kwargs):
"""Create evidence collection entries for security-relevant audit logs"""
if not created:
return
# Only create evidence for certain types of audit logs
evidence_types = [
'DATA_ACCESS',
'DATA_MODIFIED',
'DATA_DELETED',
'LOGIN_FAILED',
'ACCOUNT_LOCKED',
]
if instance.action_type in evidence_types:
# Check if there's a related incident
# This would need to be implemented based on your incident correlation logic
related_incident = None
# Only create evidence collection entry if there's a related incident
if related_incident:
EvidenceCollection.objects.create(
title=f"Audit Log Evidence - {instance.action_type}",
description=f"Audit log entry for {instance.action_type} action",
evidence_type='AUDIT_TRAIL',
incident=related_incident,
status='COLLECTED',
collection_method='AUTOMATED',
collection_notes=f"Automatically collected from audit log: {instance.id}",
collected_by=None, # System collected
)
@receiver(pre_delete, sender='incident_intelligence.Incident')
def check_legal_holds_before_deletion(sender, instance, **kwargs):
"""Check for active legal holds before deleting incident data"""
# Check if there are any active legal holds that might affect this incident
active_legal_holds = LegalHold.objects.filter(
status='ACTIVE',
related_incidents=instance
)
if active_legal_holds.exists():
# Prevent deletion if legal hold is active
raise Exception(
f"Cannot delete incident {instance.id} - it is subject to active legal holds: "
f"{', '.join([hold.case_name for hold in active_legal_holds])}"
)
@receiver(pre_delete, sender=EvidenceCollection)
def check_legal_holds_before_evidence_deletion(sender, instance, **kwargs):
"""Check for active legal holds before deleting evidence"""
# Check if there are any active legal holds that might affect this evidence
active_legal_holds = LegalHold.objects.filter(
status='ACTIVE',
related_evidence=instance
)
if active_legal_holds.exists():
# Prevent deletion if legal hold is active
raise Exception(
f"Cannot delete evidence {instance.id} - it is subject to active legal holds: "
f"{', '.join([hold.case_name for hold in active_legal_holds])}"
)
@receiver(post_save, sender=WorkflowInstance)
def send_workflow_notifications(sender, instance, created, **kwargs):
"""Send notifications for workflow events"""
if not created:
return
# Get notification rules from workflow
notification_rules = instance.workflow.notification_rules
for rule in notification_rules:
if rule.get('event') == 'workflow_started':
# Send notification to specified recipients
recipients = rule.get('recipients', [])
# Implementation would depend on your notification system
# This could integrate with email, Slack, or other notification channels
pass
@receiver(post_save, sender=RetentionPolicy)
def apply_retention_policy_to_existing_data(sender, instance, created, **kwargs):
"""Apply new retention policy to existing data"""
if not created:
return
# This would trigger a background task to apply the retention policy
# to existing data that matches the policy criteria
# Implementation would depend on your task queue system (Celery, etc.)
pass
@receiver(post_save, sender=LegalHold)
def notify_legal_hold_stakeholders(sender, instance, created, **kwargs):
"""Notify stakeholders when legal hold is created"""
if not created:
return
# Send notifications to custodians and stakeholders
custodians = instance.custodian_list
# Implementation would depend on your notification system
pass
# Integration with SLA & On-Call module
@receiver(post_save, sender=WorkflowInstance)
def create_sla_tracking(sender, instance, created, **kwargs):
"""Create SLA tracking for compliance workflows"""
if not created:
return
# Check if workflow has SLA requirements
if instance.due_date:
# This would integrate with the SLA & On-Call module
# to create SLA tracking entries for compliance deadlines
pass
# Integration with Automation Orchestration module
@receiver(post_save, sender='automation_orchestration.RunbookExecution')
def create_evidence_from_automation(sender, instance, created, **kwargs):
"""Create evidence collection entries for automation executions"""
if not created:
return
# Create evidence for automation executions that are compliance-related
if hasattr(instance, 'runbook') and instance.runbook:
EvidenceCollection.objects.create(
title=f"Automation Evidence - {instance.runbook.name}",
description=f"Evidence from runbook execution: {instance.runbook.name}",
evidence_type='AUDIT_TRAIL',
incident=instance.related_incident,
status='COLLECTED',
collection_method='AUTOMATED',
collection_notes=f"Automatically collected from runbook execution: {instance.id}",
collected_by=None, # System collected
)