517 lines
20 KiB
Python
517 lines
20 KiB
Python
"""
|
|
Duplication detection engine for identifying and merging duplicate incidents
|
|
"""
|
|
import time
|
|
from typing import Dict, List, Tuple, Optional
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta
|
|
from .classification import IncidentClassifier
|
|
|
|
|
|
@dataclass
|
|
class DuplicationResult:
|
|
"""Result of duplication detection analysis"""
|
|
duplication_type: str
|
|
similarity_score: float
|
|
confidence_score: float
|
|
text_similarity: float
|
|
temporal_proximity: float
|
|
service_similarity: float
|
|
recommended_action: str
|
|
merge_confidence: float
|
|
reasoning: str
|
|
shared_elements: List[str]
|
|
|
|
|
|
class DuplicationDetector:
|
|
"""
|
|
AI-driven duplication detector for identifying duplicate incidents
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.model_version = "v1.0"
|
|
self.classifier = IncidentClassifier()
|
|
|
|
# Duplication thresholds
|
|
self.duplication_thresholds = {
|
|
'EXACT': 0.95,
|
|
'NEAR_DUPLICATE': 0.85,
|
|
'SIMILAR': 0.70,
|
|
'POTENTIAL_DUPLICATE': 0.50
|
|
}
|
|
|
|
# Action thresholds
|
|
self.action_thresholds = {
|
|
'MERGE': 0.90,
|
|
'LINK': 0.75,
|
|
'REVIEW': 0.60,
|
|
'NO_ACTION': 0.0
|
|
}
|
|
|
|
# Time windows for temporal proximity
|
|
self.time_windows = {
|
|
'EXACT': timedelta(minutes=30),
|
|
'NEAR_DUPLICATE': timedelta(hours=2),
|
|
'SIMILAR': timedelta(hours=24),
|
|
'POTENTIAL_DUPLICATE': timedelta(days=7)
|
|
}
|
|
|
|
def detect_duplication(self, incident_a: Dict, incident_b: Dict) -> Optional[DuplicationResult]:
|
|
"""
|
|
Detect if two incidents are duplicates
|
|
"""
|
|
# Calculate various similarity metrics
|
|
text_similarity = self._calculate_text_similarity(incident_a, incident_b)
|
|
temporal_proximity = self._calculate_temporal_proximity(incident_a, incident_b)
|
|
service_similarity = self._calculate_service_similarity(incident_a, incident_b)
|
|
metadata_similarity = self._calculate_metadata_similarity(incident_a, incident_b)
|
|
|
|
# Calculate overall similarity score
|
|
overall_similarity = (
|
|
text_similarity * 0.5 +
|
|
temporal_proximity * 0.2 +
|
|
service_similarity * 0.2 +
|
|
metadata_similarity * 0.1
|
|
)
|
|
|
|
# Determine duplication type
|
|
duplication_type = self._determine_duplication_type(overall_similarity, text_similarity, temporal_proximity)
|
|
|
|
if duplication_type == 'NO_DUPLICATE':
|
|
return None
|
|
|
|
# Calculate confidence score
|
|
confidence_score = self._calculate_confidence_score(
|
|
overall_similarity, text_similarity, temporal_proximity, service_similarity
|
|
)
|
|
|
|
# Determine recommended action
|
|
recommended_action = self._determine_recommended_action(confidence_score, duplication_type)
|
|
|
|
# Calculate merge confidence
|
|
merge_confidence = self._calculate_merge_confidence(
|
|
confidence_score, duplication_type, incident_a, incident_b
|
|
)
|
|
|
|
# Generate reasoning
|
|
reasoning = self._generate_reasoning(
|
|
duplication_type, text_similarity, temporal_proximity, service_similarity
|
|
)
|
|
|
|
# Extract shared elements
|
|
shared_elements = self._extract_shared_elements(incident_a, incident_b)
|
|
|
|
return DuplicationResult(
|
|
duplication_type=duplication_type,
|
|
similarity_score=overall_similarity,
|
|
confidence_score=confidence_score,
|
|
text_similarity=text_similarity,
|
|
temporal_proximity=temporal_proximity,
|
|
service_similarity=service_similarity,
|
|
recommended_action=recommended_action,
|
|
merge_confidence=merge_confidence,
|
|
reasoning=reasoning,
|
|
shared_elements=shared_elements
|
|
)
|
|
|
|
def _calculate_text_similarity(self, incident_a: Dict, incident_b: Dict) -> float:
|
|
"""Calculate text similarity between incidents"""
|
|
# Combine all text fields
|
|
text_a = f"{incident_a.get('title', '')} {incident_a.get('description', '')} {incident_a.get('free_text', '')}".lower()
|
|
text_b = f"{incident_b.get('title', '')} {incident_b.get('description', '')} {incident_b.get('free_text', '')}".lower()
|
|
|
|
# Calculate multiple similarity metrics
|
|
jaccard_similarity = self._calculate_jaccard_similarity(text_a, text_b)
|
|
cosine_similarity = self._calculate_cosine_similarity(text_a, text_b)
|
|
phrase_similarity = self._calculate_phrase_similarity(text_a, text_b)
|
|
semantic_similarity = self._calculate_semantic_similarity(text_a, text_b)
|
|
|
|
# Weighted combination
|
|
return (
|
|
jaccard_similarity * 0.3 +
|
|
cosine_similarity * 0.3 +
|
|
phrase_similarity * 0.2 +
|
|
semantic_similarity * 0.2
|
|
)
|
|
|
|
def _calculate_jaccard_similarity(self, text_a: str, text_b: str) -> float:
|
|
"""Calculate Jaccard similarity based on word sets"""
|
|
words_a = set(text_a.split())
|
|
words_b = set(text_b.split())
|
|
|
|
if not words_a or not words_b:
|
|
return 0.0
|
|
|
|
intersection = len(words_a.intersection(words_b))
|
|
union = len(words_a.union(words_b))
|
|
|
|
return intersection / union if union > 0 else 0.0
|
|
|
|
def _calculate_cosine_similarity(self, text_a: str, text_b: str) -> float:
|
|
"""Calculate cosine similarity based on word frequency"""
|
|
from collections import Counter
|
|
|
|
words_a = Counter(text_a.split())
|
|
words_b = Counter(text_b.split())
|
|
|
|
# Get all unique words
|
|
all_words = set(words_a.keys()) | set(words_b.keys())
|
|
|
|
if not all_words:
|
|
return 0.0
|
|
|
|
# Create vectors
|
|
vector_a = [words_a.get(word, 0) for word in all_words]
|
|
vector_b = [words_b.get(word, 0) for word in all_words]
|
|
|
|
# Calculate cosine similarity
|
|
dot_product = sum(a * b for a, b in zip(vector_a, vector_b))
|
|
magnitude_a = sum(a * a for a in vector_a) ** 0.5
|
|
magnitude_b = sum(b * b for b in vector_b) ** 0.5
|
|
|
|
if magnitude_a == 0 or magnitude_b == 0:
|
|
return 0.0
|
|
|
|
return dot_product / (magnitude_a * magnitude_b)
|
|
|
|
def _calculate_phrase_similarity(self, text_a: str, text_b: str) -> float:
|
|
"""Calculate similarity based on common phrases"""
|
|
# Extract 2-3 word phrases
|
|
phrases_a = set()
|
|
phrases_b = set()
|
|
|
|
words_a = text_a.split()
|
|
words_b = text_b.split()
|
|
|
|
# Extract 2-word phrases
|
|
for i in range(len(words_a) - 1):
|
|
phrases_a.add(f"{words_a[i]} {words_a[i+1]}")
|
|
|
|
for i in range(len(words_b) - 1):
|
|
phrases_b.add(f"{words_b[i]} {words_b[i+1]}")
|
|
|
|
# Extract 3-word phrases
|
|
for i in range(len(words_a) - 2):
|
|
phrases_a.add(f"{words_a[i]} {words_a[i+1]} {words_a[i+2]}")
|
|
|
|
for i in range(len(words_b) - 2):
|
|
phrases_b.add(f"{words_b[i]} {words_b[i+1]} {words_b[i+2]}")
|
|
|
|
if not phrases_a or not phrases_b:
|
|
return 0.0
|
|
|
|
intersection = len(phrases_a.intersection(phrases_b))
|
|
union = len(phrases_a.union(phrases_b))
|
|
|
|
return intersection / union if union > 0 else 0.0
|
|
|
|
def _calculate_semantic_similarity(self, text_a: str, text_b: str) -> float:
|
|
"""Calculate semantic similarity using keyword analysis"""
|
|
# Extract keywords using the classifier
|
|
keywords_a = set(self.classifier._extract_keywords(text_a))
|
|
keywords_b = set(self.classifier._extract_keywords(text_b))
|
|
|
|
if not keywords_a or not keywords_b:
|
|
return 0.0
|
|
|
|
# Calculate semantic similarity based on keyword overlap
|
|
intersection = len(keywords_a.intersection(keywords_b))
|
|
union = len(keywords_a.union(keywords_b))
|
|
|
|
base_similarity = intersection / union if union > 0 else 0.0
|
|
|
|
# Boost similarity for technical terms
|
|
technical_terms = {
|
|
'error', 'exception', 'timeout', 'connection', 'database', 'server',
|
|
'api', 'service', 'application', 'network', 'storage', 'memory',
|
|
'cpu', 'disk', 'bandwidth', 'latency', 'performance', 'crash'
|
|
}
|
|
|
|
technical_intersection = len(keywords_a.intersection(keywords_b).intersection(technical_terms))
|
|
if technical_intersection > 0:
|
|
base_similarity += 0.1 * technical_intersection
|
|
|
|
return min(base_similarity, 1.0)
|
|
|
|
def _calculate_temporal_proximity(self, incident_a: Dict, incident_b: Dict) -> float:
|
|
"""Calculate temporal proximity between incidents"""
|
|
created_a = incident_a.get('created_at')
|
|
created_b = incident_b.get('created_at')
|
|
|
|
if not created_a or not created_b:
|
|
return 0.0
|
|
|
|
# Convert to datetime if needed
|
|
if isinstance(created_a, str):
|
|
created_a = datetime.fromisoformat(created_a.replace('Z', '+00:00'))
|
|
if isinstance(created_b, str):
|
|
created_b = datetime.fromisoformat(created_b.replace('Z', '+00:00'))
|
|
|
|
time_diff = abs((created_a - created_b).total_seconds())
|
|
|
|
# Calculate proximity score based on time difference
|
|
if time_diff <= 300: # 5 minutes
|
|
return 1.0
|
|
elif time_diff <= 1800: # 30 minutes
|
|
return 0.9
|
|
elif time_diff <= 3600: # 1 hour
|
|
return 0.7
|
|
elif time_diff <= 7200: # 2 hours
|
|
return 0.5
|
|
elif time_diff <= 86400: # 24 hours
|
|
return 0.3
|
|
elif time_diff <= 604800: # 7 days
|
|
return 0.1
|
|
else:
|
|
return 0.0
|
|
|
|
def _calculate_service_similarity(self, incident_a: Dict, incident_b: Dict) -> float:
|
|
"""Calculate service/component similarity"""
|
|
# Extract service information from text
|
|
text_a = f"{incident_a.get('title', '')} {incident_a.get('description', '')}".lower()
|
|
text_b = f"{incident_b.get('title', '')} {incident_b.get('description', '')}".lower()
|
|
|
|
# Service/component keywords
|
|
service_keywords = [
|
|
'api', 'service', 'database', 'server', 'application', 'website', 'mobile',
|
|
'frontend', 'backend', 'microservice', 'gateway', 'load balancer', 'cache',
|
|
'queue', 'message', 'notification', 'email', 'sms', 'payment', 'auth',
|
|
'user service', 'order service', 'payment service', 'notification service'
|
|
]
|
|
|
|
services_a = set()
|
|
services_b = set()
|
|
|
|
for keyword in service_keywords:
|
|
if keyword in text_a:
|
|
services_a.add(keyword)
|
|
if keyword in text_b:
|
|
services_b.add(keyword)
|
|
|
|
if not services_a or not services_b:
|
|
return 0.0
|
|
|
|
intersection = len(services_a.intersection(services_b))
|
|
union = len(services_a.union(services_b))
|
|
|
|
return intersection / union if union > 0 else 0.0
|
|
|
|
def _calculate_metadata_similarity(self, incident_a: Dict, incident_b: Dict) -> float:
|
|
"""Calculate similarity based on metadata fields"""
|
|
similarity_score = 0.0
|
|
total_fields = 0
|
|
|
|
# Compare severity
|
|
if incident_a.get('severity') == incident_b.get('severity'):
|
|
similarity_score += 1.0
|
|
total_fields += 1
|
|
|
|
# Compare status
|
|
if incident_a.get('status') == incident_b.get('status'):
|
|
similarity_score += 1.0
|
|
total_fields += 1
|
|
|
|
# Compare category
|
|
if incident_a.get('category') == incident_b.get('category'):
|
|
similarity_score += 1.0
|
|
total_fields += 1
|
|
|
|
# Compare assigned user
|
|
if incident_a.get('assigned_to') == incident_b.get('assigned_to'):
|
|
similarity_score += 1.0
|
|
total_fields += 1
|
|
|
|
# Compare reporter
|
|
if incident_a.get('reporter') == incident_b.get('reporter'):
|
|
similarity_score += 1.0
|
|
total_fields += 1
|
|
|
|
return similarity_score / total_fields if total_fields > 0 else 0.0
|
|
|
|
def _determine_duplication_type(self, overall_similarity: float, text_similarity: float,
|
|
temporal_proximity: float) -> str:
|
|
"""Determine the type of duplication"""
|
|
if overall_similarity >= self.duplication_thresholds['EXACT']:
|
|
return 'EXACT'
|
|
elif overall_similarity >= self.duplication_thresholds['NEAR_DUPLICATE']:
|
|
return 'NEAR_DUPLICATE'
|
|
elif overall_similarity >= self.duplication_thresholds['SIMILAR']:
|
|
return 'SIMILAR'
|
|
elif overall_similarity >= self.duplication_thresholds['POTENTIAL_DUPLICATE']:
|
|
return 'POTENTIAL_DUPLICATE'
|
|
else:
|
|
return 'NO_DUPLICATE'
|
|
|
|
def _calculate_confidence_score(self, overall_similarity: float, text_similarity: float,
|
|
temporal_proximity: float, service_similarity: float) -> float:
|
|
"""Calculate confidence score for duplication detection"""
|
|
base_confidence = overall_similarity
|
|
|
|
# Boost confidence for high text similarity
|
|
if text_similarity > 0.8:
|
|
base_confidence += 0.1
|
|
|
|
# Boost confidence for high temporal proximity
|
|
if temporal_proximity > 0.8:
|
|
base_confidence += 0.1
|
|
|
|
# Boost confidence for high service similarity
|
|
if service_similarity > 0.8:
|
|
base_confidence += 0.05
|
|
|
|
return min(base_confidence, 1.0)
|
|
|
|
def _determine_recommended_action(self, confidence_score: float, duplication_type: str) -> str:
|
|
"""Determine recommended action based on confidence and duplication type"""
|
|
if confidence_score >= self.action_thresholds['MERGE']:
|
|
return 'MERGE'
|
|
elif confidence_score >= self.action_thresholds['LINK']:
|
|
return 'LINK'
|
|
elif confidence_score >= self.action_thresholds['REVIEW']:
|
|
return 'REVIEW'
|
|
else:
|
|
return 'NO_ACTION'
|
|
|
|
def _calculate_merge_confidence(self, confidence_score: float, duplication_type: str,
|
|
incident_a: Dict, incident_b: Dict) -> float:
|
|
"""Calculate confidence for merging incidents"""
|
|
merge_confidence = confidence_score
|
|
|
|
# Adjust based on duplication type
|
|
type_adjustments = {
|
|
'EXACT': 0.1,
|
|
'NEAR_DUPLICATE': 0.05,
|
|
'SIMILAR': 0.0,
|
|
'POTENTIAL_DUPLICATE': -0.1
|
|
}
|
|
|
|
merge_confidence += type_adjustments.get(duplication_type, 0.0)
|
|
|
|
# Adjust based on incident status
|
|
if incident_a.get('status') == incident_b.get('status'):
|
|
merge_confidence += 0.05
|
|
|
|
# Adjust based on severity
|
|
if incident_a.get('severity') == incident_b.get('severity'):
|
|
merge_confidence += 0.03
|
|
|
|
return min(max(merge_confidence, 0.0), 1.0)
|
|
|
|
def _generate_reasoning(self, duplication_type: str, text_similarity: float,
|
|
temporal_proximity: float, service_similarity: float) -> str:
|
|
"""Generate human-readable reasoning for duplication detection"""
|
|
reasoning_parts = []
|
|
|
|
if text_similarity > 0.8:
|
|
reasoning_parts.append(f"Very high text similarity ({text_similarity:.1%})")
|
|
elif text_similarity > 0.6:
|
|
reasoning_parts.append(f"High text similarity ({text_similarity:.1%})")
|
|
elif text_similarity > 0.4:
|
|
reasoning_parts.append(f"Moderate text similarity ({text_similarity:.1%})")
|
|
|
|
if temporal_proximity > 0.8:
|
|
reasoning_parts.append(f"Very close temporal proximity ({temporal_proximity:.1%})")
|
|
elif temporal_proximity > 0.6:
|
|
reasoning_parts.append(f"Close temporal proximity ({temporal_proximity:.1%})")
|
|
|
|
if service_similarity > 0.8:
|
|
reasoning_parts.append(f"Very high service similarity ({service_similarity:.1%})")
|
|
elif service_similarity > 0.6:
|
|
reasoning_parts.append(f"High service similarity ({service_similarity:.1%})")
|
|
|
|
if duplication_type == 'EXACT':
|
|
reasoning_parts.append("Incidents appear to be exact duplicates")
|
|
elif duplication_type == 'NEAR_DUPLICATE':
|
|
reasoning_parts.append("Incidents appear to be near duplicates")
|
|
elif duplication_type == 'SIMILAR':
|
|
reasoning_parts.append("Incidents appear to be similar")
|
|
elif duplication_type == 'POTENTIAL_DUPLICATE':
|
|
reasoning_parts.append("Incidents may be duplicates")
|
|
|
|
return "; ".join(reasoning_parts) if reasoning_parts else "Based on overall similarity analysis"
|
|
|
|
def _extract_shared_elements(self, incident_a: Dict, incident_b: Dict) -> List[str]:
|
|
"""Extract elements shared between incidents"""
|
|
shared_elements = []
|
|
|
|
# Shared keywords
|
|
text_a = f"{incident_a.get('title', '')} {incident_a.get('description', '')}".lower()
|
|
text_b = f"{incident_b.get('title', '')} {incident_b.get('description', '')}".lower()
|
|
|
|
keywords_a = set(self.classifier._extract_keywords(text_a))
|
|
keywords_b = set(self.classifier._extract_keywords(text_b))
|
|
shared_keywords = keywords_a.intersection(keywords_b)
|
|
|
|
if shared_keywords:
|
|
shared_elements.append(f"Keywords: {', '.join(list(shared_keywords)[:5])}")
|
|
|
|
# Shared services
|
|
service_keywords = [
|
|
'api', 'service', 'database', 'server', 'application', 'website', 'mobile'
|
|
]
|
|
|
|
services_a = set()
|
|
services_b = set()
|
|
|
|
for keyword in service_keywords:
|
|
if keyword in text_a:
|
|
services_a.add(keyword)
|
|
if keyword in text_b:
|
|
services_b.add(keyword)
|
|
|
|
shared_services = services_a.intersection(services_b)
|
|
if shared_services:
|
|
shared_elements.append(f"Services: {', '.join(shared_services)}")
|
|
|
|
# Shared metadata
|
|
if incident_a.get('severity') == incident_b.get('severity'):
|
|
shared_elements.append(f"Severity: {incident_a.get('severity')}")
|
|
|
|
if incident_a.get('category') == incident_b.get('category'):
|
|
shared_elements.append(f"Category: {incident_a.get('category')}")
|
|
|
|
if incident_a.get('status') == incident_b.get('status'):
|
|
shared_elements.append(f"Status: {incident_a.get('status')}")
|
|
|
|
return shared_elements
|
|
|
|
def find_duplicate_candidates(self, target_incident: Dict, all_incidents: List[Dict],
|
|
limit: int = 10) -> List[Tuple[Dict, DuplicationResult]]:
|
|
"""Find incidents that might be duplicates of the target incident"""
|
|
candidates = []
|
|
|
|
for incident in all_incidents:
|
|
if incident['id'] == target_incident['id']:
|
|
continue
|
|
|
|
duplication = self.detect_duplication(target_incident, incident)
|
|
if duplication:
|
|
candidates.append((incident, duplication))
|
|
|
|
# Sort by confidence score and return top results
|
|
candidates.sort(key=lambda x: x[1].confidence_score, reverse=True)
|
|
return candidates[:limit]
|
|
|
|
def batch_detect_duplicates(self, incidents: List[Dict]) -> List[Tuple[Dict, Dict, DuplicationResult]]:
|
|
"""Batch detect duplicates in a list of incidents"""
|
|
duplicates = []
|
|
processed_pairs = set()
|
|
|
|
for i, incident_a in enumerate(incidents):
|
|
for j, incident_b in enumerate(incidents[i+1:], i+1):
|
|
# Create a unique pair identifier
|
|
pair_id = tuple(sorted([incident_a['id'], incident_b['id']]))
|
|
|
|
if pair_id in processed_pairs:
|
|
continue
|
|
|
|
processed_pairs.add(pair_id)
|
|
|
|
duplication = self.detect_duplication(incident_a, incident_b)
|
|
if duplication:
|
|
duplicates.append((incident_a, incident_b, duplication))
|
|
|
|
# Sort by confidence score
|
|
duplicates.sort(key=lambda x: x[2].confidence_score, reverse=True)
|
|
return duplicates
|