""" AI-driven incident classification using NLP techniques """ import re import time from typing import Dict, List, Tuple, Optional from dataclasses import dataclass from django.conf import settings @dataclass class ClassificationResult: """Result of incident classification""" category: str subcategory: str confidence: float alternative_categories: List[Dict[str, float]] keywords: List[str] sentiment_score: float urgency_indicators: List[str] class IncidentClassifier: """ AI-driven incident classifier using rule-based and ML techniques """ def __init__(self): self.model_version = "v1.0" # Predefined categories and their keywords self.categories = { 'INFRASTRUCTURE': { 'keywords': ['server', 'database', 'network', 'storage', 'disk', 'memory', 'cpu', 'load', 'bandwidth', 'connection', 'timeout', 'latency'], 'subcategories': { 'SERVER_ISSUE': ['server', 'host', 'machine', 'instance', 'vm', 'container'], 'DATABASE_ISSUE': ['database', 'db', 'sql', 'query', 'connection', 'timeout', 'deadlock'], 'NETWORK_ISSUE': ['network', 'connectivity', 'dns', 'firewall', 'routing', 'packet', 'bandwidth'], 'STORAGE_ISSUE': ['storage', 'disk', 'volume', 'space', 'capacity', 'i/o', 'read', 'write'], } }, 'APPLICATION': { 'keywords': ['application', 'app', 'service', 'api', 'endpoint', 'response', 'error', 'exception', 'crash', 'bug'], 'subcategories': { 'API_ISSUE': ['api', 'endpoint', 'response', 'status', 'code', 'timeout', 'rate', 'limit'], 'SERVICE_ISSUE': ['service', 'microservice', 'dependency', 'circuit', 'breaker', 'fallback'], 'PERFORMANCE_ISSUE': ['performance', 'slow', 'latency', 'response', 'time', 'throughput', 'bottleneck'], 'FUNCTIONALITY_ISSUE': ['bug', 'feature', 'functionality', 'behavior', 'unexpected', 'incorrect'], } }, 'SECURITY': { 'keywords': ['security', 'authentication', 'authorization', 'access', 'permission', 'breach', 'attack', 'vulnerability', 'malware'], 'subcategories': { 'AUTH_ISSUE': ['authentication', 'login', 'password', 'token', 'session', 'credential'], 'ACCESS_ISSUE': ['authorization', 'permission', 'access', 'denied', 'forbidden', 'unauthorized'], 'THREAT_ISSUE': ['attack', 'breach', 'malware', 'virus', 'intrusion', 'suspicious', 'anomaly'], 'VULNERABILITY': ['vulnerability', 'exploit', 'patch', 'update', 'security', 'fix'], } }, 'USER_EXPERIENCE': { 'keywords': ['user', 'interface', 'ui', 'ux', 'experience', 'usability', 'navigation', 'button', 'form', 'page'], 'subcategories': { 'UI_ISSUE': ['interface', 'ui', 'button', 'form', 'page', 'layout', 'display', 'rendering'], 'NAVIGATION_ISSUE': ['navigation', 'menu', 'link', 'redirect', 'routing', 'page', 'not', 'found'], 'USABILITY_ISSUE': ['usability', 'experience', 'confusing', 'difficult', 'unclear', 'intuitive'], 'MOBILE_ISSUE': ['mobile', 'app', 'responsive', 'device', 'screen', 'touch', 'gesture'], } }, 'DATA': { 'keywords': ['data', 'file', 'import', 'export', 'sync', 'backup', 'recovery', 'corruption', 'missing', 'duplicate'], 'subcategories': { 'DATA_CORRUPTION': ['corruption', 'corrupted', 'invalid', 'malformed', 'broken', 'damaged'], 'DATA_LOSS': ['missing', 'lost', 'deleted', 'removed', 'disappeared', 'not', 'found'], 'SYNC_ISSUE': ['sync', 'synchronization', 'conflict', 'merge', 'update', 'latest'], 'BACKUP_ISSUE': ['backup', 'restore', 'recovery', 'archive', 'retention', 'storage'], } }, 'INTEGRATION': { 'keywords': ['integration', 'third-party', 'external', 'webhook', 'api', 'connection', 'sync', 'import', 'export'], 'subcategories': { 'THIRD_PARTY_ISSUE': ['third-party', 'external', 'vendor', 'partner', 'service', 'provider'], 'WEBHOOK_ISSUE': ['webhook', 'callback', 'notification', 'event', 'trigger', 'delivery'], 'API_INTEGRATION': ['api', 'integration', 'endpoint', 'connection', 'authentication', 'response'], 'DATA_INTEGRATION': ['import', 'export', 'migration', 'transformation', 'mapping', 'format'], } } } # Urgency indicators self.urgency_indicators = { 'CRITICAL': ['down', 'outage', 'critical', 'emergency', 'urgent', 'immediate', 'severe', 'complete', 'total'], 'HIGH': ['major', 'significant', 'important', 'priority', 'escalate', 'escalated', 'blocking'], 'MEDIUM': ['moderate', 'some', 'partial', 'intermittent', 'occasional', 'sometimes'], 'LOW': ['minor', 'small', 'cosmetic', 'enhancement', 'improvement', 'suggestion'] } # Sentiment analysis keywords self.sentiment_keywords = { 'positive': ['working', 'fixed', 'resolved', 'good', 'excellent', 'improved', 'better', 'success'], 'negative': ['broken', 'failed', 'error', 'issue', 'problem', 'bug', 'crash', 'down', 'slow', 'terrible', 'awful'], 'neutral': ['report', 'incident', 'ticket', 'request', 'update', 'status', 'information'] } def classify_incident(self, title: str, description: str, free_text: str = "") -> ClassificationResult: """ Classify an incident based on its text content """ start_time = time.time() # Combine all text for analysis combined_text = f"{title} {description} {free_text}".lower() # Extract keywords keywords = self._extract_keywords(combined_text) # Analyze sentiment sentiment_score = self._analyze_sentiment(combined_text) # Detect urgency indicators urgency_indicators = self._detect_urgency_indicators(combined_text) # Classify category and subcategory category, subcategory, confidence, alternatives = self._classify_category(combined_text, keywords) processing_time = time.time() - start_time return ClassificationResult( category=category, subcategory=subcategory, confidence=confidence, alternative_categories=alternatives, keywords=keywords, sentiment_score=sentiment_score, urgency_indicators=urgency_indicators ) def _extract_keywords(self, text: str) -> List[str]: """Extract relevant keywords from text""" # Simple keyword extraction - in production, use more sophisticated NLP words = re.findall(r'\b[a-zA-Z]{3,}\b', text) # Filter out common stop words stop_words = {'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by', 'is', 'are', 'was', 'were', 'be', 'been', 'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could', 'should', 'may', 'might', 'can', 'this', 'that', 'these', 'those', 'a', 'an'} keywords = [word for word in words if word not in stop_words] # Count frequency and return top keywords from collections import Counter keyword_counts = Counter(keywords) return [word for word, count in keyword_counts.most_common(10)] def _analyze_sentiment(self, text: str) -> float: """Analyze sentiment of the text (-1 to 1)""" positive_count = sum(1 for word in self.sentiment_keywords['positive'] if word in text) negative_count = sum(1 for word in self.sentiment_keywords['negative'] if word in text) total_sentiment_words = positive_count + negative_count if total_sentiment_words == 0: return 0.0 return (positive_count - negative_count) / total_sentiment_words def _detect_urgency_indicators(self, text: str) -> List[str]: """Detect urgency indicators in the text""" detected_indicators = [] for urgency_level, indicators in self.urgency_indicators.items(): for indicator in indicators: if indicator in text: detected_indicators.append(f"{urgency_level}: {indicator}") return detected_indicators def _classify_category(self, text: str, keywords: List[str]) -> Tuple[str, str, float, List[Dict[str, float]]]: """Classify the incident category and subcategory""" category_scores = {} subcategory_scores = {} # Score each category based on keyword matches for category, data in self.categories.items(): score = 0 category_keywords = data['keywords'] # Count keyword matches for keyword in category_keywords: if keyword in text: score += 1 # Also check for partial matches in keywords list for extracted_keyword in keywords: if keyword in extracted_keyword or extracted_keyword in keyword: score += 0.5 category_scores[category] = score # Score subcategories for subcategory, subcategory_keywords in data['subcategories'].items(): subcategory_score = 0 for keyword in subcategory_keywords: if keyword in text: subcategory_score += 1 for extracted_keyword in keywords: if keyword in extracted_keyword or extracted_keyword in keyword: subcategory_score += 0.5 subcategory_scores[subcategory] = subcategory_score # Find best category if not category_scores or max(category_scores.values()) == 0: best_category = 'GENERAL' best_subcategory = 'UNKNOWN' confidence = 0.1 else: best_category = max(category_scores, key=category_scores.get) max_score = max(category_scores.values()) confidence = min(max_score / 10.0, 1.0) # Normalize to 0-1 # Find best subcategory within the category if best_category in self.categories: category_subcategories = self.categories[best_category]['subcategories'] subcategory_scores_filtered = {k: v for k, v in subcategory_scores.items() if k in category_subcategories} if subcategory_scores_filtered and max(subcategory_scores_filtered.values()) > 0: best_subcategory = max(subcategory_scores_filtered, key=subcategory_scores_filtered.get) else: best_subcategory = 'GENERAL' else: best_subcategory = 'GENERAL' # Create alternative categories alternatives = [] sorted_categories = sorted(category_scores.items(), key=lambda x: x[1], reverse=True) for category, score in sorted_categories[:3]: if category != best_category and score > 0: alternatives.append({ 'category': category, 'confidence': min(score / 10.0, 1.0) }) return best_category, best_subcategory, confidence, alternatives class SeverityAnalyzer: """ AI-driven severity analyzer based on impact assessment """ def __init__(self): self.model_version = "v1.0" # Severity indicators self.severity_indicators = { 'EMERGENCY': { 'keywords': ['down', 'outage', 'critical', 'emergency', 'complete', 'total', 'all', 'entire', 'system'], 'impact_multiplier': 2.0, 'user_impact_threshold': 0.8, 'business_impact_threshold': 0.9 }, 'CRITICAL': { 'keywords': ['major', 'significant', 'severe', 'blocking', 'cannot', 'unable', 'failed', 'broken'], 'impact_multiplier': 1.5, 'user_impact_threshold': 0.6, 'business_impact_threshold': 0.7 }, 'HIGH': { 'keywords': ['important', 'priority', 'escalate', 'escalated', 'urgent', 'immediate', 'soon'], 'impact_multiplier': 1.2, 'user_impact_threshold': 0.4, 'business_impact_threshold': 0.5 }, 'MEDIUM': { 'keywords': ['moderate', 'some', 'partial', 'intermittent', 'occasional', 'sometimes', 'minor'], 'impact_multiplier': 1.0, 'user_impact_threshold': 0.2, 'business_impact_threshold': 0.3 }, 'LOW': { 'keywords': ['small', 'cosmetic', 'enhancement', 'improvement', 'suggestion', 'nice', 'to', 'have'], 'impact_multiplier': 0.5, 'user_impact_threshold': 0.1, 'business_impact_threshold': 0.1 } } def analyze_severity(self, incident_data: Dict) -> Dict: """ Analyze incident severity based on various factors """ start_time = time.time() title = incident_data.get('title', '').lower() description = incident_data.get('description', '').lower() free_text = incident_data.get('free_text', '').lower() affected_users = incident_data.get('affected_users', 0) business_impact = incident_data.get('business_impact', '').lower() combined_text = f"{title} {description} {free_text} {business_impact}" # Calculate impact scores user_impact_score = self._calculate_user_impact(affected_users, combined_text) business_impact_score = self._calculate_business_impact(business_impact, combined_text) technical_impact_score = self._calculate_technical_impact(combined_text) # Determine severity based on impact scores and keywords suggested_severity, confidence, reasoning, impact_factors = self._determine_severity( combined_text, user_impact_score, business_impact_score, technical_impact_score ) processing_time = time.time() - start_time return { 'suggested_severity': suggested_severity, 'confidence_score': confidence, 'user_impact_score': user_impact_score, 'business_impact_score': business_impact_score, 'technical_impact_score': technical_impact_score, 'reasoning': reasoning, 'impact_factors': impact_factors, 'processing_time': processing_time } def _calculate_user_impact(self, affected_users: int, text: str) -> float: """Calculate user impact score (0-1)""" # Base score from affected users count if affected_users == 0: # Try to extract from text user_indicators = ['all users', 'everyone', 'entire user base', 'all customers'] if any(indicator in text for indicator in user_indicators): base_score = 0.9 else: base_score = 0.1 elif affected_users < 10: base_score = 0.2 elif affected_users < 100: base_score = 0.4 elif affected_users < 1000: base_score = 0.6 elif affected_users < 10000: base_score = 0.8 else: base_score = 1.0 # Adjust based on text indicators if 'all' in text or 'everyone' in text: base_score = min(base_score + 0.2, 1.0) elif 'some' in text or 'few' in text: base_score = max(base_score - 0.1, 0.0) return base_score def _calculate_business_impact(self, business_impact: str, text: str) -> float: """Calculate business impact score (0-1)""" if not business_impact: # Try to infer from text high_impact_indicators = ['revenue', 'sales', 'customer', 'business', 'critical', 'essential', 'production'] if any(indicator in text for indicator in high_impact_indicators): return 0.6 return 0.3 # Analyze business impact text high_impact_keywords = ['revenue', 'sales', 'customer', 'business', 'critical', 'essential', 'production', 'outage', 'down'] medium_impact_keywords = ['service', 'feature', 'functionality', 'performance', 'slow'] low_impact_keywords = ['cosmetic', 'minor', 'enhancement', 'improvement'] score = 0.3 # Base score for keyword in high_impact_keywords: if keyword in business_impact: score += 0.1 for keyword in medium_impact_keywords: if keyword in business_impact: score += 0.05 for keyword in low_impact_keywords: if keyword in business_impact: score -= 0.05 return min(max(score, 0.0), 1.0) def _calculate_technical_impact(self, text: str) -> float: """Calculate technical impact score (0-1)""" technical_indicators = { 'high': ['down', 'outage', 'crash', 'failed', 'broken', 'unavailable', 'error', 'exception'], 'medium': ['slow', 'performance', 'latency', 'timeout', 'intermittent', 'partial'], 'low': ['cosmetic', 'display', 'ui', 'minor', 'enhancement'] } score = 0.3 # Base score for level, keywords in technical_indicators.items(): for keyword in keywords: if keyword in text: if level == 'high': score += 0.15 elif level == 'medium': score += 0.08 elif level == 'low': score -= 0.05 return min(max(score, 0.0), 1.0) def _determine_severity(self, text: str, user_impact: float, business_impact: float, technical_impact: float) -> Tuple[str, float, str, List[str]]: """Determine severity based on impact scores and text analysis""" impact_factors = [] # Calculate weighted impact score weighted_score = (user_impact * 0.4 + business_impact * 0.4 + technical_impact * 0.2) # Check for severity indicators in text severity_scores = {} for severity, data in self.severity_indicators.items(): score = 0 for keyword in data['keywords']: if keyword in text: score += 1 # Apply impact multiplier score *= data['impact_multiplier'] severity_scores[severity] = score # Find best severity match if severity_scores and max(severity_scores.values()) > 0: best_severity = max(severity_scores, key=severity_scores.get) text_confidence = min(max(severity_scores.values()) / 5.0, 1.0) else: # Fallback to impact-based severity if weighted_score >= 0.8: best_severity = 'CRITICAL' elif weighted_score >= 0.6: best_severity = 'HIGH' elif weighted_score >= 0.4: best_severity = 'MEDIUM' else: best_severity = 'LOW' text_confidence = 0.5 # Combine text and impact confidence confidence = (text_confidence + (1.0 - abs(weighted_score - self._severity_to_score(best_severity)))) / 2.0 # Generate reasoning reasoning_parts = [] if user_impact > 0.6: reasoning_parts.append(f"High user impact ({user_impact:.1%})") impact_factors.append(f"User Impact: {user_impact:.1%}") if business_impact > 0.6: reasoning_parts.append(f"Significant business impact ({business_impact:.1%})") impact_factors.append(f"Business Impact: {business_impact:.1%}") if technical_impact > 0.6: reasoning_parts.append(f"Major technical impact ({technical_impact:.1%})") impact_factors.append(f"Technical Impact: {technical_impact:.1%}") if severity_scores and max(severity_scores.values()) > 0: reasoning_parts.append("Severity indicators detected in incident description") impact_factors.append("Text Analysis: Severity keywords found") reasoning = "; ".join(reasoning_parts) if reasoning_parts else "Based on overall impact assessment" return best_severity, confidence, reasoning, impact_factors def _severity_to_score(self, severity: str) -> float: """Convert severity level to numeric score""" severity_scores = { 'LOW': 0.2, 'MEDIUM': 0.4, 'HIGH': 0.6, 'CRITICAL': 0.8, 'EMERGENCY': 1.0 } return severity_scores.get(severity, 0.4)