223 lines
8.0 KiB
Python
223 lines
8.0 KiB
Python
"""
Signals for Compliance & Governance module

Handles integration with other modules
"""
|
|
from django.db.models.signals import post_save, pre_delete
|
|
from django.dispatch import receiver
|
|
from django.utils import timezone
|
|
from datetime import timedelta
|
|
|
|
from .models import (
|
|
RegulatoryWorkflow,
|
|
WorkflowInstance,
|
|
EvidenceCollection,
|
|
RetentionPolicy,
|
|
LegalHold,
|
|
)
|
|
|
|
|
|
@receiver(post_save, sender='incident_intelligence.Incident')
def create_compliance_workflows(sender, instance, created, **kwargs):
    """Create a pending WorkflowInstance for each matching workflow of a new incident.

    Only runs when the save created the incident. Every RegulatoryWorkflow
    with status ``'ACTIVE'`` whose ``triggers`` contain ``'incident_created'``
    is considered. A workflow is skipped when it declares a ``'severity'``
    condition that does not include ``instance.severity``.

    Args:
        sender: Model class that sent the signal (unused).
        instance: The saved incident; ``title``, ``severity`` and ``reporter``
            are read from it.
        created: True when the save inserted a new row.
        **kwargs: Remaining signal arguments (unused).
    """
    if not created:
        return

    applicable_workflows = RegulatoryWorkflow.objects.filter(
        status='ACTIVE',
        triggers__contains=['incident_created'],
    )

    for workflow in applicable_workflows:
        # A workflow stored without conditions (None) is treated as
        # unconditional instead of raising TypeError on the membership test
        # below, which would propagate out of the incident's save().
        conditions = workflow.conditions or {}

        # NOTE: a 'framework' condition is not evaluated yet; doing so needs
        # incident classification. Until then such workflows are created for
        # every incident that passes the severity check.

        if 'severity' in conditions and instance.severity not in conditions['severity']:
            continue

        WorkflowInstance.objects.create(
            workflow=workflow,
            title=f"{workflow.name} - {instance.title}",
            description=f"Compliance workflow triggered by incident: {instance.title}",
            related_incident=instance,
            status='PENDING',
            created_by=instance.reporter,
        )
|
|
|
|
|
|
@receiver(post_save, sender='incident_intelligence.Incident')
def check_retention_policies(sender, instance, created, **kwargs):
    """Evaluate active incident-data retention policies against a new incident.

    Only runs when the save created the incident. Each RetentionPolicy with
    ``is_active=True`` and ``policy_type='INCIDENT_DATA'`` is asked, through
    ``applies_to_data``, whether it covers the incident. Nothing is done with
    the answer yet.

    Args:
        sender: Model class that sent the signal (unused).
        instance: The saved incident; ``data_classification`` and ``category``
            are read from it.
        created: True when the save inserted a new row.
        **kwargs: Remaining signal arguments (unused).
    """
    if created:
        incident_policies = RetentionPolicy.objects.filter(
            is_active=True,
            policy_type='INCIDENT_DATA',
        )

        for policy in incident_policies:
            classification = instance.data_classification
            classification_name = classification.name if classification else None

            if not policy.applies_to_data(
                data_classification=classification_name,
                incident_category=instance.category,
            ):
                continue

            # Placeholder: applying the matched policy is not implemented;
            # it depends on the specific retention logic.
|
|
|
|
|
|
@receiver(post_save, sender='security.AuditLog')
def create_evidence_from_audit_log(sender, instance, created, **kwargs):
    """Record an EvidenceCollection entry for security-relevant audit logs.

    Only newly created audit logs whose ``action_type`` is one of the listed
    evidence types are considered. Evidence is created only when a related
    incident is known; incident correlation is not implemented, so
    ``related_incident`` is always ``None`` and no entry is created.

    Args:
        sender: Model class that sent the signal (unused).
        instance: The saved audit log; ``action_type`` and ``id`` are read.
        created: True when the save inserted a new row.
        **kwargs: Remaining signal arguments (unused).
    """
    if not created:
        return

    evidence_types = (
        'DATA_ACCESS',
        'DATA_MODIFIED',
        'DATA_DELETED',
        'LOGIN_FAILED',
        'ACCOUNT_LOCKED',
    )
    if instance.action_type not in evidence_types:
        return

    # Placeholder: look up the incident this audit log belongs to once
    # incident correlation logic exists.
    related_incident = None
    if not related_incident:
        return

    action = instance.action_type
    EvidenceCollection.objects.create(
        title=f"Audit Log Evidence - {action}",
        description=f"Audit log entry for {action} action",
        evidence_type='AUDIT_TRAIL',
        incident=related_incident,
        status='COLLECTED',
        collection_method='AUTOMATED',
        collection_notes=f"Automatically collected from audit log: {instance.id}",
        collected_by=None,  # collected by the system, not a user
    )
|
|
|
|
|
|
@receiver(pre_delete, sender='incident_intelligence.Incident')
def check_legal_holds_before_deletion(sender, instance, **kwargs):
    """Block deletion of an incident that is under an active legal hold.

    Args:
        sender: Model class that sent the signal (unused).
        instance: The incident about to be deleted.
        **kwargs: Remaining signal arguments (unused).

    Raises:
        Exception: If at least one LegalHold with status ``'ACTIVE'`` lists
            the incident in ``related_incidents``. The message names the
            ``case_name`` of each such hold.
    """
    # Evaluate the queryset once, so the emptiness check and the case names in
    # the message come from the same result set (and only one query is run).
    active_legal_holds = list(
        LegalHold.objects.filter(
            status='ACTIVE',
            related_incidents=instance,
        )
    )
    if not active_legal_holds:
        return

    # str() keeps the join from failing on a hold whose case_name is not a string.
    case_names = ', '.join(str(hold.case_name) for hold in active_legal_holds)
    raise Exception(
        f"Cannot delete incident {instance.id} - it is subject to active legal holds: "
        f"{case_names}"
    )
|
|
|
|
|
|
@receiver(pre_delete, sender=EvidenceCollection)
def check_legal_holds_before_evidence_deletion(sender, instance, **kwargs):
    """Block deletion of evidence that is under an active legal hold.

    Args:
        sender: Model class that sent the signal (unused).
        instance: The EvidenceCollection about to be deleted.
        **kwargs: Remaining signal arguments (unused).

    Raises:
        Exception: If at least one LegalHold with status ``'ACTIVE'`` lists
            the evidence in ``related_evidence``. The message names the
            ``case_name`` of each such hold.
    """
    # Evaluate the queryset once, so the emptiness check and the case names in
    # the message come from the same result set (and only one query is run).
    active_legal_holds = list(
        LegalHold.objects.filter(
            status='ACTIVE',
            related_evidence=instance,
        )
    )
    if not active_legal_holds:
        return

    # str() keeps the join from failing on a hold whose case_name is not a string.
    case_names = ', '.join(str(hold.case_name) for hold in active_legal_holds)
    raise Exception(
        f"Cannot delete evidence {instance.id} - it is subject to active legal holds: "
        f"{case_names}"
    )
|
|
|
|
|
|
@receiver(post_save, sender=WorkflowInstance)
def send_workflow_notifications(sender, instance, created, **kwargs):
    """Walk the workflow's notification rules when a workflow instance is created.

    For each rule whose ``'event'`` is ``'workflow_started'`` the rule's
    ``'recipients'`` are read. No notification is actually sent yet.

    Args:
        sender: Model class that sent the signal (unused).
        instance: The saved WorkflowInstance; ``workflow.notification_rules``
            is read from it.
        created: True when the save inserted a new row.
        **kwargs: Remaining signal arguments (unused).
    """
    if created:
        rules = instance.workflow.notification_rules
        started_rules = (
            rule for rule in rules if rule.get('event') == 'workflow_started'
        )

        for rule in started_rules:
            # Placeholder: deliver to these recipients once a notification
            # channel (email, Slack, ...) is integrated.
            recipients = rule.get('recipients', [])
|
|
|
|
|
|
@receiver(post_save, sender=RetentionPolicy)
def apply_retention_policy_to_existing_data(sender, instance, created, **kwargs):
    """Hook for applying a newly created retention policy to existing data.

    Currently a no-op for both created and updated policies.

    Args:
        sender: Model class that sent the signal (unused).
        instance: The saved RetentionPolicy (unused so far).
        created: True when the save inserted a new row.
        **kwargs: Remaining signal arguments (unused).
    """
    if created:
        # Placeholder: enqueue a background task (Celery or similar) that
        # applies the policy to existing data matching its criteria.
        return
|
|
|
|
|
|
@receiver(post_save, sender=LegalHold)
def notify_legal_hold_stakeholders(sender, instance, created, **kwargs):
    """Hook for notifying stakeholders when a legal hold is created.

    Reads ``instance.custodian_list`` for new holds; no notification is sent
    yet.

    Args:
        sender: Model class that sent the signal (unused).
        instance: The saved LegalHold.
        created: True when the save inserted a new row.
        **kwargs: Remaining signal arguments (unused).
    """
    if created:
        # Placeholder: hand the custodians to the notification system once
        # one is integrated.
        custodians = instance.custodian_list
|
|
|
|
|
|
# Integration with SLA & On-Call module
@receiver(post_save, sender=WorkflowInstance)
def create_sla_tracking(sender, instance, created, **kwargs):
    """Hook for creating SLA tracking when a workflow instance is created.

    Checks ``instance.due_date`` for new instances; no SLA entry is created
    yet.

    Args:
        sender: Model class that sent the signal (unused).
        instance: The saved WorkflowInstance.
        created: True when the save inserted a new row.
        **kwargs: Remaining signal arguments (unused).
    """
    if created and instance.due_date:
        # Placeholder: create SLA tracking entries for the compliance
        # deadline through the SLA & On-Call module.
        pass
|
|
|
|
|
|
# Integration with Automation Orchestration module
@receiver(post_save, sender='automation_orchestration.RunbookExecution')
def create_evidence_from_automation(sender, instance, created, **kwargs):
    """Record an EvidenceCollection entry for a newly created runbook execution.

    Nothing is recorded when the execution has no (truthy) ``runbook``.
    The evidence is linked to ``instance.related_incident``.

    Args:
        sender: Model class that sent the signal (unused).
        instance: The saved runbook execution; ``runbook``,
            ``related_incident`` and ``id`` are read from it.
        created: True when the save inserted a new row.
        **kwargs: Remaining signal arguments (unused).
    """
    if not created:
        return

    runbook = getattr(instance, 'runbook', None)
    if not runbook:
        return

    runbook_name = runbook.name
    EvidenceCollection.objects.create(
        title=f"Automation Evidence - {runbook_name}",
        description=f"Evidence from runbook execution: {runbook_name}",
        evidence_type='AUDIT_TRAIL',
        incident=instance.related_incident,
        status='COLLECTED',
        collection_method='AUTOMATED',
        collection_notes=f"Automatically collected from runbook execution: {instance.id}",
        collected_by=None,  # collected by the system, not a user
    )
|