685 lines
28 KiB
Python
685 lines
28 KiB
Python
"""
ML-based predictive models for incident management.

Implements various predictive algorithms for incident prediction, severity
prediction, resolution time prediction, and cost analysis.
"""
|
|
import numpy as np
|
|
import pandas as pd
|
|
from typing import Dict, List, Tuple, Optional, Any, Union
|
|
from datetime import datetime, timedelta
|
|
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
|
|
from sklearn.preprocessing import StandardScaler, LabelEncoder
|
|
from sklearn.model_selection import train_test_split
|
|
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, mean_squared_error, r2_score
|
|
import joblib
|
|
import logging
|
|
|
|
from django.utils import timezone
|
|
from django.db.models import Q, Avg, Count, Sum, Max, Min
|
|
from incident_intelligence.models import Incident
|
|
from ..models import PredictiveModel, PredictiveInsight, CostImpactAnalysis
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class BasePredictiveModel:
    """Shared scaffolding for the predictive models in this module.

    The constructor sets up the preprocessing state (scaler, label encoders,
    feature bookkeeping); the four hook methods below raise
    ``NotImplementedError`` until a subclass overrides them.
    """

    def __init__(self, model_config: Dict[str, Any] = None):
        """Initialise preprocessing state.

        Args:
            model_config: Optional configuration mapping; a falsy value is
                replaced by an empty dict.
        """
        # Configuration and preprocessing helpers.
        self.model_config = model_config if model_config else {}
        self.scaler = StandardScaler()
        self.label_encoders = {}

        # Training bookkeeping, filled in by subclasses.
        self.is_fitted = False
        self.feature_columns = []
        self.target_column = None

    def prepare_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """Turn raw input rows into a model-ready feature frame (hook)."""
        raise NotImplementedError()

    def fit(self, X: pd.DataFrame, y: pd.Series) -> Dict[str, float]:
        """Train the model and report validation metrics (hook)."""
        raise NotImplementedError()

    def predict(self, X: pd.DataFrame) -> np.ndarray:
        """Produce predictions for the given rows (hook)."""
        raise NotImplementedError()

    def get_feature_importance(self) -> Dict[str, float]:
        """Map feature names to importance scores (hook)."""
        raise NotImplementedError()
|
|
|
|
|
|
class IncidentPredictionModel(BasePredictiveModel):
    """Random-forest classifier that scores how likely an incident is.

    Features are derived from an optional ``timestamp`` column, rolling
    incident counts, system metrics and an encoded ``service_name``. Only the
    columns that are present in the input are used.
    """

    def __init__(self, model_config: Dict[str, Any] = None):
        """Create the classifier.

        Args:
            model_config: Optional hyper-parameters. ``n_estimators``
                (default 100) and ``max_depth`` (default 10) are read.
        """
        super().__init__(model_config)
        self.model = RandomForestClassifier(
            n_estimators=self.model_config.get('n_estimators', 100),
            max_depth=self.model_config.get('max_depth', 10),
            random_state=42,
        )

    def _encode_categorical(self, column: str, values: pd.Series) -> pd.Series:
        """Encode ``values`` as integer codes, tolerating unseen labels.

        The encoder for ``column`` is fitted on first use and reused
        afterwards. Labels the encoder has never seen map to ``-1`` instead
        of raising, which is what ``LabelEncoder.transform`` would do.
        """
        normalized = values.astype(str)
        encoder = self.label_encoders.get(column)
        if encoder is None:
            encoder = LabelEncoder()
            encoder.fit(normalized)
            self.label_encoders[column] = encoder
        codes = {str(label): code for code, label in enumerate(encoder.classes_)}
        return normalized.map(codes).fillna(-1).astype(int)

    def prepare_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """Build the feature frame for incident prediction.

        Args:
            data: Raw rows. Recognised columns are ``timestamp``,
                ``incident_count_1h``, ``incident_count_24h``,
                ``avg_severity_24h``, ``cpu_usage``, ``memory_usage``,
                ``disk_usage``, ``network_usage`` and ``service_name``.

        Returns:
            A frame indexed like ``data`` holding one column per derived
            feature.
        """
        # Share the input index so Series and array assignments line up even
        # when ``data`` is a slice with a non-default index.
        features = pd.DataFrame(index=data.index)

        # Time-based features
        if 'timestamp' in data.columns:
            timestamp = pd.to_datetime(data['timestamp'])
            hour = timestamp.dt.hour
            weekday = timestamp.dt.dayofweek
            features['hour_of_day'] = hour
            features['day_of_week'] = weekday
            features['day_of_month'] = timestamp.dt.day
            features['month'] = timestamp.dt.month
            features['is_weekend'] = (weekday >= 5).astype(int)
            # Hours 9 through 17 inclusive, i.e. 09:00 up to 17:59.
            features['is_business_hours'] = ((hour >= 9) & (hour <= 17)).astype(int)

        # Historical incident features
        for column in ('incident_count_1h', 'incident_count_24h', 'avg_severity_24h'):
            if column in data.columns:
                features[column] = data[column]

        # System metrics (if available)
        for metric in ('cpu_usage', 'memory_usage', 'disk_usage', 'network_usage'):
            if metric in data.columns:
                features[metric] = data[metric]

        # Service-specific features
        if 'service_name' in data.columns:
            features['service_encoded'] = self._encode_categorical(
                'service_name', data['service_name']
            )

        return features

    def fit(self, X: pd.DataFrame, y: pd.Series) -> Dict[str, float]:
        """Train the classifier on an 80/20 split and return validation metrics.

        Args:
            X: Raw training rows (see ``prepare_features``).
            y: Target labels, positionally aligned with ``X``.

        Returns:
            Dict with ``accuracy`` and weighted ``precision``, ``recall`` and
            ``f1_score`` computed on the held-out 20%.
        """
        # Retraining must relearn the label vocabulary rather than reuse the
        # encoders from a previous run.
        self.label_encoders = {}

        X_processed = self.prepare_features(X)
        self.feature_columns = X_processed.columns.tolist()

        X_scaled = self.scaler.fit_transform(X_processed)

        # Stratified splitting raises when a class has fewer than two rows,
        # so fall back to a plain shuffle split in that case.
        class_counts = pd.Series(np.asarray(y)).value_counts()
        stratify = y if class_counts.min() >= 2 else None
        X_train, X_val, y_train, y_val = train_test_split(
            X_scaled, y, test_size=0.2, random_state=42, stratify=stratify
        )

        self.model.fit(X_train, y_train)

        # The original also computed predict_proba(...)[:, 1] here without
        # using it; that raised IndexError for single-class training data.
        y_pred = self.model.predict(X_val)

        metrics = {
            'accuracy': accuracy_score(y_val, y_pred),
            'precision': precision_score(y_val, y_pred, average='weighted'),
            'recall': recall_score(y_val, y_pred, average='weighted'),
            'f1_score': f1_score(y_val, y_pred, average='weighted'),
        }

        self.is_fitted = True
        return metrics

    def predict(self, X: pd.DataFrame) -> np.ndarray:
        """Return the predicted incident probability for each row of ``X``.

        Raises:
            ValueError: If the model has not been fitted.
        """
        if not self.is_fitted:
            raise ValueError("Model must be fitted before prediction")

        # Present exactly the training columns, in training order; features
        # missing from this batch are filled with 0.
        X_processed = self.prepare_features(X).reindex(
            columns=self.feature_columns, fill_value=0
        )
        X_scaled = self.scaler.transform(X_processed)

        probabilities = self.model.predict_proba(X_scaled)
        if probabilities.shape[1] > 1:
            # Second probability column, as before.
            return probabilities[:, 1]

        # Trained on a single class: there is no second column. Report that
        # class's probability when it is the positive label 1, otherwise 0.
        if self.model.classes_[0] == 1:
            return probabilities[:, 0]
        return np.zeros(len(probabilities))

    def get_feature_importance(self) -> Dict[str, float]:
        """Map each training feature to its importance; empty if unfitted."""
        if not self.is_fitted:
            return {}

        importance_scores = self.model.feature_importances_
        return dict(zip(self.feature_columns, importance_scores))
|
|
|
|
|
|
class SeverityPredictionModel(BasePredictiveModel):
    """Random-forest classifier that predicts an incident's severity label.

    Severity labels are mapped to the integers 1-5 for training and mapped
    back to their names by ``predict``.
    """

    def __init__(self, model_config: Dict[str, Any] = None):
        """Create the classifier.

        Args:
            model_config: Optional hyper-parameters. ``n_estimators``
                (default 100) and ``max_depth`` (default 10) are read.
        """
        super().__init__(model_config)
        self.model = RandomForestClassifier(
            n_estimators=self.model_config.get('n_estimators', 100),
            max_depth=self.model_config.get('max_depth', 10),
            random_state=42,
        )
        self.severity_mapping = {
            'LOW': 1, 'MEDIUM': 2, 'HIGH': 3, 'CRITICAL': 4, 'EMERGENCY': 5
        }
        self.reverse_severity_mapping = {v: k for k, v in self.severity_mapping.items()}

    def _encode_categorical(self, column: str, values: pd.Series) -> pd.Series:
        """Encode ``values`` as integer codes, tolerating unseen labels.

        The encoder for ``column`` is fitted on first use and reused
        afterwards. Labels the encoder has never seen map to ``-1`` instead
        of raising, which is what ``LabelEncoder.transform`` would do.
        """
        normalized = values.astype(str)
        encoder = self.label_encoders.get(column)
        if encoder is None:
            encoder = LabelEncoder()
            encoder.fit(normalized)
            self.label_encoders[column] = encoder
        codes = {str(label): code for code, label in enumerate(encoder.classes_)}
        return normalized.map(codes).fillna(-1).astype(int)

    def prepare_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """Build the feature frame for severity prediction.

        Args:
            data: Raw incident rows. Recognised columns are ``title``,
                ``description``, ``category``, ``subcategory``,
                ``affected_users``, ``created_at`` and ``reporter_id``
                (or ``reporter``).

        Returns:
            A frame indexed like ``data`` holding one column per derived
            feature.
        """
        # Share the input index so Series and array assignments line up even
        # when ``data`` is a slice with a non-default index.
        features = pd.DataFrame(index=data.index)

        # Text-based features. Missing text counts as empty so the lengths
        # are 0 rather than NaN.
        if 'title' in data.columns:
            title = data['title'].fillna('').astype(str)
            features['title_length'] = title.str.len()
            features['title_word_count'] = title.str.split().str.len()

        if 'description' in data.columns:
            description = data['description'].fillna('').astype(str)
            features['description_length'] = description.str.len()
            features['description_word_count'] = description.str.split().str.len()

        # Categorical features
        for column in ('category', 'subcategory'):
            if column in data.columns:
                features[f'{column}_encoded'] = self._encode_categorical(column, data[column])

        # Impact features
        if 'affected_users' in data.columns:
            features['affected_users'] = data['affected_users']
            features['affected_users_log'] = np.log1p(data['affected_users'])

        # Time-based features
        if 'created_at' in data.columns:
            timestamp = pd.to_datetime(data['created_at'])
            hour = timestamp.dt.hour
            weekday = timestamp.dt.dayofweek
            features['hour_of_day'] = hour
            features['day_of_week'] = weekday
            features['is_weekend'] = (weekday >= 5).astype(int)
            # Hours 9 through 17 inclusive, i.e. 09:00 up to 17:59.
            features['is_business_hours'] = ((hour >= 9) & (hour <= 17)).astype(int)

        # Historical features. The service query in this module yields the
        # column as ``reporter``; accept either spelling.
        reporter_column = next(
            (name for name in ('reporter_id', 'reporter') if name in data.columns),
            None,
        )
        if reporter_column is not None:
            # NOTE(review): this counts rows per reporter within ``data``
            # itself, so the scale depends on the batch size - confirm intended.
            features['reporter_incident_count'] = (
                data.groupby(reporter_column)[reporter_column].transform('count').fillna(0)
            )

        return features

    def fit(self, X: pd.DataFrame, y: pd.Series) -> Dict[str, float]:
        """Train the classifier on an 80/20 split and return validation metrics.

        Rows whose severity label is not in ``severity_mapping`` are dropped
        (with a warning) because they cannot be encoded.

        Args:
            X: Raw incident rows (see ``prepare_features``).
            y: Severity labels, positionally aligned with ``X``.

        Returns:
            Dict with ``accuracy`` and weighted ``precision``, ``recall`` and
            ``f1_score`` computed on the held-out 20%.

        Raises:
            ValueError: If no row carries a recognised severity label.
        """
        # Retraining must relearn the label vocabulary rather than reuse the
        # encoders from a previous run.
        self.label_encoders = {}

        # Encode target variable. Unknown labels map to NaN, which would
        # otherwise crash the split and the classifier.
        y_encoded = pd.Series(y).map(self.severity_mapping)
        known = y_encoded.notna().to_numpy()
        if not known.any():
            raise ValueError("No training rows with a recognised severity label")
        if not known.all():
            logger.warning(
                "Dropping %d training rows with unrecognised severity labels",
                int((~known).sum()),
            )
        X = X.loc[known]
        y_encoded = y_encoded[known].astype(int)

        X_processed = self.prepare_features(X)
        self.feature_columns = X_processed.columns.tolist()

        X_scaled = self.scaler.fit_transform(X_processed)

        # Stratified splitting raises when a class has fewer than two rows,
        # so fall back to a plain shuffle split in that case.
        stratify = y_encoded if y_encoded.value_counts().min() >= 2 else None
        X_train, X_val, y_train, y_val = train_test_split(
            X_scaled, y_encoded, test_size=0.2, random_state=42, stratify=stratify
        )

        self.model.fit(X_train, y_train)

        y_pred = self.model.predict(X_val)

        metrics = {
            'accuracy': accuracy_score(y_val, y_pred),
            'precision': precision_score(y_val, y_pred, average='weighted'),
            'recall': recall_score(y_val, y_pred, average='weighted'),
            'f1_score': f1_score(y_val, y_pred, average='weighted'),
        }

        self.is_fitted = True
        return metrics

    def predict(self, X: pd.DataFrame) -> np.ndarray:
        """Return the predicted severity label for each row of ``X``.

        Raises:
            ValueError: If the model has not been fitted.
        """
        if not self.is_fitted:
            raise ValueError("Model must be fitted before prediction")

        # Present exactly the training columns, in training order; features
        # missing from this batch are filled with 0.
        X_processed = self.prepare_features(X).reindex(
            columns=self.feature_columns, fill_value=0
        )
        X_scaled = self.scaler.transform(X_processed)

        y_pred_encoded = self.model.predict(X_scaled)

        # Convert back to severity labels; unknown codes fall back to MEDIUM.
        return np.array(
            [self.reverse_severity_mapping.get(int(level), 'MEDIUM') for level in y_pred_encoded]
        )

    def get_feature_importance(self) -> Dict[str, float]:
        """Map each training feature to its importance; empty if unfitted."""
        if not self.is_fitted:
            return {}

        importance_scores = self.model.feature_importances_
        return dict(zip(self.feature_columns, importance_scores))
|
|
|
|
|
|
class ResolutionTimePredictionModel(BasePredictiveModel):
    """Random-forest regressor that predicts resolution time in hours."""

    def __init__(self, model_config: Dict[str, Any] = None):
        """Create the regressor.

        Args:
            model_config: Optional hyper-parameters. ``n_estimators``
                (default 100) and ``max_depth`` (default 10) are read.
        """
        super().__init__(model_config)
        self.model = RandomForestRegressor(
            n_estimators=self.model_config.get('n_estimators', 100),
            max_depth=self.model_config.get('max_depth', 10),
            random_state=42,
        )
        # Per-assignee mean resolution time learned during training, plus the
        # overall mean used for assignees that were not seen.
        self._assignee_avg_resolution = None
        self._overall_avg_resolution = 0.0

    def _encode_categorical(self, column: str, values: pd.Series) -> pd.Series:
        """Encode ``values`` as integer codes, tolerating unseen labels.

        The encoder for ``column`` is fitted on first use and reused
        afterwards. Labels the encoder has never seen map to ``-1`` instead
        of raising, which is what ``LabelEncoder.transform`` would do.
        """
        normalized = values.astype(str)
        encoder = self.label_encoders.get(column)
        if encoder is None:
            encoder = LabelEncoder()
            encoder.fit(normalized)
            self.label_encoders[column] = encoder
        codes = {str(label): code for code, label in enumerate(encoder.classes_)}
        return normalized.map(codes).fillna(-1).astype(int)

    def _learn_assignee_averages(self, assignees: pd.Series, durations: pd.Series) -> None:
        """Store the mean of ``durations`` per assignee and overall."""
        durations = pd.to_numeric(durations, errors='coerce')
        self._assignee_avg_resolution = durations.groupby(assignees).mean()
        overall = durations.mean()
        self._overall_avg_resolution = 0.0 if pd.isna(overall) else float(overall)

    def prepare_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """Build the feature frame for resolution-time prediction.

        Args:
            data: Raw incident rows. Recognised columns are ``severity``,
                ``category``, ``affected_users``, ``created_at``,
                ``assigned_to``, ``title`` and ``description``. A
                ``resolution_time_hours`` column, when present before the
                assignee averages have been learned, is used to learn them.

        Returns:
            A frame indexed like ``data`` holding one column per derived
            feature.
        """
        # Share the input index so Series and array assignments line up even
        # when ``data`` is a slice with a non-default index.
        features = pd.DataFrame(index=data.index)

        # Severity features; unknown labels fall back to 2 (MEDIUM).
        if 'severity' in data.columns:
            severity_mapping = {'LOW': 1, 'MEDIUM': 2, 'HIGH': 3, 'CRITICAL': 4, 'EMERGENCY': 5}
            features['severity_encoded'] = data['severity'].map(severity_mapping).fillna(2)

        # Categorical features
        if 'category' in data.columns:
            features['category_encoded'] = self._encode_categorical('category', data['category'])

        # Impact features
        if 'affected_users' in data.columns:
            features['affected_users'] = data['affected_users']
            features['affected_users_log'] = np.log1p(data['affected_users'])

        # Time-based features
        if 'created_at' in data.columns:
            timestamp = pd.to_datetime(data['created_at'])
            hour = timestamp.dt.hour
            weekday = timestamp.dt.dayofweek
            features['hour_of_day'] = hour
            features['day_of_week'] = weekday
            features['is_weekend'] = (weekday >= 5).astype(int)
            # Hours 9 through 17 inclusive, i.e. 09:00 up to 17:59.
            features['is_business_hours'] = ((hour >= 9) & (hour <= 17)).astype(int)

        # Historical features: average resolution time for the assignee.
        # The averages are learned once and then looked up, so prediction
        # input does not need to carry the target column.
        if 'assigned_to' in data.columns:
            if self._assignee_avg_resolution is None and 'resolution_time_hours' in data.columns:
                self._learn_assignee_averages(data['assigned_to'], data['resolution_time_hours'])
            if self._assignee_avg_resolution is not None:
                features['assignee_avg_resolution_time'] = (
                    data['assigned_to']
                    .map(self._assignee_avg_resolution)
                    .fillna(self._overall_avg_resolution)
                )

        # Text features. Missing text counts as empty so lengths are 0.
        if 'title' in data.columns:
            features['title_length'] = data['title'].fillna('').astype(str).str.len()

        if 'description' in data.columns:
            features['description_length'] = data['description'].fillna('').astype(str).str.len()

        return features

    def fit(self, X: pd.DataFrame, y: pd.Series) -> Dict[str, float]:
        """Train the regressor on an 80/20 split and return validation metrics.

        Args:
            X: Raw incident rows (see ``prepare_features``).
            y: Resolution times in hours, positionally aligned with ``X``.

        Returns:
            Dict with ``mse``, ``rmse`` and ``r2_score`` computed on the
            held-out 20%.
        """
        # Retraining must relearn encoders and assignee statistics.
        self.label_encoders = {}
        self._assignee_avg_resolution = None
        self._overall_avg_resolution = 0.0

        # When X does not carry the duration column, learn the per-assignee
        # averages from the target instead.
        if 'assigned_to' in X.columns and 'resolution_time_hours' not in X.columns:
            self._learn_assignee_averages(
                X['assigned_to'], pd.Series(np.asarray(y), index=X.index)
            )

        # NOTE(review): the assignee averages are computed over all rows,
        # including those that end up in the validation split, so the
        # reported metrics are likely optimistic.
        X_processed = self.prepare_features(X)
        self.feature_columns = X_processed.columns.tolist()

        X_scaled = self.scaler.fit_transform(X_processed)

        X_train, X_val, y_train, y_val = train_test_split(
            X_scaled, y, test_size=0.2, random_state=42
        )

        self.model.fit(X_train, y_train)

        y_pred = self.model.predict(X_val)

        mse = mean_squared_error(y_val, y_pred)
        metrics = {
            'mse': mse,
            'rmse': np.sqrt(mse),
            'r2_score': r2_score(y_val, y_pred),
        }

        self.is_fitted = True
        return metrics

    def predict(self, X: pd.DataFrame) -> np.ndarray:
        """Return the predicted resolution time in hours for each row.

        Raises:
            ValueError: If the model has not been fitted.
        """
        if not self.is_fitted:
            raise ValueError("Model must be fitted before prediction")

        # Present exactly the training columns, in training order; features
        # missing from this batch are filled with 0.
        X_processed = self.prepare_features(X).reindex(
            columns=self.feature_columns, fill_value=0
        )
        X_scaled = self.scaler.transform(X_processed)

        return self.model.predict(X_scaled)

    def get_feature_importance(self) -> Dict[str, float]:
        """Map each training feature to its importance; empty if unfitted."""
        if not self.is_fitted:
            return {}

        importance_scores = self.model.feature_importances_
        return dict(zip(self.feature_columns, importance_scores))
|
|
|
|
|
|
class CostPredictionModel(BasePredictiveModel):
    """Random-forest regressor that predicts an incident's cost impact."""

    def __init__(self, model_config: Dict[str, Any] = None):
        """Create the regressor.

        Args:
            model_config: Optional hyper-parameters. ``n_estimators``
                (default 100) and ``max_depth`` (default 10) are read.
        """
        super().__init__(model_config)
        self.model = RandomForestRegressor(
            n_estimators=self.model_config.get('n_estimators', 100),
            max_depth=self.model_config.get('max_depth', 10),
            random_state=42,
        )

    def _encode_categorical(self, column: str, values: pd.Series) -> pd.Series:
        """Encode ``values`` as integer codes, tolerating unseen labels.

        The encoder for ``column`` is fitted on first use and reused
        afterwards. Labels the encoder has never seen map to ``-1`` instead
        of raising, which is what ``LabelEncoder.transform`` would do.
        """
        normalized = values.astype(str)
        encoder = self.label_encoders.get(column)
        if encoder is None:
            encoder = LabelEncoder()
            encoder.fit(normalized)
            self.label_encoders[column] = encoder
        codes = {str(label): code for code, label in enumerate(encoder.classes_)}
        return normalized.map(codes).fillna(-1).astype(int)

    def prepare_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """Build the feature frame for cost prediction.

        Args:
            data: Raw incident rows. Recognised columns are ``severity``,
                ``affected_users``, ``downtime_hours``, ``category``,
                ``business_unit`` and ``created_at``.

        Returns:
            A frame indexed like ``data`` holding one column per derived
            feature.
        """
        # Share the input index so Series and array assignments line up even
        # when ``data`` is a slice with a non-default index.
        features = pd.DataFrame(index=data.index)

        # Severity features; unknown labels fall back to 2 (MEDIUM).
        if 'severity' in data.columns:
            severity_mapping = {'LOW': 1, 'MEDIUM': 2, 'HIGH': 3, 'CRITICAL': 4, 'EMERGENCY': 5}
            features['severity_encoded'] = data['severity'].map(severity_mapping).fillna(2)

        # Impact features
        if 'affected_users' in data.columns:
            features['affected_users'] = data['affected_users']
            features['affected_users_log'] = np.log1p(data['affected_users'])

        # NOTE(review): the service query in this module selects
        # ``estimated_downtime`` and no ``business_unit``, so these two
        # feature groups are skipped for service-built data - confirm intended.
        if 'downtime_hours' in data.columns:
            features['downtime_hours'] = data['downtime_hours']
            features['downtime_hours_log'] = np.log1p(data['downtime_hours'])

        # Categorical features and business context
        for column in ('category', 'business_unit'):
            if column in data.columns:
                features[f'{column}_encoded'] = self._encode_categorical(column, data[column])

        # Time-based features
        if 'created_at' in data.columns:
            timestamp = pd.to_datetime(data['created_at'])
            hour = timestamp.dt.hour
            weekday = timestamp.dt.dayofweek
            features['hour_of_day'] = hour
            features['day_of_week'] = weekday
            features['is_weekend'] = (weekday >= 5).astype(int)
            # Hours 9 through 17 inclusive, i.e. 09:00 up to 17:59.
            features['is_business_hours'] = ((hour >= 9) & (hour <= 17)).astype(int)

        return features

    def fit(self, X: pd.DataFrame, y: pd.Series) -> Dict[str, float]:
        """Train the regressor on an 80/20 split and return validation metrics.

        Args:
            X: Raw incident rows (see ``prepare_features``).
            y: Cost amounts, positionally aligned with ``X``.

        Returns:
            Dict with ``mse``, ``rmse`` and ``r2_score`` computed on the
            held-out 20%.
        """
        # Retraining must relearn the label vocabulary rather than reuse the
        # encoders from a previous run.
        self.label_encoders = {}

        X_processed = self.prepare_features(X)
        self.feature_columns = X_processed.columns.tolist()

        X_scaled = self.scaler.fit_transform(X_processed)

        X_train, X_val, y_train, y_val = train_test_split(
            X_scaled, y, test_size=0.2, random_state=42
        )

        self.model.fit(X_train, y_train)

        y_pred = self.model.predict(X_val)

        mse = mean_squared_error(y_val, y_pred)
        metrics = {
            'mse': mse,
            'rmse': np.sqrt(mse),
            'r2_score': r2_score(y_val, y_pred),
        }

        self.is_fitted = True
        return metrics

    def predict(self, X: pd.DataFrame) -> np.ndarray:
        """Return the predicted cost impact for each row of ``X``.

        Raises:
            ValueError: If the model has not been fitted.
        """
        if not self.is_fitted:
            raise ValueError("Model must be fitted before prediction")

        # Present exactly the training columns, in training order; features
        # missing from this batch are filled with 0.
        X_processed = self.prepare_features(X).reindex(
            columns=self.feature_columns, fill_value=0
        )
        X_scaled = self.scaler.transform(X_processed)

        return self.model.predict(X_scaled)

    def get_feature_importance(self) -> Dict[str, float]:
        """Map each training feature to its importance; empty if unfitted."""
        if not self.is_fitted:
            return {}

        importance_scores = self.model.feature_importances_
        return dict(zip(self.feature_columns, importance_scores))
|
|
|
|
|
|
class PredictiveModelFactory:
    """Builds the concrete predictive model registered for a type key."""

    @staticmethod
    def create_model(model_type: str, model_config: Dict[str, Any] = None) -> BasePredictiveModel:
        """Instantiate the model class registered under ``model_type``.

        Args:
            model_type: One of ``INCIDENT_PREDICTION``,
                ``SEVERITY_PREDICTION``, ``RESOLUTION_TIME_PREDICTION`` or
                ``COST_PREDICTION``.
            model_config: Passed through to the model constructor.

        Raises:
            ValueError: If ``model_type`` is not a registered key.
        """
        registry = dict(
            INCIDENT_PREDICTION=IncidentPredictionModel,
            SEVERITY_PREDICTION=SeverityPredictionModel,
            RESOLUTION_TIME_PREDICTION=ResolutionTimePredictionModel,
            COST_PREDICTION=CostPredictionModel,
        )

        model_cls = registry.get(model_type)
        if model_cls is None:
            raise ValueError(f"Unknown model type: {model_type}")

        return model_cls(model_config)
|
|
|
|
|
|
class PredictiveModelService:
    """Service that trains predictive models and generates insights from them.

    Trained estimators are stored with joblib at the path recorded in
    ``PredictiveModel.model_file_path`` and loaded again when predictions are
    generated.
    """

    def __init__(self):
        self.factory = PredictiveModelFactory()

    def prepare_training_data(self, model_type: str, days_back: int = 90) -> Tuple[pd.DataFrame, pd.Series]:
        """Load recent incidents and derive the target for ``model_type``.

        Args:
            model_type: One of the four supported model-type keys.
            days_back: Size of the look-back window in days.

        Returns:
            ``(frame, target)``; both are empty when no incident falls inside
            the window.

        Raises:
            ValueError: If incidents exist but ``model_type`` is unknown.
        """
        end_date = timezone.now()
        start_date = end_date - timedelta(days=days_back)

        # Get incidents from the time period
        incidents = Incident.objects.filter(
            created_at__gte=start_date,
            created_at__lte=end_date
        ).values(
            'id', 'title', 'description', 'severity', 'category', 'subcategory',
            'affected_users', 'estimated_downtime', 'created_at', 'resolved_at',
            'assigned_to', 'reporter', 'status'
        )

        rows = list(incidents)
        if not rows:
            # Explicit dtype: a bare pd.Series() warns on older pandas.
            return pd.DataFrame(), pd.Series(dtype='float64')

        df = pd.DataFrame(rows)

        # Prepare target variable based on model type
        if model_type == 'INCIDENT_PREDICTION':
            # Placeholder target: every row is labelled 1. Real incident
            # prediction needs proper time-series preparation.
            y = pd.Series([1] * len(df))
        elif model_type == 'SEVERITY_PREDICTION':
            y = df['severity']
        elif model_type == 'RESOLUTION_TIME_PREDICTION':
            # Calculate resolution time in hours
            df['resolved_at'] = pd.to_datetime(df['resolved_at'])
            df['created_at'] = pd.to_datetime(df['created_at'])
            df['resolution_time_hours'] = (df['resolved_at'] - df['created_at']).dt.total_seconds() / 3600
            # Unresolved incidents have no duration; they get the median.
            y = df['resolution_time_hours'].fillna(df['resolution_time_hours'].median())
        elif model_type == 'COST_PREDICTION':
            # Get cost data
            cost_analyses = CostImpactAnalysis.objects.filter(
                incident_id__in=df['id'].tolist()
            ).values('incident_id', 'cost_amount')

            cost_df = pd.DataFrame(list(cost_analyses))
            if not cost_df.empty:
                df = df.merge(cost_df, left_on='id', right_on='incident_id', how='left')
                y = df['cost_amount'].fillna(df['cost_amount'].median())
            else:
                y = pd.Series([0] * len(df))
        else:
            raise ValueError(f"Unknown model type: {model_type}")

        return df, y

    @staticmethod
    def _save_model_artifact(ml_model: Any, file_path: str) -> bool:
        """Write the fitted model to ``file_path``; return whether it worked.

        Best-effort: a filesystem error is logged and reported as ``False``
        so that training itself still completes.
        """
        import os

        try:
            directory = os.path.dirname(file_path)
            if directory:
                os.makedirs(directory, exist_ok=True)
            joblib.dump(ml_model, file_path)
        except OSError:
            logger.exception("Could not store model artifact at %s", file_path)
            return False
        return True

    @staticmethod
    def _load_model_artifact(file_path: Optional[str]) -> Any:
        """Load a fitted model from ``file_path``; ``None`` when it is absent."""
        import os

        if not file_path or not os.path.exists(file_path):
            return None
        # SECURITY: joblib.load unpickles the file. Only artifacts written by
        # this service should ever be placed at this path.
        return joblib.load(file_path)

    @staticmethod
    def _prediction_to_plain_value(prediction: Any) -> Any:
        """Convert a prediction to ``float`` when numeric, else to ``str``.

        Severity predictions are labels such as ``'HIGH'``, for which
        ``float()`` raises.
        """
        try:
            return float(prediction)
        except (TypeError, ValueError):
            return str(prediction)

    def train_model(self, model_id: str) -> Dict[str, Any]:
        """Train the model identified by ``model_id`` and record its metrics.

        Returns:
            On success ``{'success': True, 'metrics', 'training_samples',
            'training_duration'}``; otherwise ``{'success': False, 'error'}``.
            Exceptions are logged and reported in the result, not raised.
        """
        try:
            model = PredictiveModel.objects.get(id=model_id)

            # Prepare training data
            X, y = self.prepare_training_data(model.model_type, model.training_data_period_days)

            if X.empty or len(y) < model.min_training_samples:
                return {
                    'success': False,
                    'error': f'Insufficient training data. Need at least {model.min_training_samples} samples, got {len(y)}'
                }

            # Create model instance
            ml_model = self.factory.create_model(model.model_type, model.model_config)

            # Train the model
            start_time = timezone.now()
            metrics = ml_model.fit(X, y)
            end_time = timezone.now()

            # Update model with performance metrics; regressors report
            # r2_score in place of accuracy.
            model.accuracy_score = metrics.get('accuracy', metrics.get('r2_score'))
            model.precision_score = metrics.get('precision')
            model.recall_score = metrics.get('recall')
            model.f1_score = metrics.get('f1_score')
            model.status = 'ACTIVE'
            model.last_trained_at = end_time
            model.training_duration_seconds = (end_time - start_time).total_seconds()
            model.training_samples_count = len(y)
            model.feature_columns = ml_model.feature_columns

            # Persist the fitted estimator so generate_predictions can load it.
            model.model_file_path = f"models/{model.id}_{model.version}.joblib"
            self._save_model_artifact(ml_model, model.model_file_path)

            model.save()

            return {
                'success': True,
                'metrics': metrics,
                'training_samples': len(y),
                'training_duration': model.training_duration_seconds
            }

        except Exception as e:
            # Service boundary: report the failure instead of propagating.
            logger.exception("Error training model %s: %s", model_id, e)
            return {
                'success': False,
                'error': str(e)
            }

    def generate_predictions(self, model_id: str, prediction_horizon_hours: int = 24) -> List[Dict[str, Any]]:
        """Generate insight dictionaries from a trained, ACTIVE model.

        Predictions are made for the 10 most recent incidents of the last
        7 days.

        Returns:
            One insight dict per predicted row; an empty list when there is
            no data, no stored artifact, or an error occurred (errors are
            logged, not raised).
        """
        try:
            model = PredictiveModel.objects.get(id=model_id, status='ACTIVE')

            # A freshly constructed model is unfitted and cannot predict, so
            # load the estimator stored by train_model.
            ml_model = self._load_model_artifact(model.model_file_path)
            if ml_model is None:
                logger.warning(
                    "No trained artifact found for model %s at %s; train the model first",
                    model_id, model.model_file_path,
                )
                return []

            # Prepare prediction data
            X, _ = self.prepare_training_data(model.model_type, 7)  # Last 7 days

            if X.empty:
                return []

            # Predict for the last 10 incidents and keep that slice, so each
            # prediction is paired with the row it was computed from.
            recent = X.tail(10)
            predictions = ml_model.predict(recent)

            generated_at = timezone.now()
            title = f"Prediction for {model.model_type.replace('_', ' ').title()}"

            # Create insight objects
            insights = []
            for position, prediction in enumerate(predictions):
                row = recent.iloc[position]
                insights.append({
                    'model': model,
                    'insight_type': model.model_type,
                    'title': title,
                    'description': f"Model predicts {prediction} for upcoming incidents",
                    'confidence_level': 'MEDIUM',  # Could be calculated based on model confidence
                    'confidence_score': 0.7,  # Placeholder
                    'predicted_value': {'value': self._prediction_to_plain_value(prediction)},
                    'prediction_horizon': prediction_horizon_hours,
                    'prediction_date': generated_at + timedelta(hours=prediction_horizon_hours),
                    'input_features': row.to_dict(),
                    'supporting_evidence': [],
                    'affected_services': [row.get('category', 'Unknown')],
                    'recommendations': self._generate_recommendations(model.model_type, prediction),
                    'expires_at': generated_at + timedelta(hours=prediction_horizon_hours * 2)
                })

            return insights

        except Exception as e:
            # Service boundary: report the failure instead of propagating.
            logger.exception("Error generating predictions for model %s: %s", model_id, e)
            return []

    def _generate_recommendations(self, model_type: str, prediction: Any) -> List[str]:
        """Return recommendation strings for a prediction.

        Args:
            model_type: Model-type key; unknown keys give an empty list.
            prediction: A probability, severity label, hours or cost amount,
                depending on ``model_type``.
        """
        recommendations = []

        if model_type == 'INCIDENT_PREDICTION':
            if prediction > 0.7:
                recommendations.append("High probability of incident occurrence - consider proactive monitoring")
                recommendations.append("Ensure on-call team is ready for potential incidents")
            elif prediction > 0.4:
                recommendations.append("Moderate probability of incident - monitor system metrics closely")

        elif model_type == 'SEVERITY_PREDICTION':
            if prediction in ['CRITICAL', 'EMERGENCY']:
                recommendations.append("High severity incident predicted - prepare escalation procedures")
                recommendations.append("Ensure senior staff are available for response")
            elif prediction == 'HIGH':
                recommendations.append("High severity incident predicted - review response procedures")

        elif model_type == 'RESOLUTION_TIME_PREDICTION':
            if prediction > 24:
                recommendations.append("Long resolution time predicted - consider additional resources")
                recommendations.append("Review escalation procedures for complex incidents")
            elif prediction > 8:
                recommendations.append("Extended resolution time predicted - prepare for extended response")

        elif model_type == 'COST_PREDICTION':
            if prediction > 10000:
                recommendations.append("High cost impact predicted - prepare cost mitigation strategies")
                recommendations.append("Consider business continuity measures")
            elif prediction > 5000:
                recommendations.append("Significant cost impact predicted - review cost control measures")

        return recommendations
|