Files
ETB/ETB-API/sla_oncall/signals.py
Iliyan Angelov 6b247e5b9f Updates
2025-09-19 11:58:53 +03:00

197 lines
7.4 KiB
Python

"""
Signals for SLA & On-Call Management
"""
from django.db import models
from django.db.models.signals import post_save, pre_save
from django.dispatch import receiver
from django.utils import timezone
from datetime import timedelta
from sla_oncall.models import (
SLAInstance,
EscalationInstance,
SLADefinition,
EscalationPolicy,
)
from incident_intelligence.models import Incident
@receiver(post_save, sender=Incident)
def create_sla_instances(sender, instance, created, **kwargs):
"""Automatically create SLA instances when incidents are created"""
if created:
# Find applicable SLA definitions
applicable_slas = SLADefinition.objects.filter(
is_active=True,
incident_categories__contains=[instance.category] if instance.category else [],
incident_severities__contains=[instance.severity] if instance.severity else [],
incident_priorities__contains=[instance.priority] if instance.priority else [],
)
# Also check for SLAs that apply to all categories/severities/priorities
all_applicable_slas = SLADefinition.objects.filter(
is_active=True
).filter(
models.Q(incident_categories__isnull=True) | models.Q(incident_categories=[]) |
models.Q(incident_severities__isnull=True) | models.Q(incident_severities=[]) |
models.Q(incident_priorities__isnull=True) | models.Q(incident_priorities=[])
)
# Combine and deduplicate
all_slas = (applicable_slas | all_applicable_slas).distinct()
for sla_definition in all_slas:
if sla_definition.applies_to_incident(instance):
# Find applicable escalation policy
escalation_policy = None
if sla_definition.escalation_enabled:
escalation_policy = EscalationPolicy.objects.filter(
is_active=True,
incident_severities__contains=[instance.severity] if instance.severity else [],
incident_categories__contains=[instance.category] if instance.category else [],
).first()
# Calculate target time
if sla_definition.business_hours_only and sla_definition.business_hours:
target_time = sla_definition.business_hours.calculate_target_time(
instance.created_at,
sla_definition.target_duration_minutes
)
else:
target_time = instance.created_at + timedelta(
minutes=sla_definition.target_duration_minutes
)
# Create SLA instance
SLAInstance.objects.create(
sla_definition=sla_definition,
incident=instance,
target_time=target_time,
escalation_policy=escalation_policy
)
@receiver(pre_save, sender=SLAInstance)
def check_sla_breach(sender, instance, **kwargs):
"""Check for SLA breach and trigger escalation if needed"""
if instance.pk: # Only for existing instances
try:
old_instance = SLAInstance.objects.get(pk=instance.pk)
# Check if SLA just became breached
if (old_instance.status == 'ACTIVE' and
instance.status == 'BREACHED' and
not instance.breached_at):
instance.breached_at = timezone.now()
# Trigger escalation if policy exists and not already triggered
if (instance.escalation_policy and
not instance.escalation_triggered):
trigger_escalation(instance)
except SLAInstance.DoesNotExist:
pass
@receiver(post_save, sender=SLAInstance)
def schedule_escalation_check(sender, instance, created, **kwargs):
"""Schedule escalation check for active SLA instances"""
if instance.status == 'ACTIVE' and instance.escalation_policy:
# Check if escalation should be triggered based on threshold
threshold_time = instance.started_at + timedelta(
minutes=instance.sla_definition.target_duration_minutes *
(instance.escalation_policy.escalation_threshold_percent / 100)
)
if (timezone.now() >= threshold_time and
not instance.escalation_triggered):
trigger_escalation(instance)
def trigger_escalation(sla_instance):
"""Trigger escalation for an SLA instance"""
escalation_policy = sla_instance.escalation_policy
# Create escalation instance
escalation_instance = EscalationInstance.objects.create(
escalation_policy=escalation_policy,
incident=sla_instance.incident,
sla_instance=sla_instance,
status='TRIGGERED',
triggered_at=timezone.now(),
escalation_level=1
)
# Update SLA instance
sla_instance.escalation_triggered = True
sla_instance.escalation_triggered_at = timezone.now()
sla_instance.escalation_level = 1
sla_instance.save()
# Send notifications and perform escalation actions
perform_escalation_actions(escalation_instance)
def perform_escalation_actions(escalation_instance):
"""Perform escalation actions based on policy"""
policy = escalation_instance.escalation_policy
incident = escalation_instance.incident
# Get current on-call person if applicable
current_oncall = get_current_oncall_for_incident(incident)
# Send notifications based on policy configuration
notifications_sent = []
for channel in policy.notification_channels:
notification_data = {
'channel': channel,
'sent_at': timezone.now().isoformat(),
'recipients': get_notification_recipients(channel, current_oncall),
'template': policy.notification_templates.get(channel, {})
}
notifications_sent.append(notification_data)
escalation_instance.notifications_sent = notifications_sent
escalation_instance.save()
# Log escalation action
actions_taken = [{
'action': 'escalation_triggered',
'timestamp': timezone.now().isoformat(),
'level': escalation_instance.escalation_level,
'policy': policy.name
}]
escalation_instance.actions_taken = actions_taken
escalation_instance.save()
def get_current_oncall_for_incident(incident):
"""Get current on-call person for an incident"""
# This is a simplified implementation
# In a real system, you might have more complex logic to determine
# which on-call rotation applies to which incidents
from sla_oncall.models import OnCallAssignment
current_assignment = OnCallAssignment.objects.filter(
status='ACTIVE',
start_time__lte=timezone.now(),
end_time__gte=timezone.now()
).first()
return current_assignment.user if current_assignment else None
def get_notification_recipients(channel, current_oncall):
"""Get notification recipients for a channel"""
recipients = []
if current_oncall:
if channel == 'email' and current_oncall.email:
recipients.append(current_oncall.email)
elif channel == 'sms' and hasattr(current_oncall, 'phone_number'):
recipients.append(getattr(current_oncall, 'phone_number', ''))
return recipients