"""
Signals for SLA & On-Call Management
"""
from datetime import timedelta

from django.db import models
from django.db.models.signals import post_save, pre_save
from django.dispatch import receiver
from django.utils import timezone

from incident_intelligence.models import Incident
from sla_oncall.models import (
    SLAInstance,
    EscalationInstance,
    SLADefinition,
    EscalationPolicy,
)

# Sentinel meaning "the escalation policy lookup has not been run yet";
# distinct from None, which is a valid lookup result (no policy found).
_UNRESOLVED = object()


def _contains_operand(value):
    """Return the operand for a ``__contains`` lookup: ``[value]``, or ``[]`` when unset."""
    return [value] if value else []


def _find_escalation_policy(incident):
    """Return the first active escalation policy matching the incident, or None."""
    return EscalationPolicy.objects.filter(
        is_active=True,
        incident_severities__contains=_contains_operand(incident.severity),
        incident_categories__contains=_contains_operand(incident.category),
    ).first()


def _calculate_target_time(sla_definition, start):
    """Return the SLA deadline for ``sla_definition`` counted from ``start``.

    Delegates to the definition's business-hours object when the definition
    is restricted to business hours and has one configured; otherwise adds
    ``target_duration_minutes`` of wall-clock time to ``start``.
    """
    if sla_definition.business_hours_only and sla_definition.business_hours:
        return sla_definition.business_hours.calculate_target_time(
            start,
            sla_definition.target_duration_minutes,
        )
    return start + timedelta(minutes=sla_definition.target_duration_minutes)


@receiver(post_save, sender=Incident)
def create_sla_instances(sender, instance, created, **kwargs):
    """Automatically create SLA instances when incidents are created.

    Runs on ``post_save`` of ``Incident`` and does nothing unless the row was
    just created. Candidate SLA definitions are gathered with two queries
    (explicit matches and definitions with an empty/NULL criterion), then each
    candidate is confirmed with ``SLADefinition.applies_to_incident`` before
    an ``SLAInstance`` is created for it.
    """
    if not created:
        return

    # Definitions whose criteria explicitly contain this incident's values.
    # An unset incident value yields ``[]`` as the contains operand.
    explicit_matches = SLADefinition.objects.filter(
        is_active=True,
        incident_categories__contains=_contains_operand(instance.category),
        incident_severities__contains=_contains_operand(instance.severity),
        incident_priorities__contains=_contains_operand(instance.priority),
    )

    # Definitions with at least one criterion left NULL or empty. This is a
    # deliberately broad pre-filter; applies_to_incident() makes the final call.
    wildcard_matches = SLADefinition.objects.filter(
        is_active=True
    ).filter(
        models.Q(incident_categories__isnull=True) |
        models.Q(incident_categories=[]) |
        models.Q(incident_severities__isnull=True) |
        models.Q(incident_severities=[]) |
        models.Q(incident_priorities__isnull=True) |
        models.Q(incident_priorities=[])
    )

    # Combine and deduplicate
    candidates = (explicit_matches | wildcard_matches).distinct()

    # The policy lookup depends only on the incident, so it is resolved at
    # most once (and only if some definition has escalation enabled).
    cached_policy = _UNRESOLVED

    for sla_definition in candidates:
        if not sla_definition.applies_to_incident(instance):
            continue

        escalation_policy = None
        if sla_definition.escalation_enabled:
            if cached_policy is _UNRESOLVED:
                cached_policy = _find_escalation_policy(instance)
            escalation_policy = cached_policy

        SLAInstance.objects.create(
            sla_definition=sla_definition,
            incident=instance,
            target_time=_calculate_target_time(
                sla_definition, instance.created_at
            ),
            escalation_policy=escalation_policy,
        )


@receiver(pre_save, sender=SLAInstance)
def check_sla_breach(sender, instance, **kwargs):
    """Check for SLA breach and trigger escalation if needed.

    Runs on ``pre_save`` of ``SLAInstance``. For an already-persisted instance
    whose stored status is ``'ACTIVE'`` and whose pending status is
    ``'BREACHED'`` (and which has no ``breached_at`` yet), stamps
    ``breached_at`` and triggers escalation when a policy is attached and
    escalation has not been triggered before.
    """
    if not instance.pk:
        # Only existing instances can transition into a breach.
        return

    # Keep the try body limited to the lookup that can raise DoesNotExist.
    try:
        old_instance = SLAInstance.objects.get(pk=instance.pk)
    except SLAInstance.DoesNotExist:
        # No stored row to compare against; nothing to detect.
        return

    just_breached = (
        old_instance.status == 'ACTIVE'
        and instance.status == 'BREACHED'
        and not instance.breached_at
    )
    if not just_breached:
        return

    instance.breached_at = timezone.now()

    # NOTE(review): trigger_escalation() calls instance.save() while this
    # pre_save handler is still running, so the row is saved once from there
    # and again by the outer save. breached_at is already set at that point,
    # which keeps the nested pre_save from re-entering this branch.
    if instance.escalation_policy and not instance.escalation_triggered:
        trigger_escalation(instance)


@receiver(post_save, sender=SLAInstance)
def schedule_escalation_check(sender, instance, created, **kwargs):
    """Trigger escalation for active SLA instances past their threshold.

    Runs on ``post_save`` of ``SLAInstance``. The threshold is ``started_at``
    plus ``escalation_threshold_percent`` percent of the definition's
    ``target_duration_minutes``. The check happens synchronously at save
    time; nothing is queued for later.
    """
    if instance.status != 'ACTIVE' or not instance.escalation_policy:
        return

    threshold_fraction = (
        instance.escalation_policy.escalation_threshold_percent / 100
    )
    threshold_time = instance.started_at + timedelta(
        minutes=instance.sla_definition.target_duration_minutes
        * threshold_fraction
    )

    # trigger_escalation() sets escalation_triggered before saving, so the
    # post_save it causes falls through here without escalating again.
    if timezone.now() >= threshold_time and not instance.escalation_triggered:
        trigger_escalation(instance)


def trigger_escalation(sla_instance):
    """Trigger escalation for an SLA instance.

    Creates a level-1 ``EscalationInstance`` in ``'TRIGGERED'`` status, marks
    the SLA instance as escalated (and saves it), then runs the escalation
    actions for the new escalation instance.
    """
    # One timestamp so the escalation record and the SLA record agree.
    now = timezone.now()

    escalation_instance = EscalationInstance.objects.create(
        escalation_policy=sla_instance.escalation_policy,
        incident=sla_instance.incident,
        sla_instance=sla_instance,
        status='TRIGGERED',
        triggered_at=now,
        escalation_level=1,
    )

    # Update SLA instance
    sla_instance.escalation_triggered = True
    sla_instance.escalation_triggered_at = now
    sla_instance.escalation_level = 1
    sla_instance.save()

    # Send notifications and perform escalation actions
    perform_escalation_actions(escalation_instance)


def perform_escalation_actions(escalation_instance):
    """Perform escalation actions based on policy.

    Builds one notification record per channel configured on the policy and a
    single ``'escalation_triggered'`` action log entry, stores both on the
    escalation instance and saves it once. The records are only stored here;
    this function itself does not deliver anything.
    """
    policy = escalation_instance.escalation_policy
    incident = escalation_instance.incident

    # Get current on-call person if applicable
    current_oncall = get_current_oncall_for_incident(incident)

    # Treat unset (None) policy configuration as "no channels / no templates".
    channels = policy.notification_channels or []
    templates = policy.notification_templates or {}

    escalation_instance.notifications_sent = [
        {
            'channel': channel,
            'sent_at': timezone.now().isoformat(),
            'recipients': get_notification_recipients(channel, current_oncall),
            'template': templates.get(channel, {}),
        }
        for channel in channels
    ]

    # Log escalation action
    escalation_instance.actions_taken = [{
        'action': 'escalation_triggered',
        'timestamp': timezone.now().isoformat(),
        'level': escalation_instance.escalation_level,
        'policy': policy.name,
    }]

    # Single write for both fields.
    escalation_instance.save()


def get_current_oncall_for_incident(incident):
    """Get current on-call person for an incident.

    Simplified implementation: returns the user of the first ``'ACTIVE'``
    ``OnCallAssignment`` whose start/end window contains the current time, or
    None when there is none. ``incident`` is currently not used to pick a
    rotation.
    """
    # Kept as a function-scope import, as in the original module.
    from sla_oncall.models import OnCallAssignment

    # Evaluate "now" once so both bounds are compared to the same instant.
    now = timezone.now()
    current_assignment = OnCallAssignment.objects.filter(
        status='ACTIVE',
        start_time__lte=now,
        end_time__gte=now,
    ).first()

    return current_assignment.user if current_assignment else None


def get_notification_recipients(channel, current_oncall):
    """Get notification recipients for a channel.

    Returns a list with the on-call user's email (``'email'`` channel) or
    phone number (``'sms'`` channel) when that value is set; otherwise an
    empty list. Any other channel, or no on-call user, yields an empty list.
    """
    if not current_oncall:
        return []

    if channel == 'email':
        contact = current_oncall.email
    elif channel == 'sms':
        # The attribute may be missing, None or empty; none of these is a
        # usable recipient.
        contact = getattr(current_oncall, 'phone_number', None)
    else:
        contact = None

    return [contact] if contact else []