459 lines
21 KiB
Python
459 lines
21 KiB
Python
import logging
|
|
from typing import Dict, List, Any, Optional
|
|
from datetime import datetime, timedelta
|
|
from django.db.models import Q, Count, Avg
|
|
from django.utils import timezone
|
|
# from sklearn.feature_extraction.text import TfidfVectorizer
|
|
# from sklearn.metrics.pairwise import cosine_similarity
|
|
# import numpy as np
|
|
|
|
from ..models import IncidentRecommendation, KnowledgeBaseArticle, LearningPattern
|
|
from incident_intelligence.models import Incident
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class RecommendationEngine:
|
|
"""Service for generating incident recommendations based on similarity and patterns"""
|
|
|
|
def __init__(self):
|
|
self.model_version = "v1.0"
|
|
self.min_similarity_threshold = 0.3
|
|
self.max_recommendations = 10
|
|
|
|
def generate_recommendations(
|
|
self,
|
|
incident_id: str,
|
|
recommendation_types: Optional[List[str]] = None,
|
|
max_recommendations: int = 5,
|
|
min_confidence: float = 0.5
|
|
) -> List[Dict[str, Any]]:
|
|
"""Generate recommendations for a specific incident"""
|
|
|
|
try:
|
|
incident = Incident.objects.get(id=incident_id)
|
|
recommendations = []
|
|
|
|
if not recommendation_types:
|
|
recommendation_types = [
|
|
'SIMILAR_INCIDENT', 'KNOWLEDGE_ARTICLE', 'SOLUTION',
|
|
'EXPERT', 'PREVENTION'
|
|
]
|
|
|
|
# Generate different types of recommendations
|
|
for rec_type in recommendation_types:
|
|
if rec_type == 'SIMILAR_INCIDENT':
|
|
similar_incidents = self._find_similar_incidents(incident, max_recommendations)
|
|
recommendations.extend(similar_incidents)
|
|
|
|
elif rec_type == 'KNOWLEDGE_ARTICLE':
|
|
knowledge_articles = self._find_relevant_knowledge_articles(incident, max_recommendations)
|
|
recommendations.extend(knowledge_articles)
|
|
|
|
elif rec_type == 'SOLUTION':
|
|
solutions = self._find_solutions(incident, max_recommendations)
|
|
recommendations.extend(solutions)
|
|
|
|
elif rec_type == 'EXPERT':
|
|
experts = self._find_experts(incident, max_recommendations)
|
|
recommendations.extend(experts)
|
|
|
|
elif rec_type == 'PREVENTION':
|
|
prevention_strategies = self._find_prevention_strategies(incident, max_recommendations)
|
|
recommendations.extend(prevention_strategies)
|
|
|
|
# Filter by minimum confidence and sort by confidence score
|
|
recommendations = [
|
|
rec for rec in recommendations
|
|
if rec['confidence_score'] >= min_confidence
|
|
]
|
|
recommendations.sort(key=lambda x: x['confidence_score'], reverse=True)
|
|
|
|
# Save recommendations to database
|
|
saved_recommendations = []
|
|
for rec_data in recommendations[:max_recommendations]:
|
|
recommendation = self._save_recommendation(incident, rec_data)
|
|
saved_recommendations.append({
|
|
'id': str(recommendation.id),
|
|
'title': recommendation.title,
|
|
'type': recommendation.recommendation_type,
|
|
'confidence_score': recommendation.confidence_score,
|
|
'similarity_score': recommendation.similarity_score
|
|
})
|
|
|
|
return saved_recommendations
|
|
|
|
except Incident.DoesNotExist:
|
|
raise ValueError(f"Incident with ID {incident_id} not found")
|
|
except Exception as e:
|
|
logger.error(f"Failed to generate recommendations for incident {incident_id}: {str(e)}")
|
|
raise
|
|
|
|
def _find_similar_incidents(self, incident: Incident, limit: int = 5) -> List[Dict[str, Any]]:
|
|
"""Find similar incidents based on content and metadata"""
|
|
|
|
# Get resolved incidents from the past 6 months
|
|
six_months_ago = timezone.now() - timedelta(days=180)
|
|
similar_incidents = Incident.objects.filter(
|
|
status__in=['RESOLVED', 'CLOSED'],
|
|
created_at__gte=six_months_ago
|
|
).exclude(id=incident.id)
|
|
|
|
if not similar_incidents.exists():
|
|
return []
|
|
|
|
# Calculate similarity scores
|
|
incident_text = f"{incident.title} {incident.description} {incident.category} {incident.subcategory or ''}"
|
|
|
|
similarities = []
|
|
for similar_incident in similar_incidents:
|
|
similar_text = f"{similar_incident.title} {similar_incident.description} {similar_incident.category} {similar_incident.subcategory or ''}"
|
|
|
|
# Calculate text similarity
|
|
text_similarity = self._calculate_text_similarity(incident_text, similar_text)
|
|
|
|
# Calculate metadata similarity
|
|
metadata_similarity = self._calculate_metadata_similarity(incident, similar_incident)
|
|
|
|
# Combined similarity score
|
|
combined_similarity = (text_similarity * 0.7) + (metadata_similarity * 0.3)
|
|
|
|
if combined_similarity >= self.min_similarity_threshold:
|
|
similarities.append({
|
|
'incident': similar_incident,
|
|
'similarity_score': combined_similarity,
|
|
'text_similarity': text_similarity,
|
|
'metadata_similarity': metadata_similarity
|
|
})
|
|
|
|
# Sort by similarity and return top matches
|
|
similarities.sort(key=lambda x: x['similarity_score'], reverse=True)
|
|
|
|
recommendations = []
|
|
for sim_data in similarities[:limit]:
|
|
similar_incident = sim_data['incident']
|
|
|
|
recommendations.append({
|
|
'recommendation_type': 'SIMILAR_INCIDENT',
|
|
'title': f'Similar Incident: {similar_incident.title}',
|
|
'description': f'This incident is similar to {similar_incident.title} which was resolved on {similar_incident.resolved_at.strftime("%Y-%m-%d") if similar_incident.resolved_at else "unknown date"}.',
|
|
'similarity_score': sim_data['similarity_score'],
|
|
'confidence_score': min(sim_data['similarity_score'] * 1.2, 1.0),
|
|
'confidence_level': self._get_confidence_level(sim_data['similarity_score']),
|
|
'related_incident_id': str(similar_incident.id),
|
|
'suggested_actions': [
|
|
f'Review how {similar_incident.title} was resolved',
|
|
'Check if the same resolution approach applies',
|
|
'Contact the incident assignee for insights'
|
|
],
|
|
'expected_outcome': 'Faster resolution by applying proven solutions',
|
|
'reasoning': f'Incidents are similar based on content ({sim_data["text_similarity"]:.2%}) and metadata ({sim_data["metadata_similarity"]:.2%})',
|
|
'matching_factors': [
|
|
f'Category: {similar_incident.category}',
|
|
f'Severity: {similar_incident.severity}',
|
|
f'Text similarity: {sim_data["text_similarity"]:.2%}'
|
|
]
|
|
})
|
|
|
|
return recommendations
|
|
|
|
def _find_relevant_knowledge_articles(self, incident: Incident, limit: int = 5) -> List[Dict[str, Any]]:
|
|
"""Find relevant knowledge base articles"""
|
|
|
|
# Search for articles by category and keywords
|
|
articles = KnowledgeBaseArticle.objects.filter(
|
|
status='PUBLISHED'
|
|
).order_by('-view_count', '-updated_at')
|
|
|
|
if not articles.exists():
|
|
return []
|
|
|
|
# Calculate relevance scores
|
|
incident_text = f"{incident.title} {incident.description} {incident.category}"
|
|
|
|
recommendations = []
|
|
for article in articles[:limit]:
|
|
article_text = f"{article.title} {article.summary} {' '.join(article.tags)}"
|
|
|
|
# Calculate text similarity
|
|
similarity = self._calculate_text_similarity(incident_text, article_text)
|
|
|
|
if similarity >= self.min_similarity_threshold:
|
|
recommendations.append({
|
|
'recommendation_type': 'KNOWLEDGE_ARTICLE',
|
|
'title': f'Knowledge Article: {article.title}',
|
|
'description': f'This knowledge base article may help resolve the incident: {article.summary}',
|
|
'similarity_score': similarity,
|
|
'confidence_score': min(similarity * 1.1, 1.0),
|
|
'confidence_level': self._get_confidence_level(similarity),
|
|
'knowledge_article_id': str(article.id),
|
|
'suggested_actions': [
|
|
f'Read the article: {article.title}',
|
|
'Follow the procedures outlined in the article',
|
|
'Apply the troubleshooting steps if applicable'
|
|
],
|
|
'expected_outcome': 'Faster resolution using documented procedures',
|
|
'reasoning': f'Article is relevant based on content similarity ({similarity:.2%}) and category match',
|
|
'matching_factors': [
|
|
f'Category: {article.category}',
|
|
f'Type: {article.article_type}',
|
|
f'Difficulty: {article.difficulty_level}',
|
|
f'Views: {article.view_count}'
|
|
]
|
|
})
|
|
|
|
return recommendations
|
|
|
|
def _find_solutions(self, incident: Incident, limit: int = 5) -> List[Dict[str, Any]]:
|
|
"""Find solutions from resolved similar incidents"""
|
|
|
|
# Look for resolved incidents with similar characteristics
|
|
six_months_ago = timezone.now() - timedelta(days=180)
|
|
resolved_incidents = Incident.objects.filter(
|
|
status__in=['RESOLVED', 'CLOSED'],
|
|
category=incident.category,
|
|
created_at__gte=six_months_ago
|
|
).exclude(id=incident.id)
|
|
|
|
if not resolved_incidents.exists():
|
|
return []
|
|
|
|
recommendations = []
|
|
for resolved_incident in resolved_incidents[:limit]:
|
|
# Calculate how quickly it was resolved
|
|
resolution_time = None
|
|
if resolved_incident.resolved_at:
|
|
resolution_time = resolved_incident.resolved_at - resolved_incident.created_at
|
|
|
|
recommendations.append({
|
|
'recommendation_type': 'SOLUTION',
|
|
'title': f'Solution from {resolved_incident.title}',
|
|
'description': f'A similar incident was resolved in {resolution_time} using standard procedures.',
|
|
'similarity_score': 0.8, # High similarity for same category
|
|
'confidence_score': 0.85,
|
|
'confidence_level': 'HIGH',
|
|
'related_incident_id': str(resolved_incident.id),
|
|
'suggested_actions': [
|
|
'Follow the same resolution approach used for the similar incident',
|
|
'Check if the same root cause applies',
|
|
'Apply any documented solutions from the incident'
|
|
],
|
|
'expected_outcome': 'Faster resolution using proven solutions',
|
|
'reasoning': f'Similar incident in same category was resolved successfully',
|
|
'matching_factors': [
|
|
f'Category: {resolved_incident.category}',
|
|
f'Resolution time: {resolution_time}',
|
|
f'Assigned to: {resolved_incident.assigned_to.username if resolved_incident.assigned_to else "Unknown"}'
|
|
]
|
|
})
|
|
|
|
return recommendations
|
|
|
|
def _find_experts(self, incident: Incident, limit: int = 5) -> List[Dict[str, Any]]:
|
|
"""Find experts who have worked on similar incidents"""
|
|
|
|
# Find users who have resolved similar incidents
|
|
six_months_ago = timezone.now() - timedelta(days=180)
|
|
expert_incidents = Incident.objects.filter(
|
|
status__in=['RESOLVED', 'CLOSED'],
|
|
category=incident.category,
|
|
assigned_to__isnull=False,
|
|
created_at__gte=six_months_ago
|
|
).exclude(id=incident.id)
|
|
|
|
# Count incidents per expert
|
|
expert_counts = {}
|
|
for expert_incident in expert_incidents:
|
|
expert = expert_incident.assigned_to
|
|
if expert not in expert_counts:
|
|
expert_counts[expert] = {
|
|
'count': 0,
|
|
'avg_resolution_time': timedelta(),
|
|
'incidents': []
|
|
}
|
|
expert_counts[expert]['count'] += 1
|
|
expert_counts[expert]['incidents'].append(expert_incident)
|
|
|
|
if expert_incident.resolved_at:
|
|
resolution_time = expert_incident.resolved_at - expert_incident.created_at
|
|
expert_counts[expert]['avg_resolution_time'] += resolution_time
|
|
|
|
# Calculate average resolution times
|
|
for expert in expert_counts:
|
|
if expert_counts[expert]['count'] > 0:
|
|
expert_counts[expert]['avg_resolution_time'] /= expert_counts[expert]['count']
|
|
|
|
# Sort experts by experience and create recommendations
|
|
sorted_experts = sorted(
|
|
expert_counts.items(),
|
|
key=lambda x: (x[1]['count'], -x[1]['avg_resolution_time'].total_seconds()),
|
|
reverse=True
|
|
)
|
|
|
|
recommendations = []
|
|
for expert, stats in sorted_experts[:limit]:
|
|
recommendations.append({
|
|
'recommendation_type': 'EXPERT',
|
|
'title': f'Expert: {expert.username}',
|
|
'description': f'{expert.username} has resolved {stats["count"]} similar incidents with an average resolution time of {stats["avg_resolution_time"]}.',
|
|
'similarity_score': 0.9, # High similarity for category experts
|
|
'confidence_score': min(0.7 + (stats['count'] * 0.05), 1.0),
|
|
'confidence_level': 'HIGH' if stats['count'] >= 3 else 'MEDIUM',
|
|
'suggested_expert_id': str(expert.id),
|
|
'suggested_actions': [
|
|
f'Contact {expert.username} for assistance',
|
|
'Ask about their experience with similar incidents',
|
|
'Request guidance on resolution approach'
|
|
],
|
|
'expected_outcome': 'Expert guidance for faster resolution',
|
|
'reasoning': f'Expert has {stats["count"]} successful resolutions in this category',
|
|
'matching_factors': [
|
|
f'Category experience: {stats["count"]} incidents',
|
|
f'Average resolution time: {stats["avg_resolution_time"]}',
|
|
f'Success rate: 100% (all incidents resolved)'
|
|
]
|
|
})
|
|
|
|
return recommendations
|
|
|
|
def _find_prevention_strategies(self, incident: Incident, limit: int = 5) -> List[Dict[str, Any]]:
|
|
"""Find prevention strategies from learning patterns"""
|
|
|
|
# Find relevant learning patterns
|
|
patterns = LearningPattern.objects.filter(
|
|
is_validated=True,
|
|
pattern_type__in=['PREVENTION', 'ROOT_CAUSE'],
|
|
source_incidents__category=incident.category
|
|
).distinct().order_by('-confidence_score', '-frequency')
|
|
|
|
if not patterns.exists():
|
|
return []
|
|
|
|
recommendations = []
|
|
for pattern in patterns[:limit]:
|
|
recommendations.append({
|
|
'recommendation_type': 'PREVENTION',
|
|
'title': f'Prevention Strategy: {pattern.name}',
|
|
'description': f'This prevention pattern has been validated and applied {pattern.times_applied} times with a {pattern.success_rate:.1%} success rate.',
|
|
'similarity_score': 0.8,
|
|
'confidence_score': pattern.confidence_score,
|
|
'confidence_level': self._get_confidence_level(pattern.confidence_score),
|
|
'suggested_actions': pattern.actions[:3], # Top 3 actions
|
|
'expected_outcome': f'Prevent similar incidents using validated pattern (success rate: {pattern.success_rate:.1%})',
|
|
'reasoning': f'Validated pattern with {pattern.frequency} observations and {pattern.success_rate:.1%} success rate',
|
|
'matching_factors': [
|
|
f'Pattern type: {pattern.pattern_type}',
|
|
f'Frequency: {pattern.frequency} observations',
|
|
f'Success rate: {pattern.success_rate:.1%}',
|
|
f'Validation: Expert validated'
|
|
]
|
|
})
|
|
|
|
return recommendations
|
|
|
|
def _calculate_text_similarity(self, text1: str, text2: str) -> float:
|
|
"""Calculate text similarity using simple keyword matching (fallback)"""
|
|
try:
|
|
if not text1.strip() or not text2.strip():
|
|
return 0.0
|
|
|
|
# Simple keyword-based similarity as fallback
|
|
words1 = set(text1.lower().split())
|
|
words2 = set(text2.lower().split())
|
|
|
|
if not words1 or not words2:
|
|
return 0.0
|
|
|
|
intersection = words1.intersection(words2)
|
|
union = words1.union(words2)
|
|
|
|
return len(intersection) / len(union) if union else 0.0
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Failed to calculate text similarity: {str(e)}")
|
|
return 0.0
|
|
|
|
def _calculate_metadata_similarity(self, incident1: Incident, incident2: Incident) -> float:
|
|
"""Calculate similarity based on incident metadata"""
|
|
similarity = 0.0
|
|
|
|
# Category similarity
|
|
if incident1.category == incident2.category:
|
|
similarity += 0.4
|
|
|
|
# Subcategory similarity
|
|
if incident1.subcategory == incident2.subcategory and incident1.subcategory:
|
|
similarity += 0.3
|
|
|
|
# Severity similarity
|
|
severity_weights = {'LOW': 1, 'MEDIUM': 2, 'HIGH': 3, 'CRITICAL': 4, 'EMERGENCY': 5}
|
|
if incident1.severity in severity_weights and incident2.severity in severity_weights:
|
|
severity_diff = abs(severity_weights[incident1.severity] - severity_weights[incident2.severity])
|
|
severity_similarity = max(0, 1 - (severity_diff / 4))
|
|
similarity += severity_similarity * 0.2
|
|
|
|
# Priority similarity
|
|
if incident1.priority == incident2.priority:
|
|
similarity += 0.1
|
|
|
|
return min(similarity, 1.0)
|
|
|
|
def _get_confidence_level(self, score: float) -> str:
|
|
"""Convert confidence score to confidence level"""
|
|
if score >= 0.8:
|
|
return 'VERY_HIGH'
|
|
elif score >= 0.6:
|
|
return 'HIGH'
|
|
elif score >= 0.4:
|
|
return 'MEDIUM'
|
|
else:
|
|
return 'LOW'
|
|
|
|
def _save_recommendation(self, incident: Incident, rec_data: Dict[str, Any]) -> IncidentRecommendation:
|
|
"""Save recommendation to database"""
|
|
|
|
# Get related objects
|
|
related_incident = None
|
|
if 'related_incident_id' in rec_data:
|
|
try:
|
|
related_incident = Incident.objects.get(id=rec_data['related_incident_id'])
|
|
except Incident.DoesNotExist:
|
|
pass
|
|
|
|
knowledge_article = None
|
|
if 'knowledge_article_id' in rec_data:
|
|
try:
|
|
knowledge_article = KnowledgeBaseArticle.objects.get(id=rec_data['knowledge_article_id'])
|
|
except KnowledgeBaseArticle.DoesNotExist:
|
|
pass
|
|
|
|
suggested_expert = None
|
|
if 'suggested_expert_id' in rec_data:
|
|
try:
|
|
from django.contrib.auth import get_user_model
|
|
User = get_user_model()
|
|
suggested_expert = User.objects.get(id=rec_data['suggested_expert_id'])
|
|
except User.DoesNotExist:
|
|
pass
|
|
|
|
# Create recommendation
|
|
recommendation = IncidentRecommendation.objects.create(
|
|
incident=incident,
|
|
recommendation_type=rec_data['recommendation_type'],
|
|
title=rec_data['title'],
|
|
description=rec_data['description'],
|
|
similarity_score=rec_data['similarity_score'],
|
|
confidence_level=rec_data['confidence_level'],
|
|
confidence_score=rec_data['confidence_score'],
|
|
related_incident=related_incident,
|
|
knowledge_article=knowledge_article,
|
|
suggested_expert=suggested_expert,
|
|
suggested_actions=rec_data.get('suggested_actions', []),
|
|
expected_outcome=rec_data.get('expected_outcome', ''),
|
|
reasoning=rec_data['reasoning'],
|
|
matching_factors=rec_data.get('matching_factors', []),
|
|
model_version=self.model_version
|
|
)
|
|
|
|
return recommendation
|