""" Duplication detection engine for identifying and merging duplicate incidents """ import time from typing import Dict, List, Tuple, Optional from dataclasses import dataclass from datetime import datetime, timedelta from .classification import IncidentClassifier @dataclass class DuplicationResult: """Result of duplication detection analysis""" duplication_type: str similarity_score: float confidence_score: float text_similarity: float temporal_proximity: float service_similarity: float recommended_action: str merge_confidence: float reasoning: str shared_elements: List[str] class DuplicationDetector: """ AI-driven duplication detector for identifying duplicate incidents """ def __init__(self): self.model_version = "v1.0" self.classifier = IncidentClassifier() # Duplication thresholds self.duplication_thresholds = { 'EXACT': 0.95, 'NEAR_DUPLICATE': 0.85, 'SIMILAR': 0.70, 'POTENTIAL_DUPLICATE': 0.50 } # Action thresholds self.action_thresholds = { 'MERGE': 0.90, 'LINK': 0.75, 'REVIEW': 0.60, 'NO_ACTION': 0.0 } # Time windows for temporal proximity self.time_windows = { 'EXACT': timedelta(minutes=30), 'NEAR_DUPLICATE': timedelta(hours=2), 'SIMILAR': timedelta(hours=24), 'POTENTIAL_DUPLICATE': timedelta(days=7) } def detect_duplication(self, incident_a: Dict, incident_b: Dict) -> Optional[DuplicationResult]: """ Detect if two incidents are duplicates """ # Calculate various similarity metrics text_similarity = self._calculate_text_similarity(incident_a, incident_b) temporal_proximity = self._calculate_temporal_proximity(incident_a, incident_b) service_similarity = self._calculate_service_similarity(incident_a, incident_b) metadata_similarity = self._calculate_metadata_similarity(incident_a, incident_b) # Calculate overall similarity score overall_similarity = ( text_similarity * 0.5 + temporal_proximity * 0.2 + service_similarity * 0.2 + metadata_similarity * 0.1 ) # Determine duplication type duplication_type = self._determine_duplication_type(overall_similarity, text_similarity, temporal_proximity) if duplication_type == 'NO_DUPLICATE': return None # Calculate confidence score confidence_score = self._calculate_confidence_score( overall_similarity, text_similarity, temporal_proximity, service_similarity ) # Determine recommended action recommended_action = self._determine_recommended_action(confidence_score, duplication_type) # Calculate merge confidence merge_confidence = self._calculate_merge_confidence( confidence_score, duplication_type, incident_a, incident_b ) # Generate reasoning reasoning = self._generate_reasoning( duplication_type, text_similarity, temporal_proximity, service_similarity ) # Extract shared elements shared_elements = self._extract_shared_elements(incident_a, incident_b) return DuplicationResult( duplication_type=duplication_type, similarity_score=overall_similarity, confidence_score=confidence_score, text_similarity=text_similarity, temporal_proximity=temporal_proximity, service_similarity=service_similarity, recommended_action=recommended_action, merge_confidence=merge_confidence, reasoning=reasoning, shared_elements=shared_elements ) def _calculate_text_similarity(self, incident_a: Dict, incident_b: Dict) -> float: """Calculate text similarity between incidents""" # Combine all text fields text_a = f"{incident_a.get('title', '')} {incident_a.get('description', '')} {incident_a.get('free_text', '')}".lower() text_b = f"{incident_b.get('title', '')} {incident_b.get('description', '')} {incident_b.get('free_text', '')}".lower() # Calculate multiple similarity metrics jaccard_similarity = self._calculate_jaccard_similarity(text_a, text_b) cosine_similarity = self._calculate_cosine_similarity(text_a, text_b) phrase_similarity = self._calculate_phrase_similarity(text_a, text_b) semantic_similarity = self._calculate_semantic_similarity(text_a, text_b) # Weighted combination return ( jaccard_similarity * 0.3 + cosine_similarity * 0.3 + phrase_similarity * 0.2 + semantic_similarity * 0.2 ) def _calculate_jaccard_similarity(self, text_a: str, text_b: str) -> float: """Calculate Jaccard similarity based on word sets""" words_a = set(text_a.split()) words_b = set(text_b.split()) if not words_a or not words_b: return 0.0 intersection = len(words_a.intersection(words_b)) union = len(words_a.union(words_b)) return intersection / union if union > 0 else 0.0 def _calculate_cosine_similarity(self, text_a: str, text_b: str) -> float: """Calculate cosine similarity based on word frequency""" from collections import Counter words_a = Counter(text_a.split()) words_b = Counter(text_b.split()) # Get all unique words all_words = set(words_a.keys()) | set(words_b.keys()) if not all_words: return 0.0 # Create vectors vector_a = [words_a.get(word, 0) for word in all_words] vector_b = [words_b.get(word, 0) for word in all_words] # Calculate cosine similarity dot_product = sum(a * b for a, b in zip(vector_a, vector_b)) magnitude_a = sum(a * a for a in vector_a) ** 0.5 magnitude_b = sum(b * b for b in vector_b) ** 0.5 if magnitude_a == 0 or magnitude_b == 0: return 0.0 return dot_product / (magnitude_a * magnitude_b) def _calculate_phrase_similarity(self, text_a: str, text_b: str) -> float: """Calculate similarity based on common phrases""" # Extract 2-3 word phrases phrases_a = set() phrases_b = set() words_a = text_a.split() words_b = text_b.split() # Extract 2-word phrases for i in range(len(words_a) - 1): phrases_a.add(f"{words_a[i]} {words_a[i+1]}") for i in range(len(words_b) - 1): phrases_b.add(f"{words_b[i]} {words_b[i+1]}") # Extract 3-word phrases for i in range(len(words_a) - 2): phrases_a.add(f"{words_a[i]} {words_a[i+1]} {words_a[i+2]}") for i in range(len(words_b) - 2): phrases_b.add(f"{words_b[i]} {words_b[i+1]} {words_b[i+2]}") if not phrases_a or not phrases_b: return 0.0 intersection = len(phrases_a.intersection(phrases_b)) union = len(phrases_a.union(phrases_b)) return intersection / union if union > 0 else 0.0 def _calculate_semantic_similarity(self, text_a: str, text_b: str) -> float: """Calculate semantic similarity using keyword analysis""" # Extract keywords using the classifier keywords_a = set(self.classifier._extract_keywords(text_a)) keywords_b = set(self.classifier._extract_keywords(text_b)) if not keywords_a or not keywords_b: return 0.0 # Calculate semantic similarity based on keyword overlap intersection = len(keywords_a.intersection(keywords_b)) union = len(keywords_a.union(keywords_b)) base_similarity = intersection / union if union > 0 else 0.0 # Boost similarity for technical terms technical_terms = { 'error', 'exception', 'timeout', 'connection', 'database', 'server', 'api', 'service', 'application', 'network', 'storage', 'memory', 'cpu', 'disk', 'bandwidth', 'latency', 'performance', 'crash' } technical_intersection = len(keywords_a.intersection(keywords_b).intersection(technical_terms)) if technical_intersection > 0: base_similarity += 0.1 * technical_intersection return min(base_similarity, 1.0) def _calculate_temporal_proximity(self, incident_a: Dict, incident_b: Dict) -> float: """Calculate temporal proximity between incidents""" created_a = incident_a.get('created_at') created_b = incident_b.get('created_at') if not created_a or not created_b: return 0.0 # Convert to datetime if needed if isinstance(created_a, str): created_a = datetime.fromisoformat(created_a.replace('Z', '+00:00')) if isinstance(created_b, str): created_b = datetime.fromisoformat(created_b.replace('Z', '+00:00')) time_diff = abs((created_a - created_b).total_seconds()) # Calculate proximity score based on time difference if time_diff <= 300: # 5 minutes return 1.0 elif time_diff <= 1800: # 30 minutes return 0.9 elif time_diff <= 3600: # 1 hour return 0.7 elif time_diff <= 7200: # 2 hours return 0.5 elif time_diff <= 86400: # 24 hours return 0.3 elif time_diff <= 604800: # 7 days return 0.1 else: return 0.0 def _calculate_service_similarity(self, incident_a: Dict, incident_b: Dict) -> float: """Calculate service/component similarity""" # Extract service information from text text_a = f"{incident_a.get('title', '')} {incident_a.get('description', '')}".lower() text_b = f"{incident_b.get('title', '')} {incident_b.get('description', '')}".lower() # Service/component keywords service_keywords = [ 'api', 'service', 'database', 'server', 'application', 'website', 'mobile', 'frontend', 'backend', 'microservice', 'gateway', 'load balancer', 'cache', 'queue', 'message', 'notification', 'email', 'sms', 'payment', 'auth', 'user service', 'order service', 'payment service', 'notification service' ] services_a = set() services_b = set() for keyword in service_keywords: if keyword in text_a: services_a.add(keyword) if keyword in text_b: services_b.add(keyword) if not services_a or not services_b: return 0.0 intersection = len(services_a.intersection(services_b)) union = len(services_a.union(services_b)) return intersection / union if union > 0 else 0.0 def _calculate_metadata_similarity(self, incident_a: Dict, incident_b: Dict) -> float: """Calculate similarity based on metadata fields""" similarity_score = 0.0 total_fields = 0 # Compare severity if incident_a.get('severity') == incident_b.get('severity'): similarity_score += 1.0 total_fields += 1 # Compare status if incident_a.get('status') == incident_b.get('status'): similarity_score += 1.0 total_fields += 1 # Compare category if incident_a.get('category') == incident_b.get('category'): similarity_score += 1.0 total_fields += 1 # Compare assigned user if incident_a.get('assigned_to') == incident_b.get('assigned_to'): similarity_score += 1.0 total_fields += 1 # Compare reporter if incident_a.get('reporter') == incident_b.get('reporter'): similarity_score += 1.0 total_fields += 1 return similarity_score / total_fields if total_fields > 0 else 0.0 def _determine_duplication_type(self, overall_similarity: float, text_similarity: float, temporal_proximity: float) -> str: """Determine the type of duplication""" if overall_similarity >= self.duplication_thresholds['EXACT']: return 'EXACT' elif overall_similarity >= self.duplication_thresholds['NEAR_DUPLICATE']: return 'NEAR_DUPLICATE' elif overall_similarity >= self.duplication_thresholds['SIMILAR']: return 'SIMILAR' elif overall_similarity >= self.duplication_thresholds['POTENTIAL_DUPLICATE']: return 'POTENTIAL_DUPLICATE' else: return 'NO_DUPLICATE' def _calculate_confidence_score(self, overall_similarity: float, text_similarity: float, temporal_proximity: float, service_similarity: float) -> float: """Calculate confidence score for duplication detection""" base_confidence = overall_similarity # Boost confidence for high text similarity if text_similarity > 0.8: base_confidence += 0.1 # Boost confidence for high temporal proximity if temporal_proximity > 0.8: base_confidence += 0.1 # Boost confidence for high service similarity if service_similarity > 0.8: base_confidence += 0.05 return min(base_confidence, 1.0) def _determine_recommended_action(self, confidence_score: float, duplication_type: str) -> str: """Determine recommended action based on confidence and duplication type""" if confidence_score >= self.action_thresholds['MERGE']: return 'MERGE' elif confidence_score >= self.action_thresholds['LINK']: return 'LINK' elif confidence_score >= self.action_thresholds['REVIEW']: return 'REVIEW' else: return 'NO_ACTION' def _calculate_merge_confidence(self, confidence_score: float, duplication_type: str, incident_a: Dict, incident_b: Dict) -> float: """Calculate confidence for merging incidents""" merge_confidence = confidence_score # Adjust based on duplication type type_adjustments = { 'EXACT': 0.1, 'NEAR_DUPLICATE': 0.05, 'SIMILAR': 0.0, 'POTENTIAL_DUPLICATE': -0.1 } merge_confidence += type_adjustments.get(duplication_type, 0.0) # Adjust based on incident status if incident_a.get('status') == incident_b.get('status'): merge_confidence += 0.05 # Adjust based on severity if incident_a.get('severity') == incident_b.get('severity'): merge_confidence += 0.03 return min(max(merge_confidence, 0.0), 1.0) def _generate_reasoning(self, duplication_type: str, text_similarity: float, temporal_proximity: float, service_similarity: float) -> str: """Generate human-readable reasoning for duplication detection""" reasoning_parts = [] if text_similarity > 0.8: reasoning_parts.append(f"Very high text similarity ({text_similarity:.1%})") elif text_similarity > 0.6: reasoning_parts.append(f"High text similarity ({text_similarity:.1%})") elif text_similarity > 0.4: reasoning_parts.append(f"Moderate text similarity ({text_similarity:.1%})") if temporal_proximity > 0.8: reasoning_parts.append(f"Very close temporal proximity ({temporal_proximity:.1%})") elif temporal_proximity > 0.6: reasoning_parts.append(f"Close temporal proximity ({temporal_proximity:.1%})") if service_similarity > 0.8: reasoning_parts.append(f"Very high service similarity ({service_similarity:.1%})") elif service_similarity > 0.6: reasoning_parts.append(f"High service similarity ({service_similarity:.1%})") if duplication_type == 'EXACT': reasoning_parts.append("Incidents appear to be exact duplicates") elif duplication_type == 'NEAR_DUPLICATE': reasoning_parts.append("Incidents appear to be near duplicates") elif duplication_type == 'SIMILAR': reasoning_parts.append("Incidents appear to be similar") elif duplication_type == 'POTENTIAL_DUPLICATE': reasoning_parts.append("Incidents may be duplicates") return "; ".join(reasoning_parts) if reasoning_parts else "Based on overall similarity analysis" def _extract_shared_elements(self, incident_a: Dict, incident_b: Dict) -> List[str]: """Extract elements shared between incidents""" shared_elements = [] # Shared keywords text_a = f"{incident_a.get('title', '')} {incident_a.get('description', '')}".lower() text_b = f"{incident_b.get('title', '')} {incident_b.get('description', '')}".lower() keywords_a = set(self.classifier._extract_keywords(text_a)) keywords_b = set(self.classifier._extract_keywords(text_b)) shared_keywords = keywords_a.intersection(keywords_b) if shared_keywords: shared_elements.append(f"Keywords: {', '.join(list(shared_keywords)[:5])}") # Shared services service_keywords = [ 'api', 'service', 'database', 'server', 'application', 'website', 'mobile' ] services_a = set() services_b = set() for keyword in service_keywords: if keyword in text_a: services_a.add(keyword) if keyword in text_b: services_b.add(keyword) shared_services = services_a.intersection(services_b) if shared_services: shared_elements.append(f"Services: {', '.join(shared_services)}") # Shared metadata if incident_a.get('severity') == incident_b.get('severity'): shared_elements.append(f"Severity: {incident_a.get('severity')}") if incident_a.get('category') == incident_b.get('category'): shared_elements.append(f"Category: {incident_a.get('category')}") if incident_a.get('status') == incident_b.get('status'): shared_elements.append(f"Status: {incident_a.get('status')}") return shared_elements def find_duplicate_candidates(self, target_incident: Dict, all_incidents: List[Dict], limit: int = 10) -> List[Tuple[Dict, DuplicationResult]]: """Find incidents that might be duplicates of the target incident""" candidates = [] for incident in all_incidents: if incident['id'] == target_incident['id']: continue duplication = self.detect_duplication(target_incident, incident) if duplication: candidates.append((incident, duplication)) # Sort by confidence score and return top results candidates.sort(key=lambda x: x[1].confidence_score, reverse=True) return candidates[:limit] def batch_detect_duplicates(self, incidents: List[Dict]) -> List[Tuple[Dict, Dict, DuplicationResult]]: """Batch detect duplicates in a list of incidents""" duplicates = [] processed_pairs = set() for i, incident_a in enumerate(incidents): for j, incident_b in enumerate(incidents[i+1:], i+1): # Create a unique pair identifier pair_id = tuple(sorted([incident_a['id'], incident_b['id']])) if pair_id in processed_pairs: continue processed_pairs.add(pair_id) duplication = self.detect_duplication(incident_a, incident_b) if duplication: duplicates.append((incident_a, incident_b, duplication)) # Sort by confidence score duplicates.sort(key=lambda x: x[2].confidence_score, reverse=True) return duplicates