# Repository-viewer listing metadata (commented out — the raw residue was not valid Python):
# Files | Iliyan Angelov 6b247e5b9f Updates | 2025-09-19 11:58:53 +03:00 | 484 lines | 19 KiB | Python
from django.db import models
from django.contrib.auth import get_user_model
from django.core.validators import MinValueValidator, MaxValueValidator
from django.utils import timezone
import uuid
User = get_user_model()
class Incident(models.Model):
    """Main incident model for tracking and managing incidents.

    Holds the original user report (``free_text``), AI-derived classification
    and severity suggestions with confidence scores, status/assignment data,
    SLA and automation integration flags, duplicate links, and security
    classification metadata.
    """

    SEVERITY_CHOICES = [
        ('LOW', 'Low'),
        ('MEDIUM', 'Medium'),
        ('HIGH', 'High'),
        ('CRITICAL', 'Critical'),
        ('EMERGENCY', 'Emergency'),
    ]
    STATUS_CHOICES = [
        ('OPEN', 'Open'),
        ('IN_PROGRESS', 'In Progress'),
        ('RESOLVED', 'Resolved'),
        ('CLOSED', 'Closed'),
        ('CANCELLED', 'Cancelled'),
    ]
    PRIORITY_CHOICES = [
        ('P1', 'P1 - Critical'),
        ('P2', 'P2 - High'),
        ('P3', 'P3 - Medium'),
        ('P4', 'P4 - Low'),
    ]

    # UUID primary key so incident IDs are non-guessable and mergeable across systems.
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    title = models.CharField(max_length=200)
    description = models.TextField()
    free_text = models.TextField(help_text="Original free text description from user")

    # Classification fields (filled in by the AI pipeline; see IncidentClassification).
    category = models.CharField(max_length=100, blank=True, null=True)
    subcategory = models.CharField(max_length=100, blank=True, null=True)
    classification_confidence = models.FloatField(
        validators=[MinValueValidator(0.0), MaxValueValidator(1.0)],
        null=True, blank=True,
        help_text="AI confidence score for classification (0.0-1.0)"
    )

    # Severity and Priority — `severity` is the human-confirmed value,
    # `suggested_severity` is the AI proposal.
    severity = models.CharField(max_length=20, choices=SEVERITY_CHOICES, default='MEDIUM')
    suggested_severity = models.CharField(max_length=20, choices=SEVERITY_CHOICES, blank=True, null=True)
    severity_confidence = models.FloatField(
        validators=[MinValueValidator(0.0), MaxValueValidator(1.0)],
        null=True, blank=True,
        help_text="AI confidence score for severity suggestion (0.0-1.0)"
    )
    priority = models.CharField(max_length=10, choices=PRIORITY_CHOICES, default='P3')

    # Status and Assignment — SET_NULL keeps the incident when a user is deleted.
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='OPEN')
    assigned_to = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True)
    reporter = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True, related_name='reported_incidents')

    # Timestamps
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    resolved_at = models.DateTimeField(null=True, blank=True)

    # Impact and Business Context
    affected_users = models.PositiveIntegerField(default=0)
    business_impact = models.TextField(blank=True, null=True)
    estimated_downtime = models.DurationField(null=True, blank=True)

    # AI Processing Flags
    ai_processed = models.BooleanField(default=False)
    ai_processing_error = models.TextField(blank=True, null=True)
    last_ai_analysis = models.DateTimeField(null=True, blank=True)

    # Automation Integration
    automation_enabled = models.BooleanField(
        default=True,
        help_text="Whether automation can be triggered for this incident"
    )
    runbook_suggested = models.BooleanField(
        default=False,
        help_text="Whether a runbook has been suggested for this incident"
    )
    auto_remediation_attempted = models.BooleanField(
        default=False,
        help_text="Whether auto-remediation has been attempted"
    )
    maintenance_window_override = models.BooleanField(
        default=False,
        help_text="Whether this incident should override maintenance window suppressions"
    )

    # SLA Integration
    sla_override = models.BooleanField(
        default=False,
        help_text="Whether this incident overrides normal SLA calculations"
    )
    sla_override_reason = models.TextField(
        blank=True,
        null=True,
        help_text="Reason for SLA override"
    )
    oncall_assignment = models.ForeignKey(
        'sla_oncall.OnCallAssignment',
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='assigned_incidents',
        help_text="On-call assignment responsible for this incident"
    )

    # Duplication Detection — self-FK links a duplicate back to its original.
    is_duplicate = models.BooleanField(default=False)
    original_incident = models.ForeignKey('self', on_delete=models.SET_NULL, null=True, blank=True, related_name='duplicates')
    duplicate_confidence = models.FloatField(
        validators=[MinValueValidator(0.0), MaxValueValidator(1.0)],
        null=True, blank=True,
        help_text="AI confidence score for duplication detection (0.0-1.0)"
    )

    # Security Integration
    data_classification = models.ForeignKey(
        'security.DataClassification',
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        help_text="Data classification level for this incident"
    )
    security_clearance_required = models.BooleanField(
        default=False,
        help_text="Whether this incident requires special security clearance"
    )
    is_sensitive = models.BooleanField(
        default=False,
        help_text="Whether this incident contains sensitive information"
    )

    class Meta:
        ordering = ['-created_at']
        indexes = [
            models.Index(fields=['status', 'severity']),
            models.Index(fields=['category', 'subcategory']),
            models.Index(fields=['created_at']),
            models.Index(fields=['assigned_to']),
        ]

    def __str__(self):
        return f"{self.title} ({self.severity})"

    @property
    def is_resolved(self):
        """True once the incident has reached a terminal resolved state."""
        return self.status in ['RESOLVED', 'CLOSED']

    @property
    def resolution_time(self):
        """Duration from creation to resolution, or None if not yet resolved."""
        if self.resolved_at and self.created_at:
            return self.resolved_at - self.created_at
        return None

    def has_user_access(self, user):
        """Check if user has access to this incident based on security clearance."""
        if not self.data_classification:
            return True  # No classification means accessible to all
        # NOTE(review): assumes the custom User model defines has_data_access() — confirm.
        return user.has_data_access(self.data_classification.level)

    def get_required_clearance_level(self):
        """Get the required clearance level for this incident."""
        if self.data_classification:
            return self.data_classification.level
        return 1  # Default to PUBLIC level

    def is_accessible_by_user(self, user):
        """Check if user can access this incident.

        Combines data-classification access, the sensitivity flag, and the
        explicit security-clearance requirement.
        """
        # Check basic data-classification access first.
        if not self.has_user_access(user):
            return False
        # Sensitive incidents require the user to hold *some* clearance.
        # NOTE(review): assumes the User model exposes a `clearance_level` relation with
        # a numeric `.level` — confirm against the security app.
        if self.is_sensitive and not user.clearance_level:
            return False
        # Explicit clearance requirement: user's level must meet the incident's.
        if self.security_clearance_required:
            if not user.clearance_level or user.clearance_level.level < self.get_required_clearance_level():
                return False
        return True

    def should_suppress_for_maintenance(self):
        """Check if this incident should be suppressed due to active maintenance windows.

        Returns False immediately when `maintenance_window_override` is set.
        """
        if self.maintenance_window_override:
            return False
        # Imported locally to avoid a circular import at module load time.
        from automation_orchestration.models import MaintenanceWindow
        # Fix: use the module-level `timezone` import (was needlessly re-imported here).
        now = timezone.now()
        active_maintenance = MaintenanceWindow.objects.filter(
            start_time__lte=now,
            end_time__gte=now,
            status='ACTIVE',
            suppress_incident_creation=True
        )
        # Suppress when any active window covers this incident's category.
        for maintenance in active_maintenance:
            # NOTE(review): assumes affected_services/affected_components are containers
            # of category names (JSON lists?) — confirm in automation_orchestration.
            if (self.category in maintenance.affected_services or
                    self.category in maintenance.affected_components):
                return True
        return False
class IncidentClassification(models.Model):
    """AI-driven incident classification results"""

    # Exactly one classification record per incident (OneToOne), produced by the AI pipeline.
    incident = models.OneToOneField(Incident, on_delete=models.CASCADE, related_name='ai_classification')

    # Classification results
    predicted_category = models.CharField(max_length=100)
    predicted_subcategory = models.CharField(max_length=100)
    # Confidence in the primary prediction, bounded to [0.0, 1.0].
    confidence_score = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)])

    # Alternative classifications
    alternative_categories = models.JSONField(default=list, help_text="List of alternative category predictions")

    # NLP Analysis
    extracted_keywords = models.JSONField(default=list, help_text="Keywords extracted from incident text")
    sentiment_score = models.FloatField(null=True, blank=True, help_text="Sentiment analysis score (-1 to 1)")
    urgency_indicators = models.JSONField(default=list, help_text="Detected urgency indicators")

    # Processing metadata — which model produced the result and how long it took.
    model_version = models.CharField(max_length=50, default='v1.0')
    processing_time = models.FloatField(help_text="Time taken for classification in seconds")
    created_at = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        return f"Classification for {self.incident.title}: {self.predicted_category}"
class SeveritySuggestion(models.Model):
    """AI-driven severity suggestions based on impact analysis"""

    # Exactly one suggestion per incident (OneToOne).
    incident = models.OneToOneField(Incident, on_delete=models.CASCADE, related_name='severity_suggestion')

    # Severity prediction — reuses the Incident severity vocabulary.
    suggested_severity = models.CharField(max_length=20, choices=Incident.SEVERITY_CHOICES)
    confidence_score = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)])

    # Impact analysis factors — each a normalized [0.0, 1.0] score.
    user_impact_score = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)])
    business_impact_score = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)])
    technical_impact_score = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)])

    # Reasoning — human-readable explanation plus the structured factor list.
    reasoning = models.TextField(help_text="AI explanation for severity suggestion")
    impact_factors = models.JSONField(default=list, help_text="List of factors that influenced the severity")

    # Processing metadata
    model_version = models.CharField(max_length=50, default='v1.0')
    processing_time = models.FloatField(help_text="Time taken for severity analysis in seconds")
    created_at = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        return f"Severity suggestion for {self.incident.title}: {self.suggested_severity}"
class IncidentCorrelation(models.Model):
    """Correlation engine for linking related incidents"""

    CORRELATION_TYPE_CHOICES = [
        ('SAME_SERVICE', 'Same Service'),
        ('SAME_COMPONENT', 'Same Component'),
        ('TEMPORAL', 'Temporal Correlation'),
        ('PATTERN', 'Pattern Match'),
        ('DEPENDENCY', 'Dependency Related'),
        ('CASCADE', 'Cascade Effect'),
    ]

    # Related incidents — directed pair: primary is the anchor, related is the match.
    primary_incident = models.ForeignKey(Incident, on_delete=models.CASCADE, related_name='correlations_as_primary')
    related_incident = models.ForeignKey(Incident, on_delete=models.CASCADE, related_name='correlations_as_related')

    # Correlation details
    correlation_type = models.CharField(max_length=20, choices=CORRELATION_TYPE_CHOICES)
    confidence_score = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)])
    correlation_strength = models.CharField(max_length=20, choices=[
        ('WEAK', 'Weak'),
        ('MODERATE', 'Moderate'),
        ('STRONG', 'Strong'),
        ('VERY_STRONG', 'Very Strong'),
    ])

    # Analysis details
    shared_keywords = models.JSONField(default=list, help_text="Keywords shared between incidents")
    time_difference = models.DurationField(help_text="Time difference between incidents")
    similarity_score = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)])

    # Problem detection — escalation signal when many incidents share a root cause.
    is_problem_indicator = models.BooleanField(default=False, help_text="Indicates if this correlation suggests a larger problem")
    problem_description = models.TextField(blank=True, null=True)

    # Metadata
    created_at = models.DateTimeField(auto_now_add=True)
    model_version = models.CharField(max_length=50, default='v1.0')

    class Meta:
        # NOTE(review): unique_together does not prevent the reversed
        # (related, primary) pair from also being stored — confirm the
        # correlation engine deduplicates symmetric pairs upstream.
        unique_together = ['primary_incident', 'related_incident']
        indexes = [
            models.Index(fields=['correlation_type', 'confidence_score']),
            models.Index(fields=['is_problem_indicator']),
        ]

    def __str__(self):
        return f"Correlation: {self.primary_incident.title} <-> {self.related_incident.title}"
class DuplicationDetection(models.Model):
    """Duplication detection results for incident merging"""

    DUPLICATION_TYPE_CHOICES = [
        ('EXACT', 'Exact Duplicate'),
        ('NEAR_DUPLICATE', 'Near Duplicate'),
        ('SIMILAR', 'Similar Incident'),
        ('POTENTIAL_DUPLICATE', 'Potential Duplicate'),
    ]

    # Incident pair under comparison.
    incident_a = models.ForeignKey(Incident, on_delete=models.CASCADE, related_name='duplication_as_a')
    incident_b = models.ForeignKey(Incident, on_delete=models.CASCADE, related_name='duplication_as_b')

    # Duplication analysis
    duplication_type = models.CharField(max_length=20, choices=DUPLICATION_TYPE_CHOICES)
    similarity_score = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)])
    confidence_score = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)])

    # Analysis details — component similarity scores, each in [0.0, 1.0].
    text_similarity = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)])
    temporal_proximity = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)])
    service_similarity = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)])

    # Merge recommendation
    recommended_action = models.CharField(max_length=20, choices=[
        ('MERGE', 'Merge Incidents'),
        ('LINK', 'Link Incidents'),
        ('REVIEW', 'Manual Review'),
        ('NO_ACTION', 'No Action'),
    ])
    merge_confidence = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)])

    # Reasoning
    reasoning = models.TextField(help_text="AI explanation for duplication detection")
    shared_elements = models.JSONField(default=list, help_text="Elements shared between incidents")

    # Status — review workflow for the detection result.
    status = models.CharField(max_length=20, choices=[
        ('DETECTED', 'Detected'),
        ('REVIEWED', 'Reviewed'),
        ('MERGED', 'Merged'),
        ('REJECTED', 'Rejected'),
    ], default='DETECTED')

    # Metadata
    created_at = models.DateTimeField(auto_now_add=True)
    reviewed_at = models.DateTimeField(null=True, blank=True)
    reviewed_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True)
    model_version = models.CharField(max_length=50, default='v1.0')

    class Meta:
        # NOTE(review): unique_together does not prevent the reversed
        # (incident_b, incident_a) pair — confirm the detector orders pairs
        # consistently before insert.
        unique_together = ['incident_a', 'incident_b']
        indexes = [
            models.Index(fields=['duplication_type', 'confidence_score']),
            models.Index(fields=['status']),
        ]

    def __str__(self):
        return f"Duplication: {self.incident_a.title} <-> {self.incident_b.title}"
class IncidentPattern(models.Model):
    """Pattern detection for identifying recurring issues"""

    PATTERN_TYPE_CHOICES = [
        ('RECURRING', 'Recurring Issue'),
        ('SEASONAL', 'Seasonal Pattern'),
        ('TREND', 'Trend Analysis'),
        ('ANOMALY', 'Anomaly Detection'),
    ]

    name = models.CharField(max_length=200)
    pattern_type = models.CharField(max_length=20, choices=PATTERN_TYPE_CHOICES)
    description = models.TextField()

    # Pattern characteristics
    frequency = models.CharField(max_length=50, help_text="How often this pattern occurs")
    affected_services = models.JSONField(default=list, help_text="Services affected by this pattern")
    common_keywords = models.JSONField(default=list, help_text="Common keywords in incidents with this pattern")

    # Related incidents — `incident_count` is a denormalized counter;
    # NOTE(review): presumably kept in sync with `incidents` by the pattern
    # engine — verify against its writer.
    incidents = models.ManyToManyField(Incident, related_name='patterns')
    incident_count = models.PositiveIntegerField(default=0)

    # Pattern analysis
    confidence_score = models.FloatField(validators=[MinValueValidator(0.0), MaxValueValidator(1.0)])
    last_occurrence = models.DateTimeField(null=True, blank=True)
    next_predicted_occurrence = models.DateTimeField(null=True, blank=True)

    # Status flags for pattern lifecycle.
    is_active = models.BooleanField(default=True)
    is_resolved = models.BooleanField(default=False)

    # Metadata
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    model_version = models.CharField(max_length=50, default='v1.0')

    class Meta:
        # Highest-confidence, most widespread patterns first.
        ordering = ['-confidence_score', '-incident_count']
        indexes = [
            models.Index(fields=['pattern_type', 'is_active']),
            models.Index(fields=['confidence_score']),
        ]

    def __str__(self):
        return f"Pattern: {self.name} ({self.pattern_type})"
class AIProcessingLog(models.Model):
    """Log of AI processing activities for audit and debugging"""

    PROCESSING_TYPE_CHOICES = [
        ('CLASSIFICATION', 'Classification'),
        ('SEVERITY_ANALYSIS', 'Severity Analysis'),
        ('CORRELATION', 'Correlation Analysis'),
        ('DUPLICATION_DETECTION', 'Duplication Detection'),
        ('PATTERN_DETECTION', 'Pattern Detection'),
    ]
    STATUS_CHOICES = [
        ('PENDING', 'Pending'),
        ('PROCESSING', 'Processing'),
        ('COMPLETED', 'Completed'),
        ('FAILED', 'Failed'),
        ('SKIPPED', 'Skipped'),
    ]

    # Processing details
    processing_type = models.CharField(max_length=30, choices=PROCESSING_TYPE_CHOICES)
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='PENDING')

    # Related objects — `incident` is nullable for runs not tied to a single incident.
    incident = models.ForeignKey(Incident, on_delete=models.CASCADE, null=True, blank=True)
    related_incidents = models.JSONField(default=list, help_text="List of related incident IDs")

    # Processing results — raw input/output payloads plus any failure message.
    input_data = models.JSONField(help_text="Input data for processing")
    output_data = models.JSONField(null=True, blank=True, help_text="Output data from processing")
    error_message = models.TextField(blank=True, null=True)

    # Performance metrics
    processing_time = models.FloatField(null=True, blank=True, help_text="Processing time in seconds")
    model_version = models.CharField(max_length=50, default='v1.0')
    confidence_score = models.FloatField(null=True, blank=True, validators=[MinValueValidator(0.0), MaxValueValidator(1.0)])

    # Timestamps — `completed_at` stays null until the run finishes.
    started_at = models.DateTimeField(auto_now_add=True)
    completed_at = models.DateTimeField(null=True, blank=True)

    class Meta:
        ordering = ['-started_at']
        indexes = [
            models.Index(fields=['processing_type', 'status']),
            models.Index(fields=['incident', 'processing_type']),
        ]

    def __str__(self):
        return f"{self.processing_type} - {self.status} - {self.incident.title if self.incident else 'N/A'}"