Files
ETB/ETB-API/incident_intelligence/ai/classification.py
Iliyan Angelov 6b247e5b9f Updates
2025-09-19 11:58:53 +03:00

472 lines
21 KiB
Python

"""
AI-driven incident classification using NLP techniques
"""
import re
import time
from collections import Counter
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

from django.conf import settings
@dataclass
class ClassificationResult:
    """Result of incident classification"""
    # Winning category label (e.g. 'INFRASTRUCTURE'); 'GENERAL' when no
    # category keywords matched.
    category: str
    # Best subcategory within the winning category; falls back to
    # 'GENERAL' (or 'UNKNOWN' when no category matched at all).
    subcategory: str
    # Normalized match confidence, clamped to [0, 1].
    confidence: float
    # Runner-up categories as {'category': <label>, 'confidence': <score>}
    # dicts (note: values mix str and float despite the annotation).
    alternative_categories: List[Dict[str, float]]
    # Most frequent extracted keywords from the incident text (at most 10).
    keywords: List[str]
    # Polarity score in [-1, 1]; negative values indicate negative wording.
    sentiment_score: float
    # Matched urgency keywords, each formatted "LEVEL: keyword".
    urgency_indicators: List[str]
class IncidentClassifier:
    """
    AI-driven incident classifier using rule-based and ML techniques.

    Classification is keyword-driven: every category and subcategory owns a
    list of indicator keywords, and ``_classify_category`` scores substring
    matches of those keywords against the combined incident text.
    """

    def __init__(self):
        self.model_version = "v1.0"
        # Predefined categories and their keywords.
        # NOTE: matching is substring-based, so short keywords can also fire
        # inside longer words (e.g. 'data' matches inside 'database').
        self.categories = {
            'INFRASTRUCTURE': {
                'keywords': ['server', 'database', 'network', 'storage', 'disk', 'memory', 'cpu', 'load', 'bandwidth', 'connection', 'timeout', 'latency'],
                'subcategories': {
                    'SERVER_ISSUE': ['server', 'host', 'machine', 'instance', 'vm', 'container'],
                    'DATABASE_ISSUE': ['database', 'db', 'sql', 'query', 'connection', 'timeout', 'deadlock'],
                    'NETWORK_ISSUE': ['network', 'connectivity', 'dns', 'firewall', 'routing', 'packet', 'bandwidth'],
                    'STORAGE_ISSUE': ['storage', 'disk', 'volume', 'space', 'capacity', 'i/o', 'read', 'write'],
                }
            },
            'APPLICATION': {
                'keywords': ['application', 'app', 'service', 'api', 'endpoint', 'response', 'error', 'exception', 'crash', 'bug'],
                'subcategories': {
                    'API_ISSUE': ['api', 'endpoint', 'response', 'status', 'code', 'timeout', 'rate', 'limit'],
                    'SERVICE_ISSUE': ['service', 'microservice', 'dependency', 'circuit', 'breaker', 'fallback'],
                    'PERFORMANCE_ISSUE': ['performance', 'slow', 'latency', 'response', 'time', 'throughput', 'bottleneck'],
                    'FUNCTIONALITY_ISSUE': ['bug', 'feature', 'functionality', 'behavior', 'unexpected', 'incorrect'],
                }
            },
            'SECURITY': {
                'keywords': ['security', 'authentication', 'authorization', 'access', 'permission', 'breach', 'attack', 'vulnerability', 'malware'],
                'subcategories': {
                    'AUTH_ISSUE': ['authentication', 'login', 'password', 'token', 'session', 'credential'],
                    'ACCESS_ISSUE': ['authorization', 'permission', 'access', 'denied', 'forbidden', 'unauthorized'],
                    'THREAT_ISSUE': ['attack', 'breach', 'malware', 'virus', 'intrusion', 'suspicious', 'anomaly'],
                    'VULNERABILITY': ['vulnerability', 'exploit', 'patch', 'update', 'security', 'fix'],
                }
            },
            'USER_EXPERIENCE': {
                'keywords': ['user', 'interface', 'ui', 'ux', 'experience', 'usability', 'navigation', 'button', 'form', 'page'],
                'subcategories': {
                    'UI_ISSUE': ['interface', 'ui', 'button', 'form', 'page', 'layout', 'display', 'rendering'],
                    'NAVIGATION_ISSUE': ['navigation', 'menu', 'link', 'redirect', 'routing', 'page', 'not', 'found'],
                    'USABILITY_ISSUE': ['usability', 'experience', 'confusing', 'difficult', 'unclear', 'intuitive'],
                    'MOBILE_ISSUE': ['mobile', 'app', 'responsive', 'device', 'screen', 'touch', 'gesture'],
                }
            },
            'DATA': {
                'keywords': ['data', 'file', 'import', 'export', 'sync', 'backup', 'recovery', 'corruption', 'missing', 'duplicate'],
                'subcategories': {
                    'DATA_CORRUPTION': ['corruption', 'corrupted', 'invalid', 'malformed', 'broken', 'damaged'],
                    'DATA_LOSS': ['missing', 'lost', 'deleted', 'removed', 'disappeared', 'not', 'found'],
                    'SYNC_ISSUE': ['sync', 'synchronization', 'conflict', 'merge', 'update', 'latest'],
                    'BACKUP_ISSUE': ['backup', 'restore', 'recovery', 'archive', 'retention', 'storage'],
                }
            },
            'INTEGRATION': {
                'keywords': ['integration', 'third-party', 'external', 'webhook', 'api', 'connection', 'sync', 'import', 'export'],
                'subcategories': {
                    'THIRD_PARTY_ISSUE': ['third-party', 'external', 'vendor', 'partner', 'service', 'provider'],
                    'WEBHOOK_ISSUE': ['webhook', 'callback', 'notification', 'event', 'trigger', 'delivery'],
                    'API_INTEGRATION': ['api', 'integration', 'endpoint', 'connection', 'authentication', 'response'],
                    'DATA_INTEGRATION': ['import', 'export', 'migration', 'transformation', 'mapping', 'format'],
                }
            }
        }
        # Keywords hinting at each urgency level (substring-matched).
        self.urgency_indicators = {
            'CRITICAL': ['down', 'outage', 'critical', 'emergency', 'urgent', 'immediate', 'severe', 'complete', 'total'],
            'HIGH': ['major', 'significant', 'important', 'priority', 'escalate', 'escalated', 'blocking'],
            'MEDIUM': ['moderate', 'some', 'partial', 'intermittent', 'occasional', 'sometimes'],
            'LOW': ['minor', 'small', 'cosmetic', 'enhancement', 'improvement', 'suggestion']
        }
        # Polarity lexicon for the naive sentiment score; the 'neutral'
        # entries are informational only (not used by _analyze_sentiment).
        self.sentiment_keywords = {
            'positive': ['working', 'fixed', 'resolved', 'good', 'excellent', 'improved', 'better', 'success'],
            'negative': ['broken', 'failed', 'error', 'issue', 'problem', 'bug', 'crash', 'down', 'slow', 'terrible', 'awful'],
            'neutral': ['report', 'incident', 'ticket', 'request', 'update', 'status', 'information']
        }

    def classify_incident(self, title: str, description: str, free_text: str = "") -> "ClassificationResult":
        """
        Classify an incident based on its text content.

        Args:
            title: Incident title.
            description: Incident description.
            free_text: Optional additional free-form text.

        Returns:
            A ClassificationResult with the winning category/subcategory,
            confidence, alternatives, keywords, sentiment and urgency hints.
        """
        # All matching is done against one lower-cased haystack.
        # (Previous revision also timed this call but never used the result;
        # that dead code has been removed.)
        combined_text = f"{title} {description} {free_text}".lower()
        keywords = self._extract_keywords(combined_text)
        sentiment_score = self._analyze_sentiment(combined_text)
        urgency_indicators = self._detect_urgency_indicators(combined_text)
        category, subcategory, confidence, alternatives = self._classify_category(combined_text, keywords)
        return ClassificationResult(
            category=category,
            subcategory=subcategory,
            confidence=confidence,
            alternative_categories=alternatives,
            keywords=keywords,
            sentiment_score=sentiment_score,
            urgency_indicators=urgency_indicators
        )

    def _extract_keywords(self, text: str) -> List[str]:
        """Return the 10 most frequent non-stop-word tokens (>= 3 letters)."""
        # Simple keyword extraction - in production, use more sophisticated NLP.
        words = re.findall(r'\b[a-zA-Z]{3,}\b', text)
        # Filter out common stop words (entries shorter than 3 letters can
        # never appear given the regex above, but are kept for completeness).
        stop_words = {'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by', 'is', 'are', 'was', 'were', 'be', 'been', 'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could', 'should', 'may', 'might', 'can', 'this', 'that', 'these', 'those', 'a', 'an'}
        keywords = [word for word in words if word not in stop_words]
        # Count frequency and return the top keywords (Counter is imported
        # at module level instead of on every call).
        keyword_counts = Counter(keywords)
        return [word for word, count in keyword_counts.most_common(10)]

    def _analyze_sentiment(self, text: str) -> float:
        """Analyze sentiment of the text; returns a score in [-1, 1]."""
        # Presence-based counts: each lexicon word contributes at most once.
        positive_count = sum(1 for word in self.sentiment_keywords['positive'] if word in text)
        negative_count = sum(1 for word in self.sentiment_keywords['negative'] if word in text)
        total_sentiment_words = positive_count + negative_count
        if total_sentiment_words == 0:
            return 0.0
        return (positive_count - negative_count) / total_sentiment_words

    def _detect_urgency_indicators(self, text: str) -> List[str]:
        """Detect urgency keywords; returns entries like 'CRITICAL: down'."""
        detected_indicators = []
        for urgency_level, indicators in self.urgency_indicators.items():
            for indicator in indicators:
                if indicator in text:
                    detected_indicators.append(f"{urgency_level}: {indicator}")
        return detected_indicators

    def _classify_category(self, text: str, keywords: List[str]) -> Tuple[str, str, float, List[Dict[str, float]]]:
        """
        Score every category/subcategory against the text.

        Returns:
            (best_category, best_subcategory, confidence, alternatives) where
            confidence is the raw score divided by 10 and clamped to [0, 1],
            and alternatives lists up to two runner-up categories with
            non-zero scores.
        """
        category_scores = {}
        subcategory_scores = {}
        # Score each category: +1 per keyword found in the text, +0.5 per
        # partial overlap with an extracted keyword.
        for category, data in self.categories.items():
            score = 0
            category_keywords = data['keywords']
            for keyword in category_keywords:
                if keyword in text:
                    score += 1
                # Also check for partial matches in the extracted keywords.
                for extracted_keyword in keywords:
                    if keyword in extracted_keyword or extracted_keyword in keyword:
                        score += 0.5
            category_scores[category] = score
            # Score subcategories with the same scheme.
            for subcategory, subcategory_keywords in data['subcategories'].items():
                subcategory_score = 0
                for keyword in subcategory_keywords:
                    if keyword in text:
                        subcategory_score += 1
                    for extracted_keyword in keywords:
                        if keyword in extracted_keyword or extracted_keyword in keyword:
                            subcategory_score += 0.5
                subcategory_scores[subcategory] = subcategory_score
        # Find the best category; ties resolve to the first-declared one.
        if not category_scores or max(category_scores.values()) == 0:
            best_category = 'GENERAL'
            best_subcategory = 'UNKNOWN'
            confidence = 0.1
        else:
            best_category = max(category_scores, key=category_scores.get)
            max_score = max(category_scores.values())
            confidence = min(max_score / 10.0, 1.0)  # Normalize to 0-1
            # Find the best subcategory within the winning category.
            if best_category in self.categories:
                category_subcategories = self.categories[best_category]['subcategories']
                subcategory_scores_filtered = {k: v for k, v in subcategory_scores.items() if k in category_subcategories}
                if subcategory_scores_filtered and max(subcategory_scores_filtered.values()) > 0:
                    best_subcategory = max(subcategory_scores_filtered, key=subcategory_scores_filtered.get)
                else:
                    best_subcategory = 'GENERAL'
            else:
                best_subcategory = 'GENERAL'
        # Collect up to two non-zero runner-up categories.
        alternatives = []
        sorted_categories = sorted(category_scores.items(), key=lambda x: x[1], reverse=True)
        for category, score in sorted_categories[:3]:
            if category != best_category and score > 0:
                alternatives.append({
                    'category': category,
                    'confidence': min(score / 10.0, 1.0)
                })
        return best_category, best_subcategory, confidence, alternatives
class SeverityAnalyzer:
    """
    AI-driven severity analyzer based on impact assessment.

    Combines three 0-1 impact scores (user, business, technical) with a
    keyword scan of the incident text to suggest a severity level.
    """

    def __init__(self):
        self.model_version = "v1.0"
        # Per-level keyword lists plus the tuning knobs used by the scorer.
        self.severity_indicators = {
            'EMERGENCY': {
                'keywords': ['down', 'outage', 'critical', 'emergency', 'complete', 'total', 'all', 'entire', 'system'],
                'impact_multiplier': 2.0,
                'user_impact_threshold': 0.8,
                'business_impact_threshold': 0.9
            },
            'CRITICAL': {
                'keywords': ['major', 'significant', 'severe', 'blocking', 'cannot', 'unable', 'failed', 'broken'],
                'impact_multiplier': 1.5,
                'user_impact_threshold': 0.6,
                'business_impact_threshold': 0.7
            },
            'HIGH': {
                'keywords': ['important', 'priority', 'escalate', 'escalated', 'urgent', 'immediate', 'soon'],
                'impact_multiplier': 1.2,
                'user_impact_threshold': 0.4,
                'business_impact_threshold': 0.5
            },
            'MEDIUM': {
                'keywords': ['moderate', 'some', 'partial', 'intermittent', 'occasional', 'sometimes', 'minor'],
                'impact_multiplier': 1.0,
                'user_impact_threshold': 0.2,
                'business_impact_threshold': 0.3
            },
            'LOW': {
                'keywords': ['small', 'cosmetic', 'enhancement', 'improvement', 'suggestion', 'nice', 'to', 'have'],
                'impact_multiplier': 0.5,
                'user_impact_threshold': 0.1,
                'business_impact_threshold': 0.1
            }
        }

    def analyze_severity(self, incident_data: Dict) -> Dict:
        """
        Analyze incident severity from the fields of ``incident_data``.

        Reads 'title', 'description', 'free_text', 'affected_users' and
        'business_impact'; returns a dict with the suggested severity,
        confidence, the three impact scores, reasoning text, impact factors
        and the wall-clock processing time.
        """
        started = time.time()
        lowered_title = incident_data.get('title', '').lower()
        lowered_description = incident_data.get('description', '').lower()
        lowered_free_text = incident_data.get('free_text', '').lower()
        affected_users = incident_data.get('affected_users', 0)
        lowered_business = incident_data.get('business_impact', '').lower()
        haystack = f"{lowered_title} {lowered_description} {lowered_free_text} {lowered_business}"

        # Three independent 0-1 impact dimensions.
        user_score = self._calculate_user_impact(affected_users, haystack)
        business_score = self._calculate_business_impact(lowered_business, haystack)
        technical_score = self._calculate_technical_impact(haystack)

        severity, confidence, reasoning, factors = self._determine_severity(
            haystack, user_score, business_score, technical_score
        )
        return {
            'suggested_severity': severity,
            'confidence_score': confidence,
            'user_impact_score': user_score,
            'business_impact_score': business_score,
            'technical_impact_score': technical_score,
            'reasoning': reasoning,
            'impact_factors': factors,
            'processing_time': time.time() - started
        }

    def _calculate_user_impact(self, affected_users: int, text: str) -> float:
        """Map an affected-user count (plus text hints) onto a 0-1 score."""
        if affected_users == 0:
            # No explicit count: look for "everybody is affected" phrasing.
            whole_base_phrases = ('all users', 'everyone', 'entire user base', 'all customers')
            score = 0.9 if any(phrase in text for phrase in whole_base_phrases) else 0.1
        else:
            # Tiered score by order of magnitude of affected users.
            score = 1.0
            for upper_bound, tier_score in ((10, 0.2), (100, 0.4), (1000, 0.6), (10000, 0.8)):
                if affected_users < upper_bound:
                    score = tier_score
                    break
        # Nudge the score when wording suggests broad or narrow reach.
        if 'all' in text or 'everyone' in text:
            score = min(score + 0.2, 1.0)
        elif 'some' in text or 'few' in text:
            score = max(score - 0.1, 0.0)
        return score

    def _calculate_business_impact(self, business_impact: str, text: str) -> float:
        """Score business impact (0-1) from the dedicated field, or infer it."""
        if not business_impact:
            # Field is empty: infer a coarse score from the incident text.
            inferred_high = ('revenue', 'sales', 'customer', 'business', 'critical', 'essential', 'production')
            return 0.6 if any(word in text for word in inferred_high) else 0.3
        # Per-keyword deltas, applied in descending order of impact.
        keyword_deltas = (
            (['revenue', 'sales', 'customer', 'business', 'critical', 'essential', 'production', 'outage', 'down'], 0.1),
            (['service', 'feature', 'functionality', 'performance', 'slow'], 0.05),
            (['cosmetic', 'minor', 'enhancement', 'improvement'], -0.05),
        )
        score = 0.3  # neutral baseline
        for word_list, delta in keyword_deltas:
            for word in word_list:
                if word in business_impact:
                    score += delta
        return min(max(score, 0.0), 1.0)

    def _calculate_technical_impact(self, text: str) -> float:
        """Score technical impact (0-1) from failure/performance wording."""
        indicator_deltas = (
            (['down', 'outage', 'crash', 'failed', 'broken', 'unavailable', 'error', 'exception'], 0.15),
            (['slow', 'performance', 'latency', 'timeout', 'intermittent', 'partial'], 0.08),
            (['cosmetic', 'display', 'ui', 'minor', 'enhancement'], -0.05),
        )
        score = 0.3  # neutral baseline
        for word_list, delta in indicator_deltas:
            for word in word_list:
                if word in text:
                    score += delta
        return min(max(score, 0.0), 1.0)

    def _determine_severity(self, text: str, user_impact: float, business_impact: float, technical_impact: float) -> Tuple[str, float, str, List[str]]:
        """Pick a severity level plus confidence, reasoning and factors."""
        impact_factors = []
        # Weighted blend of the three impact dimensions.
        weighted_score = (user_impact * 0.4 + business_impact * 0.4 + technical_impact * 0.2)

        # Keyword evidence: hits per level, scaled by that level's multiplier.
        severity_scores = {}
        for level, spec in self.severity_indicators.items():
            hits = sum(1 for keyword in spec['keywords'] if keyword in text)
            severity_scores[level] = hits * spec['impact_multiplier']

        top_score = max(severity_scores.values()) if severity_scores else 0
        if top_score > 0:
            # Ties resolve to the most severe level (dict insertion order).
            best_severity = max(severity_scores, key=severity_scores.get)
            text_confidence = min(top_score / 5.0, 1.0)
        else:
            # No keyword evidence: fall back to the weighted impact score.
            text_confidence = 0.5
            if weighted_score >= 0.8:
                best_severity = 'CRITICAL'
            elif weighted_score >= 0.6:
                best_severity = 'HIGH'
            elif weighted_score >= 0.4:
                best_severity = 'MEDIUM'
            else:
                best_severity = 'LOW'

        # Blend keyword confidence with how close the impact score sits to
        # the chosen level's nominal score.
        confidence = (text_confidence + (1.0 - abs(weighted_score - self._severity_to_score(best_severity)))) / 2.0

        # Human-readable justification for the suggestion.
        reasoning_parts = []
        if user_impact > 0.6:
            reasoning_parts.append(f"High user impact ({user_impact:.1%})")
            impact_factors.append(f"User Impact: {user_impact:.1%}")
        if business_impact > 0.6:
            reasoning_parts.append(f"Significant business impact ({business_impact:.1%})")
            impact_factors.append(f"Business Impact: {business_impact:.1%}")
        if technical_impact > 0.6:
            reasoning_parts.append(f"Major technical impact ({technical_impact:.1%})")
            impact_factors.append(f"Technical Impact: {technical_impact:.1%}")
        if top_score > 0:
            reasoning_parts.append("Severity indicators detected in incident description")
            impact_factors.append("Text Analysis: Severity keywords found")
        reasoning = "; ".join(reasoning_parts) if reasoning_parts else "Based on overall impact assessment"
        return best_severity, confidence, reasoning, impact_factors

    def _severity_to_score(self, severity: str) -> float:
        """Translate a severity label to its nominal 0-1 impact score."""
        nominal = {
            'LOW': 0.2,
            'MEDIUM': 0.4,
            'HIGH': 0.6,
            'CRITICAL': 0.8,
            'EMERGENCY': 1.0
        }
        # Unknown labels default to the MEDIUM midpoint.
        return nominal.get(severity, 0.4)