Files
ETB/ETB-API/incident_intelligence/ai/duplication.py
Iliyan Angelov 6b247e5b9f Updates
2025-09-19 11:58:53 +03:00

517 lines
20 KiB
Python

"""
Duplication detection engine for identifying and merging duplicate incidents
"""
import time
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
from .classification import IncidentClassifier
@dataclass
class DuplicationResult:
"""Result of duplication detection analysis"""
duplication_type: str
similarity_score: float
confidence_score: float
text_similarity: float
temporal_proximity: float
service_similarity: float
recommended_action: str
merge_confidence: float
reasoning: str
shared_elements: List[str]
class DuplicationDetector:
"""
AI-driven duplication detector for identifying duplicate incidents
"""
def __init__(self):
self.model_version = "v1.0"
self.classifier = IncidentClassifier()
# Duplication thresholds
self.duplication_thresholds = {
'EXACT': 0.95,
'NEAR_DUPLICATE': 0.85,
'SIMILAR': 0.70,
'POTENTIAL_DUPLICATE': 0.50
}
# Action thresholds
self.action_thresholds = {
'MERGE': 0.90,
'LINK': 0.75,
'REVIEW': 0.60,
'NO_ACTION': 0.0
}
# Time windows for temporal proximity
self.time_windows = {
'EXACT': timedelta(minutes=30),
'NEAR_DUPLICATE': timedelta(hours=2),
'SIMILAR': timedelta(hours=24),
'POTENTIAL_DUPLICATE': timedelta(days=7)
}
def detect_duplication(self, incident_a: Dict, incident_b: Dict) -> Optional[DuplicationResult]:
"""
Detect if two incidents are duplicates
"""
# Calculate various similarity metrics
text_similarity = self._calculate_text_similarity(incident_a, incident_b)
temporal_proximity = self._calculate_temporal_proximity(incident_a, incident_b)
service_similarity = self._calculate_service_similarity(incident_a, incident_b)
metadata_similarity = self._calculate_metadata_similarity(incident_a, incident_b)
# Calculate overall similarity score
overall_similarity = (
text_similarity * 0.5 +
temporal_proximity * 0.2 +
service_similarity * 0.2 +
metadata_similarity * 0.1
)
# Determine duplication type
duplication_type = self._determine_duplication_type(overall_similarity, text_similarity, temporal_proximity)
if duplication_type == 'NO_DUPLICATE':
return None
# Calculate confidence score
confidence_score = self._calculate_confidence_score(
overall_similarity, text_similarity, temporal_proximity, service_similarity
)
# Determine recommended action
recommended_action = self._determine_recommended_action(confidence_score, duplication_type)
# Calculate merge confidence
merge_confidence = self._calculate_merge_confidence(
confidence_score, duplication_type, incident_a, incident_b
)
# Generate reasoning
reasoning = self._generate_reasoning(
duplication_type, text_similarity, temporal_proximity, service_similarity
)
# Extract shared elements
shared_elements = self._extract_shared_elements(incident_a, incident_b)
return DuplicationResult(
duplication_type=duplication_type,
similarity_score=overall_similarity,
confidence_score=confidence_score,
text_similarity=text_similarity,
temporal_proximity=temporal_proximity,
service_similarity=service_similarity,
recommended_action=recommended_action,
merge_confidence=merge_confidence,
reasoning=reasoning,
shared_elements=shared_elements
)
def _calculate_text_similarity(self, incident_a: Dict, incident_b: Dict) -> float:
"""Calculate text similarity between incidents"""
# Combine all text fields
text_a = f"{incident_a.get('title', '')} {incident_a.get('description', '')} {incident_a.get('free_text', '')}".lower()
text_b = f"{incident_b.get('title', '')} {incident_b.get('description', '')} {incident_b.get('free_text', '')}".lower()
# Calculate multiple similarity metrics
jaccard_similarity = self._calculate_jaccard_similarity(text_a, text_b)
cosine_similarity = self._calculate_cosine_similarity(text_a, text_b)
phrase_similarity = self._calculate_phrase_similarity(text_a, text_b)
semantic_similarity = self._calculate_semantic_similarity(text_a, text_b)
# Weighted combination
return (
jaccard_similarity * 0.3 +
cosine_similarity * 0.3 +
phrase_similarity * 0.2 +
semantic_similarity * 0.2
)
def _calculate_jaccard_similarity(self, text_a: str, text_b: str) -> float:
"""Calculate Jaccard similarity based on word sets"""
words_a = set(text_a.split())
words_b = set(text_b.split())
if not words_a or not words_b:
return 0.0
intersection = len(words_a.intersection(words_b))
union = len(words_a.union(words_b))
return intersection / union if union > 0 else 0.0
def _calculate_cosine_similarity(self, text_a: str, text_b: str) -> float:
"""Calculate cosine similarity based on word frequency"""
from collections import Counter
words_a = Counter(text_a.split())
words_b = Counter(text_b.split())
# Get all unique words
all_words = set(words_a.keys()) | set(words_b.keys())
if not all_words:
return 0.0
# Create vectors
vector_a = [words_a.get(word, 0) for word in all_words]
vector_b = [words_b.get(word, 0) for word in all_words]
# Calculate cosine similarity
dot_product = sum(a * b for a, b in zip(vector_a, vector_b))
magnitude_a = sum(a * a for a in vector_a) ** 0.5
magnitude_b = sum(b * b for b in vector_b) ** 0.5
if magnitude_a == 0 or magnitude_b == 0:
return 0.0
return dot_product / (magnitude_a * magnitude_b)
def _calculate_phrase_similarity(self, text_a: str, text_b: str) -> float:
"""Calculate similarity based on common phrases"""
# Extract 2-3 word phrases
phrases_a = set()
phrases_b = set()
words_a = text_a.split()
words_b = text_b.split()
# Extract 2-word phrases
for i in range(len(words_a) - 1):
phrases_a.add(f"{words_a[i]} {words_a[i+1]}")
for i in range(len(words_b) - 1):
phrases_b.add(f"{words_b[i]} {words_b[i+1]}")
# Extract 3-word phrases
for i in range(len(words_a) - 2):
phrases_a.add(f"{words_a[i]} {words_a[i+1]} {words_a[i+2]}")
for i in range(len(words_b) - 2):
phrases_b.add(f"{words_b[i]} {words_b[i+1]} {words_b[i+2]}")
if not phrases_a or not phrases_b:
return 0.0
intersection = len(phrases_a.intersection(phrases_b))
union = len(phrases_a.union(phrases_b))
return intersection / union if union > 0 else 0.0
def _calculate_semantic_similarity(self, text_a: str, text_b: str) -> float:
"""Calculate semantic similarity using keyword analysis"""
# Extract keywords using the classifier
keywords_a = set(self.classifier._extract_keywords(text_a))
keywords_b = set(self.classifier._extract_keywords(text_b))
if not keywords_a or not keywords_b:
return 0.0
# Calculate semantic similarity based on keyword overlap
intersection = len(keywords_a.intersection(keywords_b))
union = len(keywords_a.union(keywords_b))
base_similarity = intersection / union if union > 0 else 0.0
# Boost similarity for technical terms
technical_terms = {
'error', 'exception', 'timeout', 'connection', 'database', 'server',
'api', 'service', 'application', 'network', 'storage', 'memory',
'cpu', 'disk', 'bandwidth', 'latency', 'performance', 'crash'
}
technical_intersection = len(keywords_a.intersection(keywords_b).intersection(technical_terms))
if technical_intersection > 0:
base_similarity += 0.1 * technical_intersection
return min(base_similarity, 1.0)
def _calculate_temporal_proximity(self, incident_a: Dict, incident_b: Dict) -> float:
"""Calculate temporal proximity between incidents"""
created_a = incident_a.get('created_at')
created_b = incident_b.get('created_at')
if not created_a or not created_b:
return 0.0
# Convert to datetime if needed
if isinstance(created_a, str):
created_a = datetime.fromisoformat(created_a.replace('Z', '+00:00'))
if isinstance(created_b, str):
created_b = datetime.fromisoformat(created_b.replace('Z', '+00:00'))
time_diff = abs((created_a - created_b).total_seconds())
# Calculate proximity score based on time difference
if time_diff <= 300: # 5 minutes
return 1.0
elif time_diff <= 1800: # 30 minutes
return 0.9
elif time_diff <= 3600: # 1 hour
return 0.7
elif time_diff <= 7200: # 2 hours
return 0.5
elif time_diff <= 86400: # 24 hours
return 0.3
elif time_diff <= 604800: # 7 days
return 0.1
else:
return 0.0
def _calculate_service_similarity(self, incident_a: Dict, incident_b: Dict) -> float:
"""Calculate service/component similarity"""
# Extract service information from text
text_a = f"{incident_a.get('title', '')} {incident_a.get('description', '')}".lower()
text_b = f"{incident_b.get('title', '')} {incident_b.get('description', '')}".lower()
# Service/component keywords
service_keywords = [
'api', 'service', 'database', 'server', 'application', 'website', 'mobile',
'frontend', 'backend', 'microservice', 'gateway', 'load balancer', 'cache',
'queue', 'message', 'notification', 'email', 'sms', 'payment', 'auth',
'user service', 'order service', 'payment service', 'notification service'
]
services_a = set()
services_b = set()
for keyword in service_keywords:
if keyword in text_a:
services_a.add(keyword)
if keyword in text_b:
services_b.add(keyword)
if not services_a or not services_b:
return 0.0
intersection = len(services_a.intersection(services_b))
union = len(services_a.union(services_b))
return intersection / union if union > 0 else 0.0
def _calculate_metadata_similarity(self, incident_a: Dict, incident_b: Dict) -> float:
"""Calculate similarity based on metadata fields"""
similarity_score = 0.0
total_fields = 0
# Compare severity
if incident_a.get('severity') == incident_b.get('severity'):
similarity_score += 1.0
total_fields += 1
# Compare status
if incident_a.get('status') == incident_b.get('status'):
similarity_score += 1.0
total_fields += 1
# Compare category
if incident_a.get('category') == incident_b.get('category'):
similarity_score += 1.0
total_fields += 1
# Compare assigned user
if incident_a.get('assigned_to') == incident_b.get('assigned_to'):
similarity_score += 1.0
total_fields += 1
# Compare reporter
if incident_a.get('reporter') == incident_b.get('reporter'):
similarity_score += 1.0
total_fields += 1
return similarity_score / total_fields if total_fields > 0 else 0.0
def _determine_duplication_type(self, overall_similarity: float, text_similarity: float,
temporal_proximity: float) -> str:
"""Determine the type of duplication"""
if overall_similarity >= self.duplication_thresholds['EXACT']:
return 'EXACT'
elif overall_similarity >= self.duplication_thresholds['NEAR_DUPLICATE']:
return 'NEAR_DUPLICATE'
elif overall_similarity >= self.duplication_thresholds['SIMILAR']:
return 'SIMILAR'
elif overall_similarity >= self.duplication_thresholds['POTENTIAL_DUPLICATE']:
return 'POTENTIAL_DUPLICATE'
else:
return 'NO_DUPLICATE'
def _calculate_confidence_score(self, overall_similarity: float, text_similarity: float,
temporal_proximity: float, service_similarity: float) -> float:
"""Calculate confidence score for duplication detection"""
base_confidence = overall_similarity
# Boost confidence for high text similarity
if text_similarity > 0.8:
base_confidence += 0.1
# Boost confidence for high temporal proximity
if temporal_proximity > 0.8:
base_confidence += 0.1
# Boost confidence for high service similarity
if service_similarity > 0.8:
base_confidence += 0.05
return min(base_confidence, 1.0)
def _determine_recommended_action(self, confidence_score: float, duplication_type: str) -> str:
"""Determine recommended action based on confidence and duplication type"""
if confidence_score >= self.action_thresholds['MERGE']:
return 'MERGE'
elif confidence_score >= self.action_thresholds['LINK']:
return 'LINK'
elif confidence_score >= self.action_thresholds['REVIEW']:
return 'REVIEW'
else:
return 'NO_ACTION'
def _calculate_merge_confidence(self, confidence_score: float, duplication_type: str,
incident_a: Dict, incident_b: Dict) -> float:
"""Calculate confidence for merging incidents"""
merge_confidence = confidence_score
# Adjust based on duplication type
type_adjustments = {
'EXACT': 0.1,
'NEAR_DUPLICATE': 0.05,
'SIMILAR': 0.0,
'POTENTIAL_DUPLICATE': -0.1
}
merge_confidence += type_adjustments.get(duplication_type, 0.0)
# Adjust based on incident status
if incident_a.get('status') == incident_b.get('status'):
merge_confidence += 0.05
# Adjust based on severity
if incident_a.get('severity') == incident_b.get('severity'):
merge_confidence += 0.03
return min(max(merge_confidence, 0.0), 1.0)
def _generate_reasoning(self, duplication_type: str, text_similarity: float,
temporal_proximity: float, service_similarity: float) -> str:
"""Generate human-readable reasoning for duplication detection"""
reasoning_parts = []
if text_similarity > 0.8:
reasoning_parts.append(f"Very high text similarity ({text_similarity:.1%})")
elif text_similarity > 0.6:
reasoning_parts.append(f"High text similarity ({text_similarity:.1%})")
elif text_similarity > 0.4:
reasoning_parts.append(f"Moderate text similarity ({text_similarity:.1%})")
if temporal_proximity > 0.8:
reasoning_parts.append(f"Very close temporal proximity ({temporal_proximity:.1%})")
elif temporal_proximity > 0.6:
reasoning_parts.append(f"Close temporal proximity ({temporal_proximity:.1%})")
if service_similarity > 0.8:
reasoning_parts.append(f"Very high service similarity ({service_similarity:.1%})")
elif service_similarity > 0.6:
reasoning_parts.append(f"High service similarity ({service_similarity:.1%})")
if duplication_type == 'EXACT':
reasoning_parts.append("Incidents appear to be exact duplicates")
elif duplication_type == 'NEAR_DUPLICATE':
reasoning_parts.append("Incidents appear to be near duplicates")
elif duplication_type == 'SIMILAR':
reasoning_parts.append("Incidents appear to be similar")
elif duplication_type == 'POTENTIAL_DUPLICATE':
reasoning_parts.append("Incidents may be duplicates")
return "; ".join(reasoning_parts) if reasoning_parts else "Based on overall similarity analysis"
def _extract_shared_elements(self, incident_a: Dict, incident_b: Dict) -> List[str]:
"""Extract elements shared between incidents"""
shared_elements = []
# Shared keywords
text_a = f"{incident_a.get('title', '')} {incident_a.get('description', '')}".lower()
text_b = f"{incident_b.get('title', '')} {incident_b.get('description', '')}".lower()
keywords_a = set(self.classifier._extract_keywords(text_a))
keywords_b = set(self.classifier._extract_keywords(text_b))
shared_keywords = keywords_a.intersection(keywords_b)
if shared_keywords:
shared_elements.append(f"Keywords: {', '.join(list(shared_keywords)[:5])}")
# Shared services
service_keywords = [
'api', 'service', 'database', 'server', 'application', 'website', 'mobile'
]
services_a = set()
services_b = set()
for keyword in service_keywords:
if keyword in text_a:
services_a.add(keyword)
if keyword in text_b:
services_b.add(keyword)
shared_services = services_a.intersection(services_b)
if shared_services:
shared_elements.append(f"Services: {', '.join(shared_services)}")
# Shared metadata
if incident_a.get('severity') == incident_b.get('severity'):
shared_elements.append(f"Severity: {incident_a.get('severity')}")
if incident_a.get('category') == incident_b.get('category'):
shared_elements.append(f"Category: {incident_a.get('category')}")
if incident_a.get('status') == incident_b.get('status'):
shared_elements.append(f"Status: {incident_a.get('status')}")
return shared_elements
def find_duplicate_candidates(self, target_incident: Dict, all_incidents: List[Dict],
limit: int = 10) -> List[Tuple[Dict, DuplicationResult]]:
"""Find incidents that might be duplicates of the target incident"""
candidates = []
for incident in all_incidents:
if incident['id'] == target_incident['id']:
continue
duplication = self.detect_duplication(target_incident, incident)
if duplication:
candidates.append((incident, duplication))
# Sort by confidence score and return top results
candidates.sort(key=lambda x: x[1].confidence_score, reverse=True)
return candidates[:limit]
def batch_detect_duplicates(self, incidents: List[Dict]) -> List[Tuple[Dict, Dict, DuplicationResult]]:
"""Batch detect duplicates in a list of incidents"""
duplicates = []
processed_pairs = set()
for i, incident_a in enumerate(incidents):
for j, incident_b in enumerate(incidents[i+1:], i+1):
# Create a unique pair identifier
pair_id = tuple(sorted([incident_a['id'], incident_b['id']]))
if pair_id in processed_pairs:
continue
processed_pairs.add(pair_id)
duplication = self.detect_duplication(incident_a, incident_b)
if duplication:
duplicates.append((incident_a, incident_b, duplication))
# Sort by confidence score
duplicates.sort(key=lambda x: x[2].confidence_score, reverse=True)
return duplicates