262 lines
12 KiB
Python
262 lines
12 KiB
Python
"""
|
|
Signals for Collaboration & War Rooms module
|
|
Handles automatic war room creation, timeline events, and integration with other modules
|
|
"""
|
|
from django.db.models.signals import post_save, pre_save, post_delete
|
|
from django.dispatch import receiver
|
|
from django.utils import timezone
|
|
|
|
from .models import WarRoom, TimelineEvent, IncidentCommandRole, WarRoomMessage, MessageReaction, ChatCommand
|
|
from incident_intelligence.models import Incident
|
|
from automation_orchestration.models import RunbookExecution, AutoRemediationExecution
|
|
from sla_oncall.models import SLAInstance, EscalationInstance
|
|
|
|
|
|
@receiver(post_save, sender=Incident)
|
|
def create_war_room_for_incident(sender, instance, created, **kwargs):
|
|
"""Automatically create a war room when a new incident is created"""
|
|
if created:
|
|
# Create war room for the incident
|
|
war_room = WarRoom.objects.create(
|
|
name=f"Incident {instance.id} - {instance.title[:50]}",
|
|
description=f"War room for incident: {instance.title}",
|
|
incident=instance,
|
|
created_by=instance.reporter,
|
|
privacy_level='PRIVATE'
|
|
)
|
|
|
|
# Add incident reporter and assignee to war room
|
|
if instance.reporter:
|
|
war_room.add_participant(instance.reporter)
|
|
if instance.assigned_to:
|
|
war_room.add_participant(instance.assigned_to)
|
|
|
|
# Create timeline event for war room creation
|
|
TimelineEvent.create_system_event(
|
|
incident=instance,
|
|
event_type='WAR_ROOM_CREATED',
|
|
title='War Room Created',
|
|
description=f'War room "{war_room.name}" was automatically created for this incident',
|
|
related_war_room=war_room,
|
|
event_data={'war_room_id': str(war_room.id)}
|
|
)
|
|
|
|
|
|
@receiver(post_save, sender=Incident)
|
|
def create_timeline_events_for_incident_changes(sender, instance, created, **kwargs):
|
|
"""Create timeline events for incident changes"""
|
|
if not created:
|
|
# Check for status changes
|
|
if hasattr(instance, '_old_status') and instance._old_status != instance.status:
|
|
TimelineEvent.create_system_event(
|
|
incident=instance,
|
|
event_type='STATUS_CHANGED',
|
|
title=f'Status Changed to {instance.get_status_display()}',
|
|
description=f'Incident status changed from {instance._old_status} to {instance.status}',
|
|
event_data={
|
|
'old_status': instance._old_status,
|
|
'new_status': instance.status
|
|
}
|
|
)
|
|
|
|
# Check for severity changes
|
|
if hasattr(instance, '_old_severity') and instance._old_severity != instance.severity:
|
|
TimelineEvent.create_system_event(
|
|
incident=instance,
|
|
event_type='SEVERITY_CHANGED',
|
|
title=f'Severity Changed to {instance.get_severity_display()}',
|
|
description=f'Incident severity changed from {instance._old_severity} to {instance.severity}',
|
|
event_data={
|
|
'old_severity': instance._old_severity,
|
|
'new_severity': instance.severity
|
|
}
|
|
)
|
|
|
|
# Check for assignment changes
|
|
if hasattr(instance, '_old_assigned_to') and instance._old_assigned_to != instance.assigned_to:
|
|
old_user = instance._old_assigned_to.username if instance._old_assigned_to else 'Unassigned'
|
|
new_user = instance.assigned_to.username if instance.assigned_to else 'Unassigned'
|
|
|
|
TimelineEvent.create_system_event(
|
|
incident=instance,
|
|
event_type='ASSIGNMENT_CHANGED',
|
|
title=f'Assignment Changed to {new_user}',
|
|
description=f'Incident assignment changed from {old_user} to {new_user}',
|
|
event_data={
|
|
'old_assigned_to': str(instance._old_assigned_to.id) if instance._old_assigned_to else None,
|
|
'new_assigned_to': str(instance.assigned_to.id) if instance.assigned_to else None
|
|
}
|
|
)
|
|
|
|
|
|
@receiver(pre_save, sender=Incident)
|
|
def store_old_values_for_incident(sender, instance, **kwargs):
|
|
"""Store old values before saving to detect changes"""
|
|
if instance.pk:
|
|
try:
|
|
old_instance = Incident.objects.get(pk=instance.pk)
|
|
instance._old_status = old_instance.status
|
|
instance._old_severity = old_instance.severity
|
|
instance._old_assigned_to = old_instance.assigned_to
|
|
except Incident.DoesNotExist:
|
|
pass
|
|
|
|
|
|
@receiver(post_save, sender=RunbookExecution)
|
|
def create_timeline_event_for_runbook_execution(sender, instance, created, **kwargs):
|
|
"""Create timeline event when runbook is executed"""
|
|
if created and instance.incident:
|
|
TimelineEvent.create_system_event(
|
|
incident=instance.incident,
|
|
event_type='RUNBOOK_EXECUTED',
|
|
title=f'Runbook Executed: {instance.runbook.name}',
|
|
description=f'Runbook "{instance.runbook.name}" was executed for this incident',
|
|
related_runbook_execution=instance,
|
|
event_data={
|
|
'runbook_id': str(instance.runbook.id),
|
|
'runbook_name': instance.runbook.name,
|
|
'triggered_by': str(instance.triggered_by.id) if instance.triggered_by else None
|
|
}
|
|
)
|
|
|
|
|
|
@receiver(post_save, sender=AutoRemediationExecution)
|
|
def create_timeline_event_for_auto_remediation(sender, instance, created, **kwargs):
|
|
"""Create timeline event when auto-remediation is executed"""
|
|
if created:
|
|
TimelineEvent.create_system_event(
|
|
incident=instance.incident,
|
|
event_type='AUTO_REMEDIATION_ATTEMPTED',
|
|
title=f'Auto-remediation Attempted: {instance.auto_remediation.name}',
|
|
description=f'Auto-remediation "{instance.auto_remediation.name}" was attempted for this incident',
|
|
related_auto_remediation=instance,
|
|
event_data={
|
|
'auto_remediation_id': str(instance.auto_remediation.id),
|
|
'auto_remediation_name': instance.auto_remediation.name,
|
|
'status': instance.status
|
|
}
|
|
)
|
|
|
|
|
|
@receiver(post_save, sender=SLAInstance)
|
|
def create_timeline_event_for_sla_breach(sender, instance, created, **kwargs):
|
|
"""Create timeline event when SLA is breached"""
|
|
if not created and instance.status == 'BREACHED':
|
|
TimelineEvent.create_system_event(
|
|
incident=instance.incident,
|
|
event_type='SLA_BREACHED',
|
|
title=f'SLA Breached: {instance.sla_definition.name}',
|
|
description=f'SLA "{instance.sla_definition.name}" has been breached',
|
|
related_sla_instance=instance,
|
|
event_data={
|
|
'sla_definition_id': str(instance.sla_definition.id),
|
|
'sla_definition_name': instance.sla_definition.name,
|
|
'breach_time': instance.breached_at.isoformat() if instance.breached_at else None
|
|
}
|
|
)
|
|
|
|
|
|
@receiver(post_save, sender=EscalationInstance)
|
|
def create_timeline_event_for_escalation(sender, instance, created, **kwargs):
|
|
"""Create timeline event when escalation is triggered"""
|
|
if created:
|
|
TimelineEvent.create_system_event(
|
|
incident=instance.incident,
|
|
event_type='ESCALATION_TRIGGERED',
|
|
title=f'Escalation Triggered: {instance.escalation_policy.name}',
|
|
description=f'Escalation policy "{instance.escalation_policy.name}" was triggered',
|
|
related_escalation=instance,
|
|
event_data={
|
|
'escalation_policy_id': str(instance.escalation_policy.id),
|
|
'escalation_policy_name': instance.escalation_policy.name,
|
|
'escalation_level': instance.escalation_level
|
|
}
|
|
)
|
|
|
|
|
|
@receiver(post_save, sender=IncidentCommandRole)
|
|
def create_timeline_event_for_command_role_assignment(sender, instance, created, **kwargs):
|
|
"""Create timeline event when command role is assigned"""
|
|
if created and instance.assigned_user:
|
|
TimelineEvent.create_system_event(
|
|
incident=instance.incident,
|
|
event_type='COMMAND_ROLE_ASSIGNED',
|
|
title=f'Command Role Assigned: {instance.get_role_type_display()}',
|
|
description=f'{instance.get_role_type_display()} role assigned to {instance.assigned_user.username}',
|
|
related_command_role=instance,
|
|
event_data={
|
|
'role_type': instance.role_type,
|
|
'assigned_user_id': str(instance.assigned_user.id),
|
|
'assigned_user_name': instance.assigned_user.username
|
|
}
|
|
)
|
|
|
|
|
|
@receiver(post_save, sender=WarRoomMessage)
|
|
def create_timeline_event_for_important_messages(sender, instance, created, **kwargs):
|
|
"""Create timeline event for important messages (pinned, commands, etc.)"""
|
|
if created:
|
|
# Create timeline event for command messages
|
|
if instance.message_type == 'COMMAND':
|
|
TimelineEvent.create_user_event(
|
|
incident=instance.war_room.incident,
|
|
user=instance.sender,
|
|
event_type='MANUAL_EVENT',
|
|
title='Chat Command Executed',
|
|
description=f'Command "{instance.content[:50]}..." was executed in war room',
|
|
event_data={
|
|
'message_id': str(instance.id),
|
|
'command_type': 'chat_command'
|
|
}
|
|
)
|
|
|
|
# Create timeline event for system messages
|
|
elif instance.message_type == 'SYSTEM':
|
|
TimelineEvent.create_system_event(
|
|
incident=instance.war_room.incident,
|
|
event_type='MANUAL_EVENT',
|
|
title='System Message Posted',
|
|
description=f'System message posted in war room: {instance.content[:50]}...',
|
|
event_data={
|
|
'message_id': str(instance.id),
|
|
'message_type': 'system'
|
|
}
|
|
)
|
|
|
|
|
|
@receiver(post_save, sender=MessageReaction)
|
|
def create_timeline_event_for_reactions(sender, instance, created, **kwargs):
|
|
"""Create timeline event for reactions on important messages"""
|
|
if created and instance.message.is_pinned:
|
|
TimelineEvent.create_user_event(
|
|
incident=instance.message.war_room.incident,
|
|
user=instance.user,
|
|
event_type='MANUAL_EVENT',
|
|
title='Reaction Added to Pinned Message',
|
|
description=f'User {instance.user.username} reacted {instance.emoji} to pinned message',
|
|
event_data={
|
|
'message_id': str(instance.message.id),
|
|
'reaction_emoji': instance.emoji,
|
|
'user_id': str(instance.user.id)
|
|
}
|
|
)
|
|
|
|
|
|
@receiver(post_save, sender=ChatCommand)
|
|
def create_timeline_event_for_chat_commands(sender, instance, created, **kwargs):
|
|
"""Create timeline event when chat commands are executed"""
|
|
if created:
|
|
TimelineEvent.create_user_event(
|
|
incident=instance.message.war_room.incident,
|
|
user=instance.executed_by,
|
|
event_type='MANUAL_EVENT',
|
|
title=f'ChatOps Command Executed: {instance.command_type}',
|
|
description=f'ChatOps command "{instance.command_text}" was executed',
|
|
event_data={
|
|
'command_id': str(instance.id),
|
|
'command_type': instance.command_type,
|
|
'command_text': instance.command_text,
|
|
'execution_status': instance.execution_status
|
|
}
|
|
)
|