""" Signals for Collaboration & War Rooms module Handles automatic war room creation, timeline events, and integration with other modules """ from django.db.models.signals import post_save, pre_save, post_delete from django.dispatch import receiver from django.utils import timezone from .models import WarRoom, TimelineEvent, IncidentCommandRole, WarRoomMessage, MessageReaction, ChatCommand from incident_intelligence.models import Incident from automation_orchestration.models import RunbookExecution, AutoRemediationExecution from sla_oncall.models import SLAInstance, EscalationInstance @receiver(post_save, sender=Incident) def create_war_room_for_incident(sender, instance, created, **kwargs): """Automatically create a war room when a new incident is created""" if created: # Create war room for the incident war_room = WarRoom.objects.create( name=f"Incident {instance.id} - {instance.title[:50]}", description=f"War room for incident: {instance.title}", incident=instance, created_by=instance.reporter, privacy_level='PRIVATE' ) # Add incident reporter and assignee to war room if instance.reporter: war_room.add_participant(instance.reporter) if instance.assigned_to: war_room.add_participant(instance.assigned_to) # Create timeline event for war room creation TimelineEvent.create_system_event( incident=instance, event_type='WAR_ROOM_CREATED', title='War Room Created', description=f'War room "{war_room.name}" was automatically created for this incident', related_war_room=war_room, event_data={'war_room_id': str(war_room.id)} ) @receiver(post_save, sender=Incident) def create_timeline_events_for_incident_changes(sender, instance, created, **kwargs): """Create timeline events for incident changes""" if not created: # Check for status changes if hasattr(instance, '_old_status') and instance._old_status != instance.status: TimelineEvent.create_system_event( incident=instance, event_type='STATUS_CHANGED', title=f'Status Changed to {instance.get_status_display()}', description=f'Incident status changed from {instance._old_status} to {instance.status}', event_data={ 'old_status': instance._old_status, 'new_status': instance.status } ) # Check for severity changes if hasattr(instance, '_old_severity') and instance._old_severity != instance.severity: TimelineEvent.create_system_event( incident=instance, event_type='SEVERITY_CHANGED', title=f'Severity Changed to {instance.get_severity_display()}', description=f'Incident severity changed from {instance._old_severity} to {instance.severity}', event_data={ 'old_severity': instance._old_severity, 'new_severity': instance.severity } ) # Check for assignment changes if hasattr(instance, '_old_assigned_to') and instance._old_assigned_to != instance.assigned_to: old_user = instance._old_assigned_to.username if instance._old_assigned_to else 'Unassigned' new_user = instance.assigned_to.username if instance.assigned_to else 'Unassigned' TimelineEvent.create_system_event( incident=instance, event_type='ASSIGNMENT_CHANGED', title=f'Assignment Changed to {new_user}', description=f'Incident assignment changed from {old_user} to {new_user}', event_data={ 'old_assigned_to': str(instance._old_assigned_to.id) if instance._old_assigned_to else None, 'new_assigned_to': str(instance.assigned_to.id) if instance.assigned_to else None } ) @receiver(pre_save, sender=Incident) def store_old_values_for_incident(sender, instance, **kwargs): """Store old values before saving to detect changes""" if instance.pk: try: old_instance = Incident.objects.get(pk=instance.pk) instance._old_status = old_instance.status instance._old_severity = old_instance.severity instance._old_assigned_to = old_instance.assigned_to except Incident.DoesNotExist: pass @receiver(post_save, sender=RunbookExecution) def create_timeline_event_for_runbook_execution(sender, instance, created, **kwargs): """Create timeline event when runbook is executed""" if created and instance.incident: TimelineEvent.create_system_event( incident=instance.incident, event_type='RUNBOOK_EXECUTED', title=f'Runbook Executed: {instance.runbook.name}', description=f'Runbook "{instance.runbook.name}" was executed for this incident', related_runbook_execution=instance, event_data={ 'runbook_id': str(instance.runbook.id), 'runbook_name': instance.runbook.name, 'triggered_by': str(instance.triggered_by.id) if instance.triggered_by else None } ) @receiver(post_save, sender=AutoRemediationExecution) def create_timeline_event_for_auto_remediation(sender, instance, created, **kwargs): """Create timeline event when auto-remediation is executed""" if created: TimelineEvent.create_system_event( incident=instance.incident, event_type='AUTO_REMEDIATION_ATTEMPTED', title=f'Auto-remediation Attempted: {instance.auto_remediation.name}', description=f'Auto-remediation "{instance.auto_remediation.name}" was attempted for this incident', related_auto_remediation=instance, event_data={ 'auto_remediation_id': str(instance.auto_remediation.id), 'auto_remediation_name': instance.auto_remediation.name, 'status': instance.status } ) @receiver(post_save, sender=SLAInstance) def create_timeline_event_for_sla_breach(sender, instance, created, **kwargs): """Create timeline event when SLA is breached""" if not created and instance.status == 'BREACHED': TimelineEvent.create_system_event( incident=instance.incident, event_type='SLA_BREACHED', title=f'SLA Breached: {instance.sla_definition.name}', description=f'SLA "{instance.sla_definition.name}" has been breached', related_sla_instance=instance, event_data={ 'sla_definition_id': str(instance.sla_definition.id), 'sla_definition_name': instance.sla_definition.name, 'breach_time': instance.breached_at.isoformat() if instance.breached_at else None } ) @receiver(post_save, sender=EscalationInstance) def create_timeline_event_for_escalation(sender, instance, created, **kwargs): """Create timeline event when escalation is triggered""" if created: TimelineEvent.create_system_event( incident=instance.incident, event_type='ESCALATION_TRIGGERED', title=f'Escalation Triggered: {instance.escalation_policy.name}', description=f'Escalation policy "{instance.escalation_policy.name}" was triggered', related_escalation=instance, event_data={ 'escalation_policy_id': str(instance.escalation_policy.id), 'escalation_policy_name': instance.escalation_policy.name, 'escalation_level': instance.escalation_level } ) @receiver(post_save, sender=IncidentCommandRole) def create_timeline_event_for_command_role_assignment(sender, instance, created, **kwargs): """Create timeline event when command role is assigned""" if created and instance.assigned_user: TimelineEvent.create_system_event( incident=instance.incident, event_type='COMMAND_ROLE_ASSIGNED', title=f'Command Role Assigned: {instance.get_role_type_display()}', description=f'{instance.get_role_type_display()} role assigned to {instance.assigned_user.username}', related_command_role=instance, event_data={ 'role_type': instance.role_type, 'assigned_user_id': str(instance.assigned_user.id), 'assigned_user_name': instance.assigned_user.username } ) @receiver(post_save, sender=WarRoomMessage) def create_timeline_event_for_important_messages(sender, instance, created, **kwargs): """Create timeline event for important messages (pinned, commands, etc.)""" if created: # Create timeline event for command messages if instance.message_type == 'COMMAND': TimelineEvent.create_user_event( incident=instance.war_room.incident, user=instance.sender, event_type='MANUAL_EVENT', title='Chat Command Executed', description=f'Command "{instance.content[:50]}..." was executed in war room', event_data={ 'message_id': str(instance.id), 'command_type': 'chat_command' } ) # Create timeline event for system messages elif instance.message_type == 'SYSTEM': TimelineEvent.create_system_event( incident=instance.war_room.incident, event_type='MANUAL_EVENT', title='System Message Posted', description=f'System message posted in war room: {instance.content[:50]}...', event_data={ 'message_id': str(instance.id), 'message_type': 'system' } ) @receiver(post_save, sender=MessageReaction) def create_timeline_event_for_reactions(sender, instance, created, **kwargs): """Create timeline event for reactions on important messages""" if created and instance.message.is_pinned: TimelineEvent.create_user_event( incident=instance.message.war_room.incident, user=instance.user, event_type='MANUAL_EVENT', title='Reaction Added to Pinned Message', description=f'User {instance.user.username} reacted {instance.emoji} to pinned message', event_data={ 'message_id': str(instance.message.id), 'reaction_emoji': instance.emoji, 'user_id': str(instance.user.id) } ) @receiver(post_save, sender=ChatCommand) def create_timeline_event_for_chat_commands(sender, instance, created, **kwargs): """Create timeline event when chat commands are executed""" if created: TimelineEvent.create_user_event( incident=instance.message.war_room.incident, user=instance.executed_by, event_type='MANUAL_EVENT', title=f'ChatOps Command Executed: {instance.command_type}', description=f'ChatOps command "{instance.command_text}" was executed', event_data={ 'command_id': str(instance.id), 'command_type': instance.command_type, 'command_text': instance.command_text, 'execution_status': instance.execution_status } )