197 lines
7.4 KiB
Python
197 lines
7.4 KiB
Python
"""
|
|
Signals for SLA & On-Call Management
|
|
"""
|
|
from django.db import models
|
|
from django.db.models.signals import post_save, pre_save
|
|
from django.dispatch import receiver
|
|
from django.utils import timezone
|
|
from datetime import timedelta
|
|
|
|
from sla_oncall.models import (
|
|
SLAInstance,
|
|
EscalationInstance,
|
|
SLADefinition,
|
|
EscalationPolicy,
|
|
)
|
|
from incident_intelligence.models import Incident
|
|
|
|
|
|
@receiver(post_save, sender=Incident)
|
|
def create_sla_instances(sender, instance, created, **kwargs):
|
|
"""Automatically create SLA instances when incidents are created"""
|
|
if created:
|
|
# Find applicable SLA definitions
|
|
applicable_slas = SLADefinition.objects.filter(
|
|
is_active=True,
|
|
incident_categories__contains=[instance.category] if instance.category else [],
|
|
incident_severities__contains=[instance.severity] if instance.severity else [],
|
|
incident_priorities__contains=[instance.priority] if instance.priority else [],
|
|
)
|
|
|
|
# Also check for SLAs that apply to all categories/severities/priorities
|
|
all_applicable_slas = SLADefinition.objects.filter(
|
|
is_active=True
|
|
).filter(
|
|
models.Q(incident_categories__isnull=True) | models.Q(incident_categories=[]) |
|
|
models.Q(incident_severities__isnull=True) | models.Q(incident_severities=[]) |
|
|
models.Q(incident_priorities__isnull=True) | models.Q(incident_priorities=[])
|
|
)
|
|
|
|
# Combine and deduplicate
|
|
all_slas = (applicable_slas | all_applicable_slas).distinct()
|
|
|
|
for sla_definition in all_slas:
|
|
if sla_definition.applies_to_incident(instance):
|
|
# Find applicable escalation policy
|
|
escalation_policy = None
|
|
if sla_definition.escalation_enabled:
|
|
escalation_policy = EscalationPolicy.objects.filter(
|
|
is_active=True,
|
|
incident_severities__contains=[instance.severity] if instance.severity else [],
|
|
incident_categories__contains=[instance.category] if instance.category else [],
|
|
).first()
|
|
|
|
# Calculate target time
|
|
if sla_definition.business_hours_only and sla_definition.business_hours:
|
|
target_time = sla_definition.business_hours.calculate_target_time(
|
|
instance.created_at,
|
|
sla_definition.target_duration_minutes
|
|
)
|
|
else:
|
|
target_time = instance.created_at + timedelta(
|
|
minutes=sla_definition.target_duration_minutes
|
|
)
|
|
|
|
# Create SLA instance
|
|
SLAInstance.objects.create(
|
|
sla_definition=sla_definition,
|
|
incident=instance,
|
|
target_time=target_time,
|
|
escalation_policy=escalation_policy
|
|
)
|
|
|
|
|
|
@receiver(pre_save, sender=SLAInstance)
|
|
def check_sla_breach(sender, instance, **kwargs):
|
|
"""Check for SLA breach and trigger escalation if needed"""
|
|
if instance.pk: # Only for existing instances
|
|
try:
|
|
old_instance = SLAInstance.objects.get(pk=instance.pk)
|
|
|
|
# Check if SLA just became breached
|
|
if (old_instance.status == 'ACTIVE' and
|
|
instance.status == 'BREACHED' and
|
|
not instance.breached_at):
|
|
instance.breached_at = timezone.now()
|
|
|
|
# Trigger escalation if policy exists and not already triggered
|
|
if (instance.escalation_policy and
|
|
not instance.escalation_triggered):
|
|
trigger_escalation(instance)
|
|
|
|
except SLAInstance.DoesNotExist:
|
|
pass
|
|
|
|
|
|
@receiver(post_save, sender=SLAInstance)
|
|
def schedule_escalation_check(sender, instance, created, **kwargs):
|
|
"""Schedule escalation check for active SLA instances"""
|
|
if instance.status == 'ACTIVE' and instance.escalation_policy:
|
|
# Check if escalation should be triggered based on threshold
|
|
threshold_time = instance.started_at + timedelta(
|
|
minutes=instance.sla_definition.target_duration_minutes *
|
|
(instance.escalation_policy.escalation_threshold_percent / 100)
|
|
)
|
|
|
|
if (timezone.now() >= threshold_time and
|
|
not instance.escalation_triggered):
|
|
trigger_escalation(instance)
|
|
|
|
|
|
def trigger_escalation(sla_instance):
|
|
"""Trigger escalation for an SLA instance"""
|
|
escalation_policy = sla_instance.escalation_policy
|
|
|
|
# Create escalation instance
|
|
escalation_instance = EscalationInstance.objects.create(
|
|
escalation_policy=escalation_policy,
|
|
incident=sla_instance.incident,
|
|
sla_instance=sla_instance,
|
|
status='TRIGGERED',
|
|
triggered_at=timezone.now(),
|
|
escalation_level=1
|
|
)
|
|
|
|
# Update SLA instance
|
|
sla_instance.escalation_triggered = True
|
|
sla_instance.escalation_triggered_at = timezone.now()
|
|
sla_instance.escalation_level = 1
|
|
sla_instance.save()
|
|
|
|
# Send notifications and perform escalation actions
|
|
perform_escalation_actions(escalation_instance)
|
|
|
|
|
|
def perform_escalation_actions(escalation_instance):
|
|
"""Perform escalation actions based on policy"""
|
|
policy = escalation_instance.escalation_policy
|
|
incident = escalation_instance.incident
|
|
|
|
# Get current on-call person if applicable
|
|
current_oncall = get_current_oncall_for_incident(incident)
|
|
|
|
# Send notifications based on policy configuration
|
|
notifications_sent = []
|
|
|
|
for channel in policy.notification_channels:
|
|
notification_data = {
|
|
'channel': channel,
|
|
'sent_at': timezone.now().isoformat(),
|
|
'recipients': get_notification_recipients(channel, current_oncall),
|
|
'template': policy.notification_templates.get(channel, {})
|
|
}
|
|
notifications_sent.append(notification_data)
|
|
|
|
escalation_instance.notifications_sent = notifications_sent
|
|
escalation_instance.save()
|
|
|
|
# Log escalation action
|
|
actions_taken = [{
|
|
'action': 'escalation_triggered',
|
|
'timestamp': timezone.now().isoformat(),
|
|
'level': escalation_instance.escalation_level,
|
|
'policy': policy.name
|
|
}]
|
|
|
|
escalation_instance.actions_taken = actions_taken
|
|
escalation_instance.save()
|
|
|
|
|
|
def get_current_oncall_for_incident(incident):
|
|
"""Get current on-call person for an incident"""
|
|
# This is a simplified implementation
|
|
# In a real system, you might have more complex logic to determine
|
|
# which on-call rotation applies to which incidents
|
|
from sla_oncall.models import OnCallAssignment
|
|
|
|
current_assignment = OnCallAssignment.objects.filter(
|
|
status='ACTIVE',
|
|
start_time__lte=timezone.now(),
|
|
end_time__gte=timezone.now()
|
|
).first()
|
|
|
|
return current_assignment.user if current_assignment else None
|
|
|
|
|
|
def get_notification_recipients(channel, current_oncall):
|
|
"""Get notification recipients for a channel"""
|
|
recipients = []
|
|
|
|
if current_oncall:
|
|
if channel == 'email' and current_oncall.email:
|
|
recipients.append(current_oncall.email)
|
|
elif channel == 'sms' and hasattr(current_oncall, 'phone_number'):
|
|
recipients.append(getattr(current_oncall, 'phone_number', ''))
|
|
|
|
return recipients
|