Files
ETB/ETB-API/knowledge_learning/services/recommendation_engine.py
Iliyan Angelov 6b247e5b9f Updates
2025-09-19 11:58:53 +03:00

459 lines
21 KiB
Python

import logging
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from django.db.models import Q, Count, Avg
from django.utils import timezone
# from sklearn.feature_extraction.text import TfidfVectorizer
# from sklearn.metrics.pairwise import cosine_similarity
# import numpy as np
from ..models import IncidentRecommendation, KnowledgeBaseArticle, LearningPattern
from incident_intelligence.models import Incident
logger = logging.getLogger(__name__)
class RecommendationEngine:
"""Service for generating incident recommendations based on similarity and patterns"""
def __init__(self):
self.model_version = "v1.0"
self.min_similarity_threshold = 0.3
self.max_recommendations = 10
def generate_recommendations(
self,
incident_id: str,
recommendation_types: Optional[List[str]] = None,
max_recommendations: int = 5,
min_confidence: float = 0.5
) -> List[Dict[str, Any]]:
"""Generate recommendations for a specific incident"""
try:
incident = Incident.objects.get(id=incident_id)
recommendations = []
if not recommendation_types:
recommendation_types = [
'SIMILAR_INCIDENT', 'KNOWLEDGE_ARTICLE', 'SOLUTION',
'EXPERT', 'PREVENTION'
]
# Generate different types of recommendations
for rec_type in recommendation_types:
if rec_type == 'SIMILAR_INCIDENT':
similar_incidents = self._find_similar_incidents(incident, max_recommendations)
recommendations.extend(similar_incidents)
elif rec_type == 'KNOWLEDGE_ARTICLE':
knowledge_articles = self._find_relevant_knowledge_articles(incident, max_recommendations)
recommendations.extend(knowledge_articles)
elif rec_type == 'SOLUTION':
solutions = self._find_solutions(incident, max_recommendations)
recommendations.extend(solutions)
elif rec_type == 'EXPERT':
experts = self._find_experts(incident, max_recommendations)
recommendations.extend(experts)
elif rec_type == 'PREVENTION':
prevention_strategies = self._find_prevention_strategies(incident, max_recommendations)
recommendations.extend(prevention_strategies)
# Filter by minimum confidence and sort by confidence score
recommendations = [
rec for rec in recommendations
if rec['confidence_score'] >= min_confidence
]
recommendations.sort(key=lambda x: x['confidence_score'], reverse=True)
# Save recommendations to database
saved_recommendations = []
for rec_data in recommendations[:max_recommendations]:
recommendation = self._save_recommendation(incident, rec_data)
saved_recommendations.append({
'id': str(recommendation.id),
'title': recommendation.title,
'type': recommendation.recommendation_type,
'confidence_score': recommendation.confidence_score,
'similarity_score': recommendation.similarity_score
})
return saved_recommendations
except Incident.DoesNotExist:
raise ValueError(f"Incident with ID {incident_id} not found")
except Exception as e:
logger.error(f"Failed to generate recommendations for incident {incident_id}: {str(e)}")
raise
def _find_similar_incidents(self, incident: Incident, limit: int = 5) -> List[Dict[str, Any]]:
"""Find similar incidents based on content and metadata"""
# Get resolved incidents from the past 6 months
six_months_ago = timezone.now() - timedelta(days=180)
similar_incidents = Incident.objects.filter(
status__in=['RESOLVED', 'CLOSED'],
created_at__gte=six_months_ago
).exclude(id=incident.id)
if not similar_incidents.exists():
return []
# Calculate similarity scores
incident_text = f"{incident.title} {incident.description} {incident.category} {incident.subcategory or ''}"
similarities = []
for similar_incident in similar_incidents:
similar_text = f"{similar_incident.title} {similar_incident.description} {similar_incident.category} {similar_incident.subcategory or ''}"
# Calculate text similarity
text_similarity = self._calculate_text_similarity(incident_text, similar_text)
# Calculate metadata similarity
metadata_similarity = self._calculate_metadata_similarity(incident, similar_incident)
# Combined similarity score
combined_similarity = (text_similarity * 0.7) + (metadata_similarity * 0.3)
if combined_similarity >= self.min_similarity_threshold:
similarities.append({
'incident': similar_incident,
'similarity_score': combined_similarity,
'text_similarity': text_similarity,
'metadata_similarity': metadata_similarity
})
# Sort by similarity and return top matches
similarities.sort(key=lambda x: x['similarity_score'], reverse=True)
recommendations = []
for sim_data in similarities[:limit]:
similar_incident = sim_data['incident']
recommendations.append({
'recommendation_type': 'SIMILAR_INCIDENT',
'title': f'Similar Incident: {similar_incident.title}',
'description': f'This incident is similar to {similar_incident.title} which was resolved on {similar_incident.resolved_at.strftime("%Y-%m-%d") if similar_incident.resolved_at else "unknown date"}.',
'similarity_score': sim_data['similarity_score'],
'confidence_score': min(sim_data['similarity_score'] * 1.2, 1.0),
'confidence_level': self._get_confidence_level(sim_data['similarity_score']),
'related_incident_id': str(similar_incident.id),
'suggested_actions': [
f'Review how {similar_incident.title} was resolved',
'Check if the same resolution approach applies',
'Contact the incident assignee for insights'
],
'expected_outcome': 'Faster resolution by applying proven solutions',
'reasoning': f'Incidents are similar based on content ({sim_data["text_similarity"]:.2%}) and metadata ({sim_data["metadata_similarity"]:.2%})',
'matching_factors': [
f'Category: {similar_incident.category}',
f'Severity: {similar_incident.severity}',
f'Text similarity: {sim_data["text_similarity"]:.2%}'
]
})
return recommendations
def _find_relevant_knowledge_articles(self, incident: Incident, limit: int = 5) -> List[Dict[str, Any]]:
"""Find relevant knowledge base articles"""
# Search for articles by category and keywords
articles = KnowledgeBaseArticle.objects.filter(
status='PUBLISHED'
).order_by('-view_count', '-updated_at')
if not articles.exists():
return []
# Calculate relevance scores
incident_text = f"{incident.title} {incident.description} {incident.category}"
recommendations = []
for article in articles[:limit]:
article_text = f"{article.title} {article.summary} {' '.join(article.tags)}"
# Calculate text similarity
similarity = self._calculate_text_similarity(incident_text, article_text)
if similarity >= self.min_similarity_threshold:
recommendations.append({
'recommendation_type': 'KNOWLEDGE_ARTICLE',
'title': f'Knowledge Article: {article.title}',
'description': f'This knowledge base article may help resolve the incident: {article.summary}',
'similarity_score': similarity,
'confidence_score': min(similarity * 1.1, 1.0),
'confidence_level': self._get_confidence_level(similarity),
'knowledge_article_id': str(article.id),
'suggested_actions': [
f'Read the article: {article.title}',
'Follow the procedures outlined in the article',
'Apply the troubleshooting steps if applicable'
],
'expected_outcome': 'Faster resolution using documented procedures',
'reasoning': f'Article is relevant based on content similarity ({similarity:.2%}) and category match',
'matching_factors': [
f'Category: {article.category}',
f'Type: {article.article_type}',
f'Difficulty: {article.difficulty_level}',
f'Views: {article.view_count}'
]
})
return recommendations
def _find_solutions(self, incident: Incident, limit: int = 5) -> List[Dict[str, Any]]:
"""Find solutions from resolved similar incidents"""
# Look for resolved incidents with similar characteristics
six_months_ago = timezone.now() - timedelta(days=180)
resolved_incidents = Incident.objects.filter(
status__in=['RESOLVED', 'CLOSED'],
category=incident.category,
created_at__gte=six_months_ago
).exclude(id=incident.id)
if not resolved_incidents.exists():
return []
recommendations = []
for resolved_incident in resolved_incidents[:limit]:
# Calculate how quickly it was resolved
resolution_time = None
if resolved_incident.resolved_at:
resolution_time = resolved_incident.resolved_at - resolved_incident.created_at
recommendations.append({
'recommendation_type': 'SOLUTION',
'title': f'Solution from {resolved_incident.title}',
'description': f'A similar incident was resolved in {resolution_time} using standard procedures.',
'similarity_score': 0.8, # High similarity for same category
'confidence_score': 0.85,
'confidence_level': 'HIGH',
'related_incident_id': str(resolved_incident.id),
'suggested_actions': [
'Follow the same resolution approach used for the similar incident',
'Check if the same root cause applies',
'Apply any documented solutions from the incident'
],
'expected_outcome': 'Faster resolution using proven solutions',
'reasoning': f'Similar incident in same category was resolved successfully',
'matching_factors': [
f'Category: {resolved_incident.category}',
f'Resolution time: {resolution_time}',
f'Assigned to: {resolved_incident.assigned_to.username if resolved_incident.assigned_to else "Unknown"}'
]
})
return recommendations
def _find_experts(self, incident: Incident, limit: int = 5) -> List[Dict[str, Any]]:
"""Find experts who have worked on similar incidents"""
# Find users who have resolved similar incidents
six_months_ago = timezone.now() - timedelta(days=180)
expert_incidents = Incident.objects.filter(
status__in=['RESOLVED', 'CLOSED'],
category=incident.category,
assigned_to__isnull=False,
created_at__gte=six_months_ago
).exclude(id=incident.id)
# Count incidents per expert
expert_counts = {}
for expert_incident in expert_incidents:
expert = expert_incident.assigned_to
if expert not in expert_counts:
expert_counts[expert] = {
'count': 0,
'avg_resolution_time': timedelta(),
'incidents': []
}
expert_counts[expert]['count'] += 1
expert_counts[expert]['incidents'].append(expert_incident)
if expert_incident.resolved_at:
resolution_time = expert_incident.resolved_at - expert_incident.created_at
expert_counts[expert]['avg_resolution_time'] += resolution_time
# Calculate average resolution times
for expert in expert_counts:
if expert_counts[expert]['count'] > 0:
expert_counts[expert]['avg_resolution_time'] /= expert_counts[expert]['count']
# Sort experts by experience and create recommendations
sorted_experts = sorted(
expert_counts.items(),
key=lambda x: (x[1]['count'], -x[1]['avg_resolution_time'].total_seconds()),
reverse=True
)
recommendations = []
for expert, stats in sorted_experts[:limit]:
recommendations.append({
'recommendation_type': 'EXPERT',
'title': f'Expert: {expert.username}',
'description': f'{expert.username} has resolved {stats["count"]} similar incidents with an average resolution time of {stats["avg_resolution_time"]}.',
'similarity_score': 0.9, # High similarity for category experts
'confidence_score': min(0.7 + (stats['count'] * 0.05), 1.0),
'confidence_level': 'HIGH' if stats['count'] >= 3 else 'MEDIUM',
'suggested_expert_id': str(expert.id),
'suggested_actions': [
f'Contact {expert.username} for assistance',
'Ask about their experience with similar incidents',
'Request guidance on resolution approach'
],
'expected_outcome': 'Expert guidance for faster resolution',
'reasoning': f'Expert has {stats["count"]} successful resolutions in this category',
'matching_factors': [
f'Category experience: {stats["count"]} incidents',
f'Average resolution time: {stats["avg_resolution_time"]}',
f'Success rate: 100% (all incidents resolved)'
]
})
return recommendations
def _find_prevention_strategies(self, incident: Incident, limit: int = 5) -> List[Dict[str, Any]]:
"""Find prevention strategies from learning patterns"""
# Find relevant learning patterns
patterns = LearningPattern.objects.filter(
is_validated=True,
pattern_type__in=['PREVENTION', 'ROOT_CAUSE'],
source_incidents__category=incident.category
).distinct().order_by('-confidence_score', '-frequency')
if not patterns.exists():
return []
recommendations = []
for pattern in patterns[:limit]:
recommendations.append({
'recommendation_type': 'PREVENTION',
'title': f'Prevention Strategy: {pattern.name}',
'description': f'This prevention pattern has been validated and applied {pattern.times_applied} times with a {pattern.success_rate:.1%} success rate.',
'similarity_score': 0.8,
'confidence_score': pattern.confidence_score,
'confidence_level': self._get_confidence_level(pattern.confidence_score),
'suggested_actions': pattern.actions[:3], # Top 3 actions
'expected_outcome': f'Prevent similar incidents using validated pattern (success rate: {pattern.success_rate:.1%})',
'reasoning': f'Validated pattern with {pattern.frequency} observations and {pattern.success_rate:.1%} success rate',
'matching_factors': [
f'Pattern type: {pattern.pattern_type}',
f'Frequency: {pattern.frequency} observations',
f'Success rate: {pattern.success_rate:.1%}',
f'Validation: Expert validated'
]
})
return recommendations
def _calculate_text_similarity(self, text1: str, text2: str) -> float:
"""Calculate text similarity using simple keyword matching (fallback)"""
try:
if not text1.strip() or not text2.strip():
return 0.0
# Simple keyword-based similarity as fallback
words1 = set(text1.lower().split())
words2 = set(text2.lower().split())
if not words1 or not words2:
return 0.0
intersection = words1.intersection(words2)
union = words1.union(words2)
return len(intersection) / len(union) if union else 0.0
except Exception as e:
logger.warning(f"Failed to calculate text similarity: {str(e)}")
return 0.0
def _calculate_metadata_similarity(self, incident1: Incident, incident2: Incident) -> float:
"""Calculate similarity based on incident metadata"""
similarity = 0.0
# Category similarity
if incident1.category == incident2.category:
similarity += 0.4
# Subcategory similarity
if incident1.subcategory == incident2.subcategory and incident1.subcategory:
similarity += 0.3
# Severity similarity
severity_weights = {'LOW': 1, 'MEDIUM': 2, 'HIGH': 3, 'CRITICAL': 4, 'EMERGENCY': 5}
if incident1.severity in severity_weights and incident2.severity in severity_weights:
severity_diff = abs(severity_weights[incident1.severity] - severity_weights[incident2.severity])
severity_similarity = max(0, 1 - (severity_diff / 4))
similarity += severity_similarity * 0.2
# Priority similarity
if incident1.priority == incident2.priority:
similarity += 0.1
return min(similarity, 1.0)
def _get_confidence_level(self, score: float) -> str:
"""Convert confidence score to confidence level"""
if score >= 0.8:
return 'VERY_HIGH'
elif score >= 0.6:
return 'HIGH'
elif score >= 0.4:
return 'MEDIUM'
else:
return 'LOW'
def _save_recommendation(self, incident: Incident, rec_data: Dict[str, Any]) -> IncidentRecommendation:
"""Save recommendation to database"""
# Get related objects
related_incident = None
if 'related_incident_id' in rec_data:
try:
related_incident = Incident.objects.get(id=rec_data['related_incident_id'])
except Incident.DoesNotExist:
pass
knowledge_article = None
if 'knowledge_article_id' in rec_data:
try:
knowledge_article = KnowledgeBaseArticle.objects.get(id=rec_data['knowledge_article_id'])
except KnowledgeBaseArticle.DoesNotExist:
pass
suggested_expert = None
if 'suggested_expert_id' in rec_data:
try:
from django.contrib.auth import get_user_model
User = get_user_model()
suggested_expert = User.objects.get(id=rec_data['suggested_expert_id'])
except User.DoesNotExist:
pass
# Create recommendation
recommendation = IncidentRecommendation.objects.create(
incident=incident,
recommendation_type=rec_data['recommendation_type'],
title=rec_data['title'],
description=rec_data['description'],
similarity_score=rec_data['similarity_score'],
confidence_level=rec_data['confidence_level'],
confidence_score=rec_data['confidence_score'],
related_incident=related_incident,
knowledge_article=knowledge_article,
suggested_expert=suggested_expert,
suggested_actions=rec_data.get('suggested_actions', []),
expected_outcome=rec_data.get('expected_outcome', ''),
reasoning=rec_data['reasoning'],
matching_factors=rec_data.get('matching_factors', []),
model_version=self.model_version
)
return recommendation