"""
Analytics & Predictive Insights models for Enterprise Incident Management API
Implements advanced KPIs, predictive analytics, ML-based anomaly detection, and cost analysis
"""
import uuid
import json
from datetime import datetime, timedelta, time
from typing import Dict, Any, Optional, List
from decimal import Decimal
from django.db import models
from django.contrib.auth import get_user_model
from django.core.validators import MinValueValidator, MaxValueValidator
from django.utils import timezone
from django.core.exceptions import ValidationError
User = get_user_model()
class KPIMetric(models.Model):
    """Definition of a KPI metric to track (MTTA, MTTR, availability, etc.).

    A KPIMetric describes *what* to measure and *how* to aggregate it;
    the computed values themselves are stored as related ``KPIMeasurement``
    rows (reverse accessor: ``measurements``).
    """
    # Supported metric kinds; 'CUSTOM' metrics rely on `calculation_formula`.
    METRIC_TYPES = [
        ('MTTA', 'Mean Time to Acknowledge'),
        ('MTTR', 'Mean Time to Resolve'),
        ('MTBF', 'Mean Time Between Failures'),
        ('MTBSI', 'Mean Time Between Service Incidents'),
        ('AVAILABILITY', 'Service Availability'),
        ('INCIDENT_COUNT', 'Incident Count'),
        ('RESOLUTION_RATE', 'Resolution Rate'),
        ('ESCALATION_RATE', 'Escalation Rate'),
        ('CUSTOM', 'Custom Metric'),
    ]
    # How individual samples are folded into a single measurement value.
    AGGREGATION_TYPES = [
        ('AVERAGE', 'Average'),
        ('MEDIAN', 'Median'),
        ('MIN', 'Minimum'),
        ('MAX', 'Maximum'),
        ('SUM', 'Sum'),
        ('COUNT', 'Count'),
        ('PERCENTILE_95', '95th Percentile'),
        ('PERCENTILE_99', '99th Percentile'),
    ]
    # UUID primary key (consistent with every other model in this module).
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    name = models.CharField(max_length=200)
    description = models.TextField()
    metric_type = models.CharField(max_length=20, choices=METRIC_TYPES)
    aggregation_type = models.CharField(max_length=20, choices=AGGREGATION_TYPES)
    # Targeting criteria — empty lists mean "applies to all".
    # NOTE(review): presumably empty means unrestricted; confirm against the
    # measurement-calculation code, which is not visible in this file.
    incident_categories = models.JSONField(
        default=list,
        help_text="List of incident categories this metric applies to"
    )
    incident_severities = models.JSONField(
        default=list,
        help_text="List of incident severities this metric applies to"
    )
    incident_priorities = models.JSONField(
        default=list,
        help_text="List of incident priorities this metric applies to"
    )
    # Calculation configuration
    calculation_formula = models.TextField(
        blank=True,
        null=True,
        help_text="Custom calculation formula for complex metrics"
    )
    time_window_hours = models.PositiveIntegerField(
        default=24,
        help_text="Time window for metric calculation in hours"
    )
    # Status and metadata
    is_active = models.BooleanField(default=True)
    is_system_metric = models.BooleanField(
        default=False,
        help_text="Whether this is a system-defined metric"
    )
    # SET_NULL keeps the metric if its creator's account is deleted.
    created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        ordering = ['name']
        # NOTE(review): the last two indexes are plain btree indexes on
        # JSONField columns — useful only for whole-value equality on
        # PostgreSQL and unsupported on MySQL; confirm target DB and whether
        # a GIN index (containment lookups) was intended.
        indexes = [
            models.Index(fields=['metric_type', 'is_active']),
            models.Index(fields=['incident_categories']),
            models.Index(fields=['incident_severities']),
        ]

    def __str__(self) -> str:
        return f"{self.name} ({self.metric_type})"
class KPIMeasurement(models.Model):
    """A single computed value of a ``KPIMetric`` over one time window.

    Rows are append-only snapshots: `calculated_at` is set automatically
    and there is no `updated_at`, so each measurement records one
    calculation run for the period [`measurement_period_start`,
    `measurement_period_end`].
    """
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    # CASCADE: measurements are meaningless without their metric definition.
    metric = models.ForeignKey(KPIMetric, on_delete=models.CASCADE, related_name='measurements')
    # Measurement details — 4 decimal places to keep precision for
    # percentile/ratio style metrics.
    value = models.DecimalField(max_digits=15, decimal_places=4)
    unit = models.CharField(max_length=50, help_text="Unit of measurement (minutes, hours, percentage, etc.)")
    # Time period the value was computed over.
    measurement_period_start = models.DateTimeField()
    measurement_period_end = models.DateTimeField()
    # Context for interpreting statistical significance of the value.
    incident_count = models.PositiveIntegerField(
        default=0,
        help_text="Number of incidents included in this measurement"
    )
    sample_size = models.PositiveIntegerField(
        default=0,
        help_text="Total sample size for this measurement"
    )
    # Additional metadata
    metadata = models.JSONField(
        default=dict,
        help_text="Additional metadata for this measurement"
    )
    # Timestamps
    calculated_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        # Newest measurements first.
        ordering = ['-calculated_at']
        indexes = [
            models.Index(fields=['metric', 'measurement_period_start']),
            models.Index(fields=['calculated_at']),
        ]

    def __str__(self) -> str:
        return f"{self.metric.name}: {self.value} {self.unit}"
class IncidentRecurrenceAnalysis(models.Model):
    """ML-produced analysis linking one primary incident to its recurrences.

    Links a `primary_incident` to the set of `recurring_incidents` that
    match it, records the detected pattern, its business impact, and
    AI-generated prevention/automation recommendations.
    """
    # How the recurrence relationship was classified.
    RECURRENCE_TYPES = [
        ('EXACT_DUPLICATE', 'Exact Duplicate'),
        ('SIMILAR_PATTERN', 'Similar Pattern'),
        ('SEASONAL', 'Seasonal Recurrence'),
        ('TREND', 'Trend-based Recurrence'),
        ('CASCADE', 'Cascade Effect'),
    ]
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    # Related incidents — string app labels avoid circular imports between apps.
    primary_incident = models.ForeignKey(
        'incident_intelligence.Incident',
        on_delete=models.CASCADE,
        related_name='recurrence_analyses_as_primary'
    )
    recurring_incidents = models.ManyToManyField(
        'incident_intelligence.Incident',
        related_name='recurrence_analyses_as_recurring'
    )
    # Analysis details
    confidence_score: float  # bounded to [0, 1] by validators below
    recurrence_type = models.CharField(max_length=20, choices=RECURRENCE_TYPES)
    confidence_score = models.FloatField(
        validators=[MinValueValidator(0.0), MaxValueValidator(1.0)]
    )
    recurrence_rate = models.FloatField(
        help_text="Rate of recurrence (incidents per time period)"
    )
    # Pattern characteristics extracted across the incident group.
    common_keywords = models.JSONField(
        default=list,
        help_text="Common keywords across recurring incidents"
    )
    common_categories = models.JSONField(
        default=list,
        help_text="Common categories across recurring incidents"
    )
    time_pattern = models.JSONField(
        default=dict,
        help_text="Time-based pattern analysis"
    )
    # Impact analysis (aggregated over all incidents in the pattern).
    total_affected_users = models.PositiveIntegerField(default=0)
    total_downtime_hours = models.DecimalField(max_digits=10, decimal_places=2, default=0)
    estimated_cost_impact = models.DecimalField(max_digits=15, decimal_places=2, default=0)
    # Recommendations
    prevention_recommendations = models.JSONField(
        default=list,
        help_text="AI-generated recommendations to prevent recurrence"
    )
    automation_opportunities = models.JSONField(
        default=list,
        help_text="Potential automation opportunities identified"
    )
    # Automation integration
    suggested_runbooks = models.ManyToManyField(
        'automation_orchestration.Runbook',
        blank=True,
        related_name='recurrence_analyses',
        help_text="Runbooks suggested to prevent recurrence"
    )
    # Status
    is_resolved = models.BooleanField(default=False)
    resolution_actions = models.JSONField(
        default=list,
        help_text="Actions taken to resolve the recurrence pattern"
    )
    # Metadata — model_version tracks which ML model version produced this row.
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    model_version = models.CharField(max_length=50, default='v1.0')

    class Meta:
        # Highest-confidence, most recent analyses first.
        ordering = ['-confidence_score', '-created_at']
        indexes = [
            models.Index(fields=['recurrence_type', 'confidence_score']),
            models.Index(fields=['is_resolved']),
        ]

    def __str__(self) -> str:
        # NOTE: touches primary_incident — triggers a DB query unless the
        # caller used select_related('primary_incident').
        return f"Recurrence Analysis: {self.primary_incident.title} ({self.recurrence_type})"
class PredictiveModel(models.Model):
    """Registry entry for a trained (or training) ML model.

    Stores configuration, training provenance, evaluation metrics and the
    retraining policy. The serialized model itself lives on disk at
    `model_file_path`; this row is only the metadata.
    """
    # What the model predicts.
    MODEL_TYPES = [
        ('ANOMALY_DETECTION', 'Anomaly Detection'),
        ('INCIDENT_PREDICTION', 'Incident Prediction'),
        ('SEVERITY_PREDICTION', 'Severity Prediction'),
        ('RESOLUTION_TIME_PREDICTION', 'Resolution Time Prediction'),
        ('ESCALATION_PREDICTION', 'Escalation Prediction'),
        ('COST_PREDICTION', 'Cost Impact Prediction'),
    ]
    # Underlying algorithm family.
    ALGORITHM_TYPES = [
        ('ISOLATION_FOREST', 'Isolation Forest'),
        ('LSTM', 'Long Short-Term Memory'),
        ('RANDOM_FOREST', 'Random Forest'),
        ('XGBOOST', 'XGBoost'),
        ('SVM', 'Support Vector Machine'),
        ('NEURAL_NETWORK', 'Neural Network'),
        ('ARIMA', 'ARIMA'),
        ('PROPHET', 'Prophet'),
    ]
    # Lifecycle state of the model.
    STATUS_CHOICES = [
        ('TRAINING', 'Training'),
        ('ACTIVE', 'Active'),
        ('INACTIVE', 'Inactive'),
        ('RETRAINING', 'Retraining'),
        ('ERROR', 'Error'),
    ]
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    name = models.CharField(max_length=200)
    description = models.TextField()
    model_type = models.CharField(max_length=30, choices=MODEL_TYPES)
    algorithm_type = models.CharField(max_length=20, choices=ALGORITHM_TYPES)
    # Model configuration
    model_config = models.JSONField(
        default=dict,
        help_text="Model-specific configuration parameters"
    )
    feature_columns = models.JSONField(
        default=list,
        help_text="List of feature columns used by the model"
    )
    target_column = models.CharField(
        max_length=100,
        help_text="Target column for prediction"
    )
    # Training data window and minimum viability threshold.
    training_data_period_days = models.PositiveIntegerField(
        default=90,
        help_text="Number of days of training data to use"
    )
    min_training_samples = models.PositiveIntegerField(
        default=100,
        help_text="Minimum number of samples required for training"
    )
    # Performance metrics — all nullable because an untrained model has none;
    # each is a fraction in [0, 1].
    accuracy_score = models.FloatField(
        null=True, blank=True,
        validators=[MinValueValidator(0.0), MaxValueValidator(1.0)]
    )
    precision_score = models.FloatField(
        null=True, blank=True,
        validators=[MinValueValidator(0.0), MaxValueValidator(1.0)]
    )
    recall_score = models.FloatField(
        null=True, blank=True,
        validators=[MinValueValidator(0.0), MaxValueValidator(1.0)]
    )
    f1_score = models.FloatField(
        null=True, blank=True,
        validators=[MinValueValidator(0.0), MaxValueValidator(1.0)]
    )
    # Status and metadata
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='TRAINING')
    version = models.CharField(max_length=20, default='1.0')
    model_file_path = models.CharField(
        max_length=500,
        blank=True,
        null=True,
        help_text="Path to the trained model file"
    )
    # Training metadata (populated after a successful training run).
    last_trained_at = models.DateTimeField(null=True, blank=True)
    training_duration_seconds = models.PositiveIntegerField(null=True, blank=True)
    training_samples_count = models.PositiveIntegerField(null=True, blank=True)
    # Retraining configuration — retrain every `retrain_frequency_days` or
    # when performance drops below `performance_threshold`.
    auto_retrain_enabled = models.BooleanField(default=True)
    retrain_frequency_days = models.PositiveIntegerField(default=7)
    performance_threshold = models.FloatField(
        default=0.8,
        validators=[MinValueValidator(0.0), MaxValueValidator(1.0)],
        help_text="Performance threshold below which model should be retrained"
    )
    # Metadata
    created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        ordering = ['-created_at']
        indexes = [
            models.Index(fields=['model_type', 'status']),
            models.Index(fields=['algorithm_type']),
            models.Index(fields=['status']),
        ]

    def __str__(self) -> str:
        return f"{self.name} ({self.model_type})"
class AnomalyDetection(models.Model):
    """One anomaly detected by a ``PredictiveModel``, with triage workflow.

    Records the raw detection (score, threshold, time window), the context
    (incidents/services/metrics involved), the human analysis, and the
    resolution trail. `status` drives the triage lifecycle from DETECTED
    through RESOLVED or FALSE_POSITIVE.
    """
    ANOMALY_TYPES = [
        ('STATISTICAL', 'Statistical Anomaly'),
        ('TEMPORAL', 'Temporal Anomaly'),
        ('PATTERN', 'Pattern Anomaly'),
        ('THRESHOLD', 'Threshold Breach'),
        ('BEHAVIORAL', 'Behavioral Anomaly'),
    ]
    SEVERITY_CHOICES = [
        ('LOW', 'Low'),
        ('MEDIUM', 'Medium'),
        ('HIGH', 'High'),
        ('CRITICAL', 'Critical'),
    ]
    # Triage workflow states.
    STATUS_CHOICES = [
        ('DETECTED', 'Detected'),
        ('INVESTIGATING', 'Investigating'),
        ('CONFIRMED', 'Confirmed'),
        ('FALSE_POSITIVE', 'False Positive'),
        ('RESOLVED', 'Resolved'),
    ]
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    # CASCADE: detections are owned by (and meaningless without) their model.
    model = models.ForeignKey(PredictiveModel, on_delete=models.CASCADE, related_name='anomaly_detections')
    # Anomaly details
    anomaly_type = models.CharField(max_length=20, choices=ANOMALY_TYPES)
    severity = models.CharField(max_length=20, choices=SEVERITY_CHOICES)
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='DETECTED')
    # Detection details — confidence is normalized to [0, 1];
    # anomaly_score/threshold_used are in the model's native scale.
    confidence_score = models.FloatField(
        validators=[MinValueValidator(0.0), MaxValueValidator(1.0)]
    )
    anomaly_score = models.FloatField(
        help_text="Raw anomaly score from the model"
    )
    threshold_used = models.FloatField(
        help_text="Threshold used for anomaly detection"
    )
    # Context — the data window the anomaly was observed in.
    detected_at = models.DateTimeField(auto_now_add=True)
    time_window_start = models.DateTimeField()
    time_window_end = models.DateTimeField()
    # Related data
    related_incidents = models.ManyToManyField(
        'incident_intelligence.Incident',
        blank=True,
        related_name='anomaly_detections'
    )
    affected_services = models.JSONField(
        default=list,
        help_text="Services affected by this anomaly"
    )
    affected_metrics = models.JSONField(
        default=list,
        help_text="Metrics that showed anomalous behavior"
    )
    # Analysis — free-text fields filled in during triage.
    description = models.TextField(help_text="Description of the anomaly")
    root_cause_analysis = models.TextField(
        blank=True,
        null=True,
        help_text="Root cause analysis of the anomaly"
    )
    impact_assessment = models.TextField(
        blank=True,
        null=True,
        help_text="Assessment of the anomaly's impact"
    )
    # Actions taken during/after triage.
    actions_taken = models.JSONField(
        default=list,
        help_text="Actions taken in response to the anomaly"
    )
    resolved_at = models.DateTimeField(null=True, blank=True)
    resolved_by = models.ForeignKey(
        User,
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='resolved_anomalies'
    )
    # Metadata
    metadata = models.JSONField(
        default=dict,
        help_text="Additional metadata for this anomaly"
    )

    class Meta:
        ordering = ['-detected_at']
        indexes = [
            models.Index(fields=['anomaly_type', 'severity']),
            models.Index(fields=['status', 'detected_at']),
            models.Index(fields=['confidence_score']),
        ]

    def __str__(self) -> str:
        return f"Anomaly: {self.anomaly_type} - {self.severity} ({self.detected_at})"
class CostImpactAnalysis(models.Model):
    """Cost impact record for a single incident, one row per cost type.

    An incident can have several analyses (reverse accessor:
    ``cost_analyses``), each covering one `cost_type`. Rows start
    unvalidated and can later be signed off via the `validated_*` fields.
    """
    COST_TYPES = [
        ('DOWNTIME', 'Downtime Cost'),
        ('LOST_REVENUE', 'Lost Revenue'),
        ('PENALTY', 'Penalty Cost'),
        ('RESOURCE_COST', 'Resource Cost'),
        ('REPUTATION_COST', 'Reputation Cost'),
        ('COMPLIANCE_COST', 'Compliance Cost'),
    ]
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    # Related incident — CASCADE: cost rows die with their incident.
    incident = models.ForeignKey(
        'incident_intelligence.Incident',
        on_delete=models.CASCADE,
        related_name='cost_analyses'
    )
    # SLA integration — SET_NULL so cost history survives SLA cleanup.
    sla_instance = models.ForeignKey(
        'sla_oncall.SLAInstance',
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='cost_analyses',
        help_text="Related SLA instance for cost calculation"
    )
    # Cost breakdown.
    # NOTE(review): help_text says USD but a separate `currency` field exists
    # below — presumably `cost_amount` is in `currency`; confirm with callers.
    cost_type = models.CharField(max_length=20, choices=COST_TYPES)
    cost_amount = models.DecimalField(
        max_digits=15,
        decimal_places=2,
        help_text="Cost amount in USD"
    )
    currency = models.CharField(max_length=3, default='USD')
    # Cost calculation details — method name plus auditable breakdown.
    calculation_method = models.CharField(
        max_length=50,
        help_text="Method used to calculate the cost"
    )
    calculation_details = models.JSONField(
        default=dict,
        help_text="Detailed breakdown of cost calculation"
    )
    # Impact metrics (all optional; filled when measurable).
    downtime_hours = models.DecimalField(
        max_digits=10,
        decimal_places=2,
        null=True,
        blank=True,
        help_text="Total downtime in hours"
    )
    affected_users = models.PositiveIntegerField(
        null=True,
        blank=True,
        help_text="Number of users affected"
    )
    revenue_impact = models.DecimalField(
        max_digits=15,
        decimal_places=2,
        null=True,
        blank=True,
        help_text="Revenue impact in USD"
    )
    # Business context
    business_unit = models.CharField(
        max_length=100,
        blank=True,
        null=True,
        help_text="Business unit affected"
    )
    service_tier = models.CharField(
        max_length=50,
        blank=True,
        null=True,
        help_text="Service tier (e.g., Premium, Standard)"
    )
    # Validation and approval — human sign-off trail.
    is_validated = models.BooleanField(default=False)
    validated_by = models.ForeignKey(
        User,
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='validated_cost_analyses'
    )
    validated_at = models.DateTimeField(null=True, blank=True)
    validation_notes = models.TextField(blank=True, null=True)
    # Metadata
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        ordering = ['-created_at']
        indexes = [
            models.Index(fields=['incident', 'cost_type']),
            models.Index(fields=['cost_amount']),
            models.Index(fields=['is_validated']),
        ]

    def __str__(self) -> str:
        # NOTE: touches self.incident — triggers a DB query unless the
        # caller used select_related('incident').
        return f"Cost Analysis: {self.incident.title} - {self.cost_type} (${self.cost_amount})"
class DashboardConfiguration(models.Model):
    """Stored configuration for an analytics dashboard.

    Holds the layout/widget JSON the frontend renders, plus access control
    (public flag, explicit users, role names) and an auto-refresh policy.
    """
    DASHBOARD_TYPES = [
        ('EXECUTIVE', 'Executive Dashboard'),
        ('OPERATIONAL', 'Operational Dashboard'),
        ('TECHNICAL', 'Technical Dashboard'),
        ('CUSTOM', 'Custom Dashboard'),
    ]
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    name = models.CharField(max_length=200)
    description = models.TextField()
    dashboard_type = models.CharField(max_length=20, choices=DASHBOARD_TYPES)
    # Dashboard configuration — opaque JSON consumed by the frontend;
    # schema is not defined in this file.
    layout_config = models.JSONField(
        default=dict,
        help_text="Dashboard layout configuration"
    )
    widget_configs = models.JSONField(
        default=list,
        help_text="Configuration for dashboard widgets"
    )
    # Access control: public flag OR explicit user grants OR role names.
    # NOTE(review): how these three combine (any-of vs all-of) is enforced
    # elsewhere — confirm in the permission-checking code.
    is_public = models.BooleanField(default=False)
    allowed_users = models.ManyToManyField(
        User,
        blank=True,
        related_name='accessible_dashboards'
    )
    allowed_roles = models.JSONField(
        default=list,
        help_text="List of roles that can access this dashboard"
    )
    # Refresh configuration — default refresh every 5 minutes.
    auto_refresh_enabled = models.BooleanField(default=True)
    refresh_interval_seconds = models.PositiveIntegerField(default=300)
    # Status and metadata
    is_active = models.BooleanField(default=True)
    created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        ordering = ['name']
        indexes = [
            models.Index(fields=['dashboard_type', 'is_active']),
            models.Index(fields=['is_public']),
        ]

    def __str__(self) -> str:
        return f"{self.name} ({self.dashboard_type})"
class HeatmapData(models.Model):
    """Pre-computed heatmap dataset for a time window.

    Stores the rendered-ready data points (coordinates + values) for one
    heatmap over [`time_period_start`, `time_period_end`] at the chosen
    `time_granularity`, together with presentation (color scheme) and the
    aggregation used to produce the values.
    """
    HEATMAP_TYPES = [
        ('INCIDENT_FREQUENCY', 'Incident Frequency'),
        ('RESOLUTION_TIME', 'Resolution Time'),
        ('COST_IMPACT', 'Cost Impact'),
        ('ANOMALY_DENSITY', 'Anomaly Density'),
        ('SLA_PERFORMANCE', 'SLA Performance'),
    ]
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    name = models.CharField(max_length=200)
    heatmap_type = models.CharField(max_length=20, choices=HEATMAP_TYPES)
    # Data configuration
    time_period_start = models.DateTimeField()
    time_period_end = models.DateTimeField()
    # Bucket size of each heatmap cell along the time axis.
    time_granularity = models.CharField(
        max_length=20,
        choices=[
            ('HOUR', 'Hour'),
            ('DAY', 'Day'),
            ('WEEK', 'Week'),
            ('MONTH', 'Month'),
        ]
    )
    # Heatmap data — no default: data points are required at creation.
    data_points = models.JSONField(
        help_text="Heatmap data points with coordinates and values"
    )
    color_scheme = models.CharField(
        max_length=50,
        default='viridis',
        help_text="Color scheme for the heatmap"
    )
    # Aggregation settings — how raw samples were collapsed into each cell.
    aggregation_method = models.CharField(
        max_length=20,
        choices=[
            ('SUM', 'Sum'),
            ('AVERAGE', 'Average'),
            ('COUNT', 'Count'),
            ('MAX', 'Maximum'),
            ('MIN', 'Minimum'),
        ]
    )
    # Metadata
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        ordering = ['-created_at']
        indexes = [
            models.Index(fields=['heatmap_type', 'time_period_start']),
            models.Index(fields=['time_granularity']),
        ]

    def __str__(self) -> str:
        return f"Heatmap: {self.name} ({self.heatmap_type})"
class PredictiveInsight(models.Model):
    """A prediction produced by a ``PredictiveModel``, with its lifecycle.

    Captures the prediction itself (value, horizon, confidence), the inputs
    and evidence behind it, AI recommendations, and two follow-up workflows:
    acknowledgement by a human and later validation against the actual
    outcome. Insights expire at `expires_at` (see ``is_expired``).
    """
    INSIGHT_TYPES = [
        ('INCIDENT_PREDICTION', 'Incident Prediction'),
        ('SEVERITY_PREDICTION', 'Severity Prediction'),
        ('RESOLUTION_TIME_PREDICTION', 'Resolution Time Prediction'),
        ('COST_PREDICTION', 'Cost Prediction'),
        ('TREND_ANALYSIS', 'Trend Analysis'),
        ('PATTERN_DETECTION', 'Pattern Detection'),
    ]
    # Coarse, human-readable confidence bucket; the numeric score is kept
    # separately in `confidence_score`.
    CONFIDENCE_LEVELS = [
        ('LOW', 'Low Confidence'),
        ('MEDIUM', 'Medium Confidence'),
        ('HIGH', 'High Confidence'),
        ('VERY_HIGH', 'Very High Confidence'),
    ]
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    model = models.ForeignKey(PredictiveModel, on_delete=models.CASCADE, related_name='insights')
    # Security integration — SET_NULL so insights outlive classification rows.
    data_classification = models.ForeignKey(
        'security.DataClassification',
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        help_text="Data classification level for this insight"
    )
    # Insight details
    insight_type = models.CharField(max_length=30, choices=INSIGHT_TYPES)
    title = models.CharField(max_length=200)
    description = models.TextField()
    confidence_level = models.CharField(max_length=20, choices=CONFIDENCE_LEVELS)
    confidence_score = models.FloatField(
        validators=[MinValueValidator(0.0), MaxValueValidator(1.0)]
    )
    # Prediction details — value is JSON because its shape depends on
    # `insight_type` (scalar, distribution, list, ...).
    predicted_value = models.JSONField(
        help_text="Predicted value or values"
    )
    prediction_horizon = models.PositiveIntegerField(
        help_text="Prediction horizon in hours"
    )
    prediction_date = models.DateTimeField(
        help_text="When the prediction is for"
    )
    # Context — kept for reproducibility/auditability of the prediction.
    input_features = models.JSONField(
        help_text="Input features used for the prediction"
    )
    supporting_evidence = models.JSONField(
        default=list,
        help_text="Supporting evidence for the prediction"
    )
    # Related data
    related_incidents = models.ManyToManyField(
        'incident_intelligence.Incident',
        blank=True,
        related_name='predictive_insights'
    )
    affected_services = models.JSONField(
        default=list,
        help_text="Services that may be affected"
    )
    # Recommendations
    recommendations = models.JSONField(
        default=list,
        help_text="AI-generated recommendations based on the insight"
    )
    risk_assessment = models.TextField(
        blank=True,
        null=True,
        help_text="Risk assessment based on the prediction"
    )
    # Status — acknowledgement workflow.
    is_acknowledged = models.BooleanField(default=False)
    acknowledged_by = models.ForeignKey(
        User,
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='acknowledged_insights'
    )
    acknowledged_at = models.DateTimeField(null=True, blank=True)
    # Validation — once the predicted period has passed, the actual outcome
    # and accuracy can be recorded against the prediction.
    is_validated = models.BooleanField(default=False)
    actual_value = models.JSONField(
        null=True,
        blank=True,
        help_text="Actual value when prediction is validated"
    )
    validation_accuracy = models.FloatField(
        null=True,
        blank=True,
        validators=[MinValueValidator(0.0), MaxValueValidator(1.0)]
    )
    # Metadata — no default/auto for expires_at: caller must set it.
    generated_at = models.DateTimeField(auto_now_add=True)
    expires_at = models.DateTimeField(
        help_text="When this insight expires"
    )

    class Meta:
        ordering = ['-generated_at']
        indexes = [
            models.Index(fields=['insight_type', 'confidence_score']),
            models.Index(fields=['prediction_date']),
            models.Index(fields=['is_acknowledged']),
        ]

    def __str__(self) -> str:
        return f"Insight: {self.title} ({self.insight_type})"

    @property
    def is_expired(self) -> bool:
        """Check if this insight has expired"""
        # Computed on access, not stored — assumes expires_at is tz-aware
        # (required for comparison with timezone.now() under USE_TZ).
        return timezone.now() > self.expires_at