ETB/ETB-API/knowledge_learning/services/postmortem_generator.py

import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from django.utils import timezone
from django.db import transaction
from ..models import Postmortem, AutomatedPostmortemGeneration
from incident_intelligence.models import Incident
logger = logging.getLogger(__name__)


class PostmortemGenerator:
"""Service for generating automated postmortems from incident data"""
def __init__(self):
self.model_version = "v1.0"
def generate_postmortem_for_incident(
self,
incident_id: str,
include_timeline: bool = True,
include_logs: bool = True,
trigger: str = "manual"
) -> Dict[str, Any]:
"""Generate a postmortem for a specific incident"""
try:
incident = Incident.objects.get(id=incident_id)
# Create generation log
generation_log = AutomatedPostmortemGeneration.objects.create(
incident=incident,
status='PROCESSING',
generation_trigger=trigger,
incident_data=self._extract_incident_data(incident),
timeline_data=self._extract_timeline_data(incident) if include_timeline else [],
log_data=self._extract_log_data(incident) if include_logs else []
)
# Generate postmortem content
generated_content = self._generate_content(incident, generation_log)
# Create postmortem
postmortem = Postmortem.objects.create(
title=f"Postmortem: {incident.title}",
incident=incident,
executive_summary=generated_content.get('executive_summary', ''),
timeline=generated_content.get('timeline', []),
root_cause_analysis=generated_content.get('root_cause_analysis', ''),
impact_assessment=generated_content.get('impact_assessment', ''),
lessons_learned=generated_content.get('lessons_learned', ''),
action_items=generated_content.get('action_items', []),
is_automated=True,
generation_confidence=generated_content.get('confidence_score', 0.0),
auto_generated_sections=generated_content.get('generated_sections', []),
status='DRAFT',
severity=incident.severity,
affected_services=self._extract_affected_services(incident),
affected_teams=self._extract_affected_teams(incident),
due_date=timezone.now() + timedelta(days=7) # Due in 7 days
)
# Update generation log
generation_log.status = 'COMPLETED'
generation_log.generated_content = generated_content
generation_log.generated_postmortem = postmortem
generation_log.confidence_scores = generated_content.get('confidence_scores', {})
generation_log.quality_metrics = generated_content.get('quality_metrics', {})
generation_log.completed_at = timezone.now()
generation_log.save()
return {
'generation_id': str(generation_log.id),
'postmortem_id': str(postmortem.id),
'status': 'completed',
'confidence_score': postmortem.generation_confidence
}
except Incident.DoesNotExist:
raise ValueError(f"Incident with ID {incident_id} not found")
except Exception as e:
logger.error(f"Failed to generate postmortem for incident {incident_id}: {str(e)}")
# Update generation log with error
if 'generation_log' in locals():
generation_log.status = 'FAILED'
generation_log.error_message = str(e)
generation_log.completed_at = timezone.now()
generation_log.save()
raise

    def generate_postmortem(self, incident: Incident) -> Dict[str, Any]:
        """Generate postmortem content for an incident"""
        return self._generate_content(incident, None)

    def _generate_content(self, incident: Incident, generation_log: Optional[AutomatedPostmortemGeneration] = None) -> Dict[str, Any]:
        """Generate the actual postmortem content using AI/ML models"""
        # This is a simplified implementation - in production, you would integrate with
        # actual AI/ML services like OpenAI, Azure Cognitive Services, or custom models
        content = {
            'executive_summary': self._generate_executive_summary(incident),
            'timeline': self._generate_timeline(incident),
            'root_cause_analysis': self._generate_root_cause_analysis(incident),
            'impact_assessment': self._generate_impact_assessment(incident),
            'lessons_learned': self._generate_lessons_learned(incident),
            'action_items': self._generate_action_items(incident),
            'confidence_score': self._calculate_confidence_score(incident),
            'generated_sections': ['executive_summary', 'timeline', 'root_cause_analysis', 'impact_assessment', 'lessons_learned', 'action_items'],
            'confidence_scores': {
                'executive_summary': 0.85,
                'timeline': 0.90,
                'root_cause_analysis': 0.75,
                'impact_assessment': 0.80,
                'lessons_learned': 0.70,
                'action_items': 0.75
            },
            'quality_metrics': {
                'completeness': 0.85,
                'accuracy': 0.80,
                'actionability': 0.75
            }
        }
        return content
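
    # --- Illustrative sketch, not part of the original implementation ---
    # The comment in _generate_content notes that production use would call an
    # external AI/ML service. The helper below shows one way such a call could
    # be wired in. ``llm_client`` and its ``complete()`` method are assumed
    # interfaces injected by the caller, not a real library API.
    def _generate_section_with_llm(self, incident: Incident, section: str, llm_client: Any) -> str:
        """Hypothetical helper: draft a single postmortem section via an injected LLM client."""
        prompt = (
            f"Write the '{section}' section of an incident postmortem.\n"
            f"Incident: {incident.title}\n"
            f"Severity: {incident.severity}, Category: {incident.category}\n"
            f"Description: {incident.description}"
        )
        try:
            # The injected client is expected to return plain text for the section.
            return llm_client.complete(prompt)
        except Exception:
            logger.exception("LLM generation failed for section '%s'; falling back to templates", section)
            # Fall back to the template-based generators defined in this class.
            return ""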

    def _generate_executive_summary(self, incident: Incident) -> str:
        """Generate executive summary"""
        return f"""
        On {incident.created_at.strftime('%Y-%m-%d %H:%M')}, a {incident.severity.lower()} severity incident occurred affecting {incident.affected_users} users.
        The incident was categorized as {incident.category} and was resolved after {self._calculate_resolution_time(incident)}.

        Key Impact:
        - {incident.affected_users} users affected
        - Business impact: {incident.business_impact or 'Not specified'}
        - Resolution time: {self._calculate_resolution_time(incident)}

        This postmortem outlines the timeline, root causes, and preventive measures to avoid similar incidents in the future.
        """

    def _generate_timeline(self, incident: Incident) -> List[Dict[str, Any]]:
        """Generate incident timeline"""
        timeline = [
            {
                'timestamp': incident.created_at.isoformat(),
                'event': 'Incident reported',
                'description': f'Incident "{incident.title}" was reported',
                'actor': incident.reporter.username if incident.reporter else 'System'
            }
        ]

        if incident.assigned_to:
            timeline.append({
                'timestamp': incident.updated_at.isoformat(),
                'event': 'Incident assigned',
                'description': f'Incident assigned to {incident.assigned_to.username}',
                'actor': 'System'
            })

        if incident.resolved_at:
            timeline.append({
                'timestamp': incident.resolved_at.isoformat(),
                'event': 'Incident resolved',
                'description': f'Incident resolved with status: {incident.status}',
                'actor': incident.assigned_to.username if incident.assigned_to else 'System'
            })

        return timeline

    def _generate_root_cause_analysis(self, incident: Incident) -> str:
        """Generate root cause analysis"""
        return f"""
        Root Cause Analysis for {incident.title}:

        Primary Factors:
        1. Technical Issue: {incident.category} - {incident.subcategory or 'Not specified'}
        2. System Component: {incident.description[:200]}...
        3. User Impact: {incident.affected_users} users affected

        Contributing Factors:
        - Incident severity: {incident.severity}
        - Priority level: {incident.priority}
        - Resolution time: {self._calculate_resolution_time(incident)}

        Analysis:
        Based on the incident details and timeline, the root cause appears to be related to {incident.category}.
        The incident was classified with {incident.classification_confidence or 0.0:.2%} confidence,
        suggesting {incident.subcategory or 'a system component failure'} as the primary cause.

        Recommendations for further investigation:
        1. Review system logs for the time period {incident.created_at} to {incident.resolved_at or incident.updated_at}
        2. Analyze similar incidents in the past 30 days
        3. Check for any recent deployments or configuration changes
        """

    def _generate_impact_assessment(self, incident: Incident) -> str:
        """Generate impact assessment"""
        return f"""
        Impact Assessment for {incident.title}:

        User Impact:
        - Total users affected: {incident.affected_users}
        - Severity level: {incident.severity}
        - Duration of impact: {self._calculate_resolution_time(incident)}

        Business Impact:
        {incident.business_impact or 'Business impact not specified in incident details.'}

        Technical Impact:
        - Affected services: {', '.join(self._extract_affected_services(incident))}
        - System components: {incident.category} - {incident.subcategory or 'Not specified'}
        - Estimated downtime: {incident.estimated_downtime or 'Not specified'}

        Financial Impact:
        - Estimated cost: To be calculated based on user impact and downtime
        - SLA impact: {'SLA override applied' if incident.sla_override else 'Normal SLA applies'}

        Reputation Impact:
        - Customer satisfaction: Potentially affected due to service disruption
        - Trust level: Impact depends on resolution time and communication
        """

    def _generate_lessons_learned(self, incident: Incident) -> str:
        """Generate lessons learned"""
        # Compare the actual resolution duration (a timedelta) rather than the
        # formatted string returned by _calculate_resolution_time; an unresolved
        # incident is treated as a slow resolution.
        resolution_duration = (
            incident.resolved_at - incident.created_at
            if incident.resolved_at and incident.created_at
            else None
        )
        slow_resolution = resolution_duration is None or resolution_duration > timedelta(hours=1)

        return f"""
        Lessons Learned from {incident.title}:

        What Went Well:
        1. Incident was properly categorized as {incident.category}
        2. {'Automated remediation was attempted' if incident.auto_remediation_attempted else 'Manual intervention was required'}
        3. {'Runbook was suggested' if incident.runbook_suggested else 'No runbook was available'}

        What Could Be Improved:
        1. {'Faster detection and response time needed' if slow_resolution else 'Response time was acceptable'}
        2. {'Better automation coverage needed' if not incident.auto_remediation_attempted else 'Automation worked as expected'}
        3. {'More detailed incident description needed' if len(incident.description) < 100 else 'Incident description was adequate'}

        Key Insights:
        1. {incident.category} incidents require {'immediate' if incident.severity in ['CRITICAL', 'EMERGENCY'] else 'standard'} response procedures
        2. {'Automation can help' if incident.automation_enabled else 'Manual processes need improvement'} in similar scenarios
        3. {'SLA override was necessary' if incident.sla_override else 'Standard SLA procedures were sufficient'}

        Process Improvements:
        1. Review and update runbooks for {incident.category} incidents
        2. {'Enhance monitoring' if incident.severity in ['HIGH', 'CRITICAL', 'EMERGENCY'] else 'Maintain current monitoring'} for early detection
        3. {'Improve automation' if not incident.auto_remediation_attempted else 'Automation is working well'} for faster resolution
        """

    def _generate_action_items(self, incident: Incident) -> List[Dict[str, Any]]:
        """Generate action items"""
        action_items = [
            {
                'title': f'Review and update {incident.category} runbook',
                'description': f'Update the runbook for {incident.category} incidents based on lessons learned',
                'priority': 'HIGH' if incident.severity in ['CRITICAL', 'EMERGENCY'] else 'MEDIUM',
                'assignee': 'TBD',
                'due_date': (timezone.now() + timedelta(days=14)).isoformat(),
                'category': 'Process Improvement'
            },
            {
                'title': 'Enhance monitoring and alerting',
                'description': f'Improve monitoring for {incident.category} to detect similar issues earlier',
                'priority': 'MEDIUM',
                'assignee': 'TBD',
                'due_date': (timezone.now() + timedelta(days=21)).isoformat(),
                'category': 'Technical Improvement'
            }
        ]

        if not incident.auto_remediation_attempted and incident.automation_enabled:
            action_items.append({
                'title': 'Implement automated remediation',
                'description': f'Develop automated remediation for {incident.category} incidents',
                'priority': 'HIGH' if incident.severity in ['CRITICAL', 'EMERGENCY'] else 'MEDIUM',
                'assignee': 'TBD',
                'due_date': (timezone.now() + timedelta(days=30)).isoformat(),
                'category': 'Automation'
            })

        return action_items

    def _calculate_confidence_score(self, incident: Incident) -> float:
        """Calculate overall confidence score for the generated postmortem"""
        base_confidence = 0.7

        # Adjust based on incident data quality
        if incident.classification_confidence:
            base_confidence += incident.classification_confidence * 0.2
        if len(incident.description) > 200:
            base_confidence += 0.1
        if incident.business_impact:
            base_confidence += 0.1

        return min(base_confidence, 1.0)
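
    # Worked example of the scoring above (illustrative values, not from the
    # source): with classification_confidence = 0.8, a description longer than
    # 200 characters, and a non-empty business_impact, the score is
    # 0.7 + (0.8 * 0.2) + 0.1 + 0.1 = 1.06, capped at 1.0 by min().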

    def _calculate_resolution_time(self, incident: Incident) -> str:
        """Calculate resolution time"""
        if incident.resolved_at and incident.created_at:
            duration = incident.resolved_at - incident.created_at
            return str(duration)
        return "Not resolved yet"

    def _extract_incident_data(self, incident: Incident) -> Dict[str, Any]:
        """Extract relevant incident data for postmortem generation"""
        return {
            'id': str(incident.id),
            'title': incident.title,
            'description': incident.description,
            'category': incident.category,
            'subcategory': incident.subcategory,
            'severity': incident.severity,
            'priority': incident.priority,
            'status': incident.status,
            'affected_users': incident.affected_users,
            'business_impact': incident.business_impact,
            'created_at': incident.created_at.isoformat(),
            'resolved_at': incident.resolved_at.isoformat() if incident.resolved_at else None,
            'assigned_to': incident.assigned_to.username if incident.assigned_to else None,
            'reporter': incident.reporter.username if incident.reporter else None
        }

    def _extract_timeline_data(self, incident: Incident) -> List[Dict[str, Any]]:
        """Extract timeline data from incident"""
        # In a real implementation, this would extract from incident logs, comments, etc.
        return self._generate_timeline(incident)

    def _extract_log_data(self, incident: Incident) -> List[Dict[str, Any]]:
        """Extract relevant log data for the incident"""
        # In a real implementation, this would query log systems
        return [
            {
                'timestamp': incident.created_at.isoformat(),
                'level': 'ERROR',
                'message': f'Incident {incident.title} reported',
                'source': 'incident_system'
            }
        ]

    def _extract_affected_services(self, incident: Incident) -> List[str]:
        """Extract affected services from incident"""
        services = []
        if incident.category:
            services.append(incident.category)
        if incident.subcategory:
            services.append(incident.subcategory)
        return services

    def _extract_affected_teams(self, incident: Incident) -> List[str]:
        """Extract affected teams from incident"""
        teams = []
        if incident.assigned_to:
            teams.append(incident.assigned_to.username)
        if incident.reporter:
            teams.append(incident.reporter.username)
        return teams
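
# Illustrative usage (not part of the original file): how a view, Celery task,
# or management command in the surrounding Django app might call the service.
# The incident ID below is a placeholder.
#
#     generator = PostmortemGenerator()
#     result = generator.generate_postmortem_for_incident(
#         incident_id="<incident-uuid>",
#         trigger="scheduled",
#     )
#     logger.info(
#         "Generated postmortem %s (confidence %.2f)",
#         result["postmortem_id"],
#         result["confidence_score"],
#     )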