""" Celery tasks for incident intelligence processing """ from celery import shared_task from django.utils import timezone from django.db import transaction import logging from .models import ( Incident, IncidentClassification, SeveritySuggestion, IncidentCorrelation, DuplicationDetection, IncidentPattern, AIProcessingLog ) from .ai.classification import IncidentClassifier, SeverityAnalyzer from .ai.correlation import IncidentCorrelationEngine from .ai.duplication import DuplicationDetector logger = logging.getLogger(__name__) @shared_task(bind=True, max_retries=3) def process_incident_ai(self, incident_id): """ Process a single incident with AI analysis """ try: incident = Incident.objects.get(id=incident_id) # Create processing log log = AIProcessingLog.objects.create( processing_type='CLASSIFICATION', status='PROCESSING', incident=incident, input_data={ 'incident_id': str(incident_id), 'title': incident.title, 'description': incident.description, 'free_text': incident.free_text } ) # Initialize AI components classifier = IncidentClassifier() severity_analyzer = SeverityAnalyzer() # Perform classification classification_result = classifier.classify_incident( incident.title, incident.description, incident.free_text ) # Save classification results with transaction.atomic(): # Update incident with classification incident.category = classification_result.category incident.subcategory = classification_result.subcategory incident.classification_confidence = classification_result.confidence # Create detailed classification record IncidentClassification.objects.update_or_create( incident=incident, defaults={ 'predicted_category': classification_result.category, 'predicted_subcategory': classification_result.subcategory, 'confidence_score': classification_result.confidence, 'alternative_categories': [ {'category': alt['category'], 'confidence': alt['confidence']} for alt in classification_result.alternative_categories ], 'extracted_keywords': classification_result.keywords, 'sentiment_score': classification_result.sentiment_score, 'urgency_indicators': classification_result.urgency_indicators, 'processing_time': classification_result.processing_time, 'model_version': classifier.model_version } ) # Perform severity analysis severity_result = severity_analyzer.analyze_severity({ 'title': incident.title, 'description': incident.description, 'free_text': incident.free_text, 'affected_users': incident.affected_users, 'business_impact': incident.business_impact }) # Update incident with severity suggestion incident.suggested_severity = severity_result['suggested_severity'] incident.severity_confidence = severity_result['confidence_score'] # Create detailed severity suggestion record SeveritySuggestion.objects.update_or_create( incident=incident, defaults={ 'suggested_severity': severity_result['suggested_severity'], 'confidence_score': severity_result['confidence_score'], 'user_impact_score': severity_result['user_impact_score'], 'business_impact_score': severity_result['business_impact_score'], 'technical_impact_score': severity_result['technical_impact_score'], 'reasoning': severity_result['reasoning'], 'impact_factors': severity_result['impact_factors'], 'processing_time': severity_result['processing_time'], 'model_version': severity_analyzer.model_version } ) # Mark as processed incident.ai_processed = True incident.last_ai_analysis = timezone.now() incident.save() # Update processing log log.status = 'COMPLETED' log.completed_at = timezone.now() log.output_data = { 'classification': { 'category': classification_result.category, 'subcategory': classification_result.subcategory, 'confidence': classification_result.confidence }, 'severity': { 'suggested_severity': severity_result['suggested_severity'], 'confidence': severity_result['confidence_score'] } } log.processing_time = (log.completed_at - log.started_at).total_seconds() log.save() logger.info(f"Successfully processed incident {incident_id} with AI analysis") # Trigger correlation and duplication detection find_correlations.delay(incident_id) find_duplicates.delay(incident_id) return f"Successfully processed incident {incident_id}" except Incident.DoesNotExist: logger.error(f"Incident {incident_id} not found") return f"Incident {incident_id} not found" except Exception as e: logger.error(f"Error processing incident {incident_id}: {str(e)}") # Update processing log with error try: log = AIProcessingLog.objects.filter( incident_id=incident_id, processing_type='CLASSIFICATION', status='PROCESSING' ).first() if log: log.status = 'FAILED' log.error_message = str(e) log.completed_at = timezone.now() log.save() except: pass # Update incident with error try: incident = Incident.objects.get(id=incident_id) incident.ai_processing_error = str(e) incident.save() except: pass # Retry if not max retries reached if self.request.retries < self.max_retries: raise self.retry(countdown=60 * (2 ** self.request.retries)) return f"Failed to process incident {incident_id}: {str(e)}" @shared_task(bind=True, max_retries=3) def batch_process_incidents_ai(self, incident_ids): """ Process multiple incidents with AI analysis """ try: results = [] for incident_id in incident_ids: try: result = process_incident_ai.delay(incident_id) results.append({'incident_id': incident_id, 'task_id': result.id}) except Exception as e: logger.error(f"Failed to queue incident {incident_id}: {str(e)}") results.append({'incident_id': incident_id, 'error': str(e)}) return f"Queued {len(incident_ids)} incidents for processing" except Exception as e: logger.error(f"Error in batch processing: {str(e)}") if self.request.retries < self.max_retries: raise self.retry(countdown=60 * (2 ** self.request.retries)) return f"Failed to batch process incidents: {str(e)}" @shared_task(bind=True, max_retries=3) def find_correlations(self, incident_id): """ Find correlations for a specific incident """ try: incident = Incident.objects.get(id=incident_id) # Create processing log log = AIProcessingLog.objects.create( processing_type='CORRELATION', status='PROCESSING', incident=incident, input_data={'incident_id': str(incident_id)} ) # Get recent incidents for correlation analysis recent_incidents = Incident.objects.filter( created_at__gte=timezone.now() - timezone.timedelta(days=30) ).exclude(id=incident_id) # Initialize correlation engine correlation_engine = IncidentCorrelationEngine() # Find correlations correlations = correlation_engine.find_related_incidents( { 'id': str(incident.id), 'title': incident.title, 'description': incident.description, 'free_text': incident.free_text, 'category': incident.category, 'severity': incident.severity, 'status': incident.status, 'created_at': incident.created_at }, [ { 'id': str(inc.id), 'title': inc.title, 'description': inc.description, 'free_text': inc.free_text, 'category': inc.category, 'severity': inc.severity, 'status': inc.status, 'created_at': inc.created_at } for inc in recent_incidents ] ) # Save correlations correlation_count = 0 with transaction.atomic(): for related_incident, correlation_result in correlations: if correlation_result.confidence_score > 0.5: # Only save high-confidence correlations IncidentCorrelation.objects.update_or_create( primary_incident=incident, related_incident_id=related_incident['id'], defaults={ 'correlation_type': correlation_result.correlation_type, 'confidence_score': correlation_result.confidence_score, 'correlation_strength': correlation_result.correlation_strength, 'shared_keywords': correlation_result.shared_keywords, 'time_difference': correlation_result.time_difference, 'similarity_score': correlation_result.similarity_score, 'is_problem_indicator': correlation_result.is_problem_indicator, 'problem_description': correlation_result.problem_description, 'model_version': correlation_engine.model_version } ) correlation_count += 1 # Update processing log log.status = 'COMPLETED' log.completed_at = timezone.now() log.output_data = { 'correlations_found': correlation_count, 'total_analyzed': len(recent_incidents) } log.processing_time = (log.completed_at - log.started_at).total_seconds() log.save() logger.info(f"Found {correlation_count} correlations for incident {incident_id}") return f"Found {correlation_count} correlations for incident {incident_id}" except Incident.DoesNotExist: logger.error(f"Incident {incident_id} not found") return f"Incident {incident_id} not found" except Exception as e: logger.error(f"Error finding correlations for incident {incident_id}: {str(e)}") # Update processing log with error try: log = AIProcessingLog.objects.filter( incident_id=incident_id, processing_type='CORRELATION', status='PROCESSING' ).first() if log: log.status = 'FAILED' log.error_message = str(e) log.completed_at = timezone.now() log.save() except: pass if self.request.retries < self.max_retries: raise self.retry(countdown=60 * (2 ** self.request.retries)) return f"Failed to find correlations for incident {incident_id}: {str(e)}" @shared_task(bind=True, max_retries=3) def find_duplicates(self, incident_id): """ Find duplicates for a specific incident """ try: incident = Incident.objects.get(id=incident_id) # Create processing log log = AIProcessingLog.objects.create( processing_type='DUPLICATION_DETECTION', status='PROCESSING', incident=incident, input_data={'incident_id': str(incident_id)} ) # Get recent incidents for duplication detection recent_incidents = Incident.objects.filter( created_at__gte=timezone.now() - timezone.timedelta(days=7) ).exclude(id=incident_id) # Initialize duplication detector duplication_detector = DuplicationDetector() # Find duplicates duplicates = duplication_detector.find_duplicate_candidates( { 'id': str(incident.id), 'title': incident.title, 'description': incident.description, 'free_text': incident.free_text, 'category': incident.category, 'severity': incident.severity, 'status': incident.status, 'created_at': incident.created_at, 'assigned_to': incident.assigned_to_id, 'reporter': incident.reporter_id }, [ { 'id': str(inc.id), 'title': inc.title, 'description': inc.description, 'free_text': inc.free_text, 'category': inc.category, 'severity': inc.severity, 'status': inc.status, 'created_at': inc.created_at, 'assigned_to': inc.assigned_to_id, 'reporter': inc.reporter_id } for inc in recent_incidents ] ) # Save duplications duplication_count = 0 with transaction.atomic(): for duplicate_incident, duplication_result in duplicates: if duplication_result.confidence_score > 0.6: # Only save high-confidence duplications DuplicationDetection.objects.update_or_create( incident_a=incident, incident_b_id=duplicate_incident['id'], defaults={ 'duplication_type': duplication_result.duplication_type, 'similarity_score': duplication_result.similarity_score, 'confidence_score': duplication_result.confidence_score, 'text_similarity': duplication_result.text_similarity, 'temporal_proximity': duplication_result.temporal_proximity, 'service_similarity': duplication_result.service_similarity, 'recommended_action': duplication_result.recommended_action, 'merge_confidence': duplication_result.merge_confidence, 'reasoning': duplication_result.reasoning, 'shared_elements': duplication_result.shared_elements, 'model_version': duplication_detector.model_version } ) duplication_count += 1 # Update processing log log.status = 'COMPLETED' log.completed_at = timezone.now() log.output_data = { 'duplicates_found': duplication_count, 'total_analyzed': len(recent_incidents) } log.processing_time = (log.completed_at - log.started_at).total_seconds() log.save() logger.info(f"Found {duplication_count} duplicates for incident {incident_id}") return f"Found {duplication_count} duplicates for incident {incident_id}" except Incident.DoesNotExist: logger.error(f"Incident {incident_id} not found") return f"Incident {incident_id} not found" except Exception as e: logger.error(f"Error finding duplicates for incident {incident_id}: {str(e)}") # Update processing log with error try: log = AIProcessingLog.objects.filter( incident_id=incident_id, processing_type='DUPLICATION_DETECTION', status='PROCESSING' ).first() if log: log.status = 'FAILED' log.error_message = str(e) log.completed_at = timezone.now() log.save() except: pass if self.request.retries < self.max_retries: raise self.retry(countdown=60 * (2 ** self.request.retries)) return f"Failed to find duplicates for incident {incident_id}: {str(e)}" @shared_task(bind=True, max_retries=3) def detect_all_duplicates(self): """ Detect duplicates across all incidents """ try: # Get all incidents incidents = Incident.objects.all() # Initialize duplication detector duplication_detector = DuplicationDetector() # Convert to list of dictionaries incident_data = [ { 'id': str(inc.id), 'title': inc.title, 'description': inc.description, 'free_text': inc.free_text, 'category': inc.category, 'severity': inc.severity, 'status': inc.status, 'created_at': inc.created_at, 'assigned_to': inc.assigned_to_id, 'reporter': inc.reporter_id } for inc in incidents ] # Batch detect duplicates duplicates = duplication_detector.batch_detect_duplicates(incident_data) # Save duplications duplication_count = 0 with transaction.atomic(): for incident_a_data, incident_b_data, duplication_result in duplicates: if duplication_result.confidence_score > 0.6: # Only save high-confidence duplications DuplicationDetection.objects.update_or_create( incident_a_id=incident_a_data['id'], incident_b_id=incident_b_data['id'], defaults={ 'duplication_type': duplication_result.duplication_type, 'similarity_score': duplication_result.similarity_score, 'confidence_score': duplication_result.confidence_score, 'text_similarity': duplication_result.text_similarity, 'temporal_proximity': duplication_result.temporal_proximity, 'service_similarity': duplication_result.service_similarity, 'recommended_action': duplication_result.recommended_action, 'merge_confidence': duplication_result.merge_confidence, 'reasoning': duplication_result.reasoning, 'shared_elements': duplication_result.shared_elements, 'model_version': duplication_detector.model_version } ) duplication_count += 1 logger.info(f"Detected {duplication_count} duplicate pairs across all incidents") return f"Detected {duplication_count} duplicate pairs across all incidents" except Exception as e: logger.error(f"Error in batch duplicate detection: {str(e)}") if self.request.retries < self.max_retries: raise self.retry(countdown=60 * (2 ** self.request.retries)) return f"Failed to detect duplicates: {str(e)}" @shared_task(bind=True, max_retries=3) def correlate_all_incidents(self): """ Run correlation analysis on all incidents """ try: # Get all incidents incidents = Incident.objects.all() # Initialize correlation engine correlation_engine = IncidentCorrelationEngine() # Convert to list of dictionaries incident_data = [ { 'id': str(inc.id), 'title': inc.title, 'description': inc.description, 'free_text': inc.free_text, 'category': inc.category, 'severity': inc.severity, 'status': inc.status, 'created_at': inc.created_at } for inc in incidents ] # Detect problem clusters clusters = correlation_engine.detect_problem_clusters(incident_data) # Save correlations and patterns correlation_count = 0 pattern_count = 0 with transaction.atomic(): for cluster in clusters: # Create pattern if significant if len(cluster['incidents']) >= 3 and cluster['confidence'] > 0.7: pattern, created = IncidentPattern.objects.update_or_create( name=f"Pattern {cluster['problem_type']}", pattern_type=cluster['problem_type'], defaults={ 'description': f"Detected pattern of {cluster['problem_type']} affecting {len(cluster['incidents'])} incidents", 'frequency': f"Occurred {len(cluster['incidents'])} times", 'affected_services': list(set([ inc.get('category', 'Unknown') for inc in cluster['incidents'] ])), 'common_keywords': list(set([ keyword for corr in cluster['correlations'] for keyword in corr.shared_keywords ]))[:10], 'confidence_score': cluster['confidence'], 'last_occurrence': max([ inc['created_at'] for inc in cluster['incidents'] ]), 'model_version': correlation_engine.model_version } ) if created: pattern_count += 1 # Add incidents to pattern for incident_data in cluster['incidents']: try: incident = Incident.objects.get(id=incident_data['id']) pattern.incidents.add(incident) except Incident.DoesNotExist: continue pattern.incident_count = pattern.incidents.count() pattern.save() # Save correlations for incident_a_data, incident_b_data, correlation_result in zip( cluster['incidents'][:-1], cluster['incidents'][1:], cluster['correlations'] ): if correlation_result.confidence_score > 0.5: try: incident_a = Incident.objects.get(id=incident_a_data['id']) incident_b = Incident.objects.get(id=incident_b_data['id']) IncidentCorrelation.objects.update_or_create( primary_incident=incident_a, related_incident=incident_b, defaults={ 'correlation_type': correlation_result.correlation_type, 'confidence_score': correlation_result.confidence_score, 'correlation_strength': correlation_result.correlation_strength, 'shared_keywords': correlation_result.shared_keywords, 'time_difference': correlation_result.time_difference, 'similarity_score': correlation_result.similarity_score, 'is_problem_indicator': correlation_result.is_problem_indicator, 'problem_description': correlation_result.problem_description, 'model_version': correlation_engine.model_version } ) correlation_count += 1 except Incident.DoesNotExist: continue logger.info(f"Created {correlation_count} correlations and {pattern_count} patterns") return f"Created {correlation_count} correlations and {pattern_count} patterns" except Exception as e: logger.error(f"Error in correlation analysis: {str(e)}") if self.request.retries < self.max_retries: raise self.retry(countdown=60 * (2 ** self.request.retries)) return f"Failed to correlate incidents: {str(e)}" @shared_task(bind=True, max_retries=3) def merge_duplicate_incidents(self, duplication_id): """ Merge duplicate incidents """ try: duplication = DuplicationDetection.objects.get(id=duplication_id) if duplication.status != 'REVIEWED': return f"Duplication {duplication_id} is not approved for merging" incident_a = duplication.incident_a incident_b = duplication.incident_b with transaction.atomic(): # Merge incident data (keep the older incident as primary) if incident_a.created_at <= incident_b.created_at: primary_incident = incident_a secondary_incident = incident_b else: primary_incident = incident_b secondary_incident = incident_a # Update primary incident with information from secondary if not primary_incident.description and secondary_incident.description: primary_incident.description = secondary_incident.description if not primary_incident.free_text and secondary_incident.free_text: primary_incident.free_text = secondary_incident.free_text if not primary_incident.business_impact and secondary_incident.business_impact: primary_incident.business_impact = secondary_incident.business_impact # Update affected users (take maximum) primary_incident.affected_users = max( primary_incident.affected_users, secondary_incident.affected_users ) # Update severity (take higher severity) severity_order = {'LOW': 1, 'MEDIUM': 2, 'HIGH': 3, 'CRITICAL': 4, 'EMERGENCY': 5} if severity_order.get(secondary_incident.severity, 0) > severity_order.get(primary_incident.severity, 0): primary_incident.severity = secondary_incident.severity primary_incident.save() # Mark secondary incident as duplicate secondary_incident.is_duplicate = True secondary_incident.original_incident = primary_incident secondary_incident.duplicate_confidence = duplication.confidence_score secondary_incident.status = 'CLOSED' secondary_incident.save() # Update duplication status duplication.status = 'MERGED' duplication.save() logger.info(f"Successfully merged incidents {incident_a.id} and {incident_b.id}") return f"Successfully merged incidents {incident_a.id} and {incident_b.id}" except DuplicationDetection.DoesNotExist: logger.error(f"Duplication {duplication_id} not found") return f"Duplication {duplication_id} not found" except Exception as e: logger.error(f"Error merging incidents for duplication {duplication_id}: {str(e)}") if self.request.retries < self.max_retries: raise self.retry(countdown=60 * (2 ** self.request.retries)) return f"Failed to merge incidents: {str(e)}"