from django.db import models from django.contrib.auth import get_user_model from django.core.validators import MinValueValidator, MaxValueValidator from django.utils import timezone import uuid User = get_user_model() class Incident(models.Model): """Main incident model for tracking and managing incidents""" SEVERITY_CHOICES = [ ('LOW', 'Low'), ('MEDIUM', 'Medium'), ('HIGH', 'High'), ('CRITICAL', 'Critical'), ('EMERGENCY', 'Emergency'), ] STATUS_CHOICES = [ ('OPEN', 'Open'), ('IN_PROGRESS', 'In Progress'), ('RESOLVED', 'Resolved'), ('CLOSED', 'Closed'), ('CANCELLED', 'Cancelled'), ] PRIORITY_CHOICES = [ ('P1', 'P1 - Critical'), ('P2', 'P2 - High'), ('P3', 'P3 - Medium'), ('P4', 'P4 - Low'), ] id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) title = models.CharField(max_length=200) description = models.TextField() free_text = models.TextField(help_text="Original free text description from user") # Classification fields category = models.CharField(max_length=100, blank=True, null=True) subcategory = models.CharField(max_length=100, blank=True, null=True) classification_confidence = models.FloatField( validators=[MinValueValidator(0.0), MaxValueValidator(1.0)], null=True, blank=True, help_text="AI confidence score for classification (0.0-1.0)" ) # Severity and Priority severity = models.CharField(max_length=20, choices=SEVERITY_CHOICES, default='MEDIUM') suggested_severity = models.CharField(max_length=20, choices=SEVERITY_CHOICES, blank=True, null=True) severity_confidence = models.FloatField( validators=[MinValueValidator(0.0), MaxValueValidator(1.0)], null=True, blank=True, help_text="AI confidence score for severity suggestion (0.0-1.0)" ) priority = models.CharField(max_length=10, choices=PRIORITY_CHOICES, default='P3') # Status and Assignment status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='OPEN') assigned_to = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True) reporter = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True, related_name='reported_incidents') # Timestamps created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) resolved_at = models.DateTimeField(null=True, blank=True) # Impact and Business Context affected_users = models.PositiveIntegerField(default=0) business_impact = models.TextField(blank=True, null=True) estimated_downtime = models.DurationField(null=True, blank=True) # AI Processing Flags ai_processed = models.BooleanField(default=False) ai_processing_error = models.TextField(blank=True, null=True) last_ai_analysis = models.DateTimeField(null=True, blank=True) # Automation Integration automation_enabled = models.BooleanField( default=True, help_text="Whether automation can be triggered for this incident" ) runbook_suggested = models.BooleanField( default=False, help_text="Whether a runbook has been suggested for this incident" ) auto_remediation_attempted = models.BooleanField( default=False, help_text="Whether auto-remediation has been attempted" ) maintenance_window_override = models.BooleanField( default=False, help_text="Whether this incident should override maintenance window suppressions" ) # SLA Integration sla_override = models.BooleanField( default=False, help_text="Whether this incident overrides normal SLA calculations" ) sla_override_reason = models.TextField( blank=True, null=True, help_text="Reason for SLA override" ) oncall_assignment = models.ForeignKey( 'sla_oncall.OnCallAssignment', on_delete=models.SET_NULL, null=True, blank=True, related_name='assigned_incidents', help_text="On-call assignment responsible for this incident" ) # Duplication Detection is_duplicate = models.BooleanField(default=False) original_incident = models.ForeignKey('self', on_delete=models.SET_NULL, null=True, blank=True, related_name='duplicates') duplicate_confidence = models.FloatField( validators=[MinValueValidator(0.0), MaxValueValidator(1.0)], null=True, blank=True, help_text="AI confidence score for duplication detection (0.0-1.0)" ) # Security Integration data_classification = models.ForeignKey( 'security.DataClassification', on_delete=models.SET_NULL, null=True, blank=True, help_text="Data classification level for this incident" ) security_clearance_required = models.BooleanField( default=False, help_text="Whether this incident requires special security clearance" ) is_sensitive = models.BooleanField( default=False, help_text="Whether this incident contains sensitive information" ) class Meta: ordering = ['-created_at'] indexes = [ models.Index(fields=['status', 'severity']), models.Index(fields=['category', 'subcategory']), models.Index(fields=['created_at']), models.Index(fields=['assigned_to']), ] def __str__(self): return f"{self.title} ({self.severity})" @property def is_resolved(self): return self.status in ['RESOLVED', 'CLOSED'] @property def resolution_time(self): if self.resolved_at and self.created_at: return self.resolved_at - self.created_at return None def has_user_access(self, user): """Check if user has access to this incident based on security clearance""" if not self.data_classification: return True # No classification means accessible to all return user.has_data_access(self.data_classification.level) def get_required_clearance_level(self): """Get the required clearance level for this incident""" if self.data_classification: return self.data_classification.level return 1 # Default to PUBLIC level def is_accessible_by_user(self, user): """Check if user can access this incident""" # Check basic access if not self.has_user_access(user): return False # Check if incident is sensitive and user has appropriate clearance if self.is_sensitive and not user.clearance_level: return False # Check security clearance requirement if self.security_clearance_required: if not user.clearance_level or user.clearance_level.level < self.get_required_clearance_level(): return False return True def should_suppress_for_maintenance(self): """Check if this incident should be suppressed due to active maintenance windows""" if self.maintenance_window_override: return False from automation_orchestration.models import MaintenanceWindow from django.utils import timezone now = timezone.now() active_maintenance = MaintenanceWindow.objects.filter( start_time__lte=now, end_time__gte=now, status='ACTIVE', suppress_incident_creation=True ) # Check if any active maintenance window affects this incident for maintenance in active_maintenance: # Check if incident category matches affected services/components if (self.category in maintenance.affected_services or self.category in maintenance.affected_components): return True return False class IncidentClassification(models.Model): """AI-driven incident classification results""" incident = models.OneToOneField(Incident, on_delete=models.CASCADE, related_name='ai_classification') # Classification results predicted_category = models.CharField(max_length=100) predicted_subcategory = models.CharField(max_length=100) confidence_score = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)]) # Alternative classifications alternative_categories = models.JSONField(default=list, help_text="List of alternative category predictions") # NLP Analysis extracted_keywords = models.JSONField(default=list, help_text="Keywords extracted from incident text") sentiment_score = models.FloatField(null=True, blank=True, help_text="Sentiment analysis score (-1 to 1)") urgency_indicators = models.JSONField(default=list, help_text="Detected urgency indicators") # Processing metadata model_version = models.CharField(max_length=50, default='v1.0') processing_time = models.FloatField(help_text="Time taken for classification in seconds") created_at = models.DateTimeField(auto_now_add=True) def __str__(self): return f"Classification for {self.incident.title}: {self.predicted_category}" class SeveritySuggestion(models.Model): """AI-driven severity suggestions based on impact analysis""" incident = models.OneToOneField(Incident, on_delete=models.CASCADE, related_name='severity_suggestion') # Severity prediction suggested_severity = models.CharField(max_length=20, choices=Incident.SEVERITY_CHOICES) confidence_score = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)]) # Impact analysis factors user_impact_score = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)]) business_impact_score = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)]) technical_impact_score = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)]) # Reasoning reasoning = models.TextField(help_text="AI explanation for severity suggestion") impact_factors = models.JSONField(default=list, help_text="List of factors that influenced the severity") # Processing metadata model_version = models.CharField(max_length=50, default='v1.0') processing_time = models.FloatField(help_text="Time taken for severity analysis in seconds") created_at = models.DateTimeField(auto_now_add=True) def __str__(self): return f"Severity suggestion for {self.incident.title}: {self.suggested_severity}" class IncidentCorrelation(models.Model): """Correlation engine for linking related incidents""" CORRELATION_TYPE_CHOICES = [ ('SAME_SERVICE', 'Same Service'), ('SAME_COMPONENT', 'Same Component'), ('TEMPORAL', 'Temporal Correlation'), ('PATTERN', 'Pattern Match'), ('DEPENDENCY', 'Dependency Related'), ('CASCADE', 'Cascade Effect'), ] # Related incidents primary_incident = models.ForeignKey(Incident, on_delete=models.CASCADE, related_name='correlations_as_primary') related_incident = models.ForeignKey(Incident, on_delete=models.CASCADE, related_name='correlations_as_related') # Correlation details correlation_type = models.CharField(max_length=20, choices=CORRELATION_TYPE_CHOICES) confidence_score = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)]) correlation_strength = models.CharField(max_length=20, choices=[ ('WEAK', 'Weak'), ('MODERATE', 'Moderate'), ('STRONG', 'Strong'), ('VERY_STRONG', 'Very Strong'), ]) # Analysis details shared_keywords = models.JSONField(default=list, help_text="Keywords shared between incidents") time_difference = models.DurationField(help_text="Time difference between incidents") similarity_score = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)]) # Problem detection is_problem_indicator = models.BooleanField(default=False, help_text="Indicates if this correlation suggests a larger problem") problem_description = models.TextField(blank=True, null=True) # Metadata created_at = models.DateTimeField(auto_now_add=True) model_version = models.CharField(max_length=50, default='v1.0') class Meta: unique_together = ['primary_incident', 'related_incident'] indexes = [ models.Index(fields=['correlation_type', 'confidence_score']), models.Index(fields=['is_problem_indicator']), ] def __str__(self): return f"Correlation: {self.primary_incident.title} <-> {self.related_incident.title}" class DuplicationDetection(models.Model): """Duplication detection results for incident merging""" DUPLICATION_TYPE_CHOICES = [ ('EXACT', 'Exact Duplicate'), ('NEAR_DUPLICATE', 'Near Duplicate'), ('SIMILAR', 'Similar Incident'), ('POTENTIAL_DUPLICATE', 'Potential Duplicate'), ] # Incident pair incident_a = models.ForeignKey(Incident, on_delete=models.CASCADE, related_name='duplication_as_a') incident_b = models.ForeignKey(Incident, on_delete=models.CASCADE, related_name='duplication_as_b') # Duplication analysis duplication_type = models.CharField(max_length=20, choices=DUPLICATION_TYPE_CHOICES) similarity_score = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)]) confidence_score = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)]) # Analysis details text_similarity = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)]) temporal_proximity = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)]) service_similarity = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)]) # Merge recommendation recommended_action = models.CharField(max_length=20, choices=[ ('MERGE', 'Merge Incidents'), ('LINK', 'Link Incidents'), ('REVIEW', 'Manual Review'), ('NO_ACTION', 'No Action'), ]) merge_confidence = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)]) # Reasoning reasoning = models.TextField(help_text="AI explanation for duplication detection") shared_elements = models.JSONField(default=list, help_text="Elements shared between incidents") # Status status = models.CharField(max_length=20, choices=[ ('DETECTED', 'Detected'), ('REVIEWED', 'Reviewed'), ('MERGED', 'Merged'), ('REJECTED', 'Rejected'), ], default='DETECTED') # Metadata created_at = models.DateTimeField(auto_now_add=True) reviewed_at = models.DateTimeField(null=True, blank=True) reviewed_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True) model_version = models.CharField(max_length=50, default='v1.0') class Meta: unique_together = ['incident_a', 'incident_b'] indexes = [ models.Index(fields=['duplication_type', 'confidence_score']), models.Index(fields=['status']), ] def __str__(self): return f"Duplication: {self.incident_a.title} <-> {self.incident_b.title}" class IncidentPattern(models.Model): """Pattern detection for identifying recurring issues""" PATTERN_TYPE_CHOICES = [ ('RECURRING', 'Recurring Issue'), ('SEASONAL', 'Seasonal Pattern'), ('TREND', 'Trend Analysis'), ('ANOMALY', 'Anomaly Detection'), ] name = models.CharField(max_length=200) pattern_type = models.CharField(max_length=20, choices=PATTERN_TYPE_CHOICES) description = models.TextField() # Pattern characteristics frequency = models.CharField(max_length=50, help_text="How often this pattern occurs") affected_services = models.JSONField(default=list, help_text="Services affected by this pattern") common_keywords = models.JSONField(default=list, help_text="Common keywords in incidents with this pattern") # Related incidents incidents = models.ManyToManyField(Incident, related_name='patterns') incident_count = models.PositiveIntegerField(default=0) # Pattern analysis confidence_score = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)]) last_occurrence = models.DateTimeField(null=True, blank=True) next_predicted_occurrence = models.DateTimeField(null=True, blank=True) # Status is_active = models.BooleanField(default=True) is_resolved = models.BooleanField(default=False) # Metadata created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) model_version = models.CharField(max_length=50, default='v1.0') class Meta: ordering = ['-confidence_score', '-incident_count'] indexes = [ models.Index(fields=['pattern_type', 'is_active']), models.Index(fields=['confidence_score']), ] def __str__(self): return f"Pattern: {self.name} ({self.pattern_type})" class AIProcessingLog(models.Model): """Log of AI processing activities for audit and debugging""" PROCESSING_TYPE_CHOICES = [ ('CLASSIFICATION', 'Classification'), ('SEVERITY_ANALYSIS', 'Severity Analysis'), ('CORRELATION', 'Correlation Analysis'), ('DUPLICATION_DETECTION', 'Duplication Detection'), ('PATTERN_DETECTION', 'Pattern Detection'), ] STATUS_CHOICES = [ ('PENDING', 'Pending'), ('PROCESSING', 'Processing'), ('COMPLETED', 'Completed'), ('FAILED', 'Failed'), ('SKIPPED', 'Skipped'), ] # Processing details processing_type = models.CharField(max_length=30, choices=PROCESSING_TYPE_CHOICES) status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='PENDING') # Related objects incident = models.ForeignKey(Incident, on_delete=models.CASCADE, null=True, blank=True) related_incidents = models.JSONField(default=list, help_text="List of related incident IDs") # Processing results input_data = models.JSONField(help_text="Input data for processing") output_data = models.JSONField(null=True, blank=True, help_text="Output data from processing") error_message = models.TextField(blank=True, null=True) # Performance metrics processing_time = models.FloatField(null=True, blank=True, help_text="Processing time in seconds") model_version = models.CharField(max_length=50, default='v1.0') confidence_score = models.FloatField(null=True, blank=True, validators=[MinValueValidator(0.0), MaxValueValidator(1.0)]) # Timestamps started_at = models.DateTimeField(auto_now_add=True) completed_at = models.DateTimeField(null=True, blank=True) class Meta: ordering = ['-started_at'] indexes = [ models.Index(fields=['processing_type', 'status']), models.Index(fields=['incident', 'processing_type']), ] def __str__(self): return f"{self.processing_type} - {self.status} - {self.incident.title if self.incident else 'N/A'}"