"""
|
|
Metrics collection service for system monitoring
|
|
"""
|
|
import time
|
|
import logging
|
|
from typing import Dict, Any, List, Optional
|
|
from datetime import datetime, timedelta
|
|
from django.utils import timezone
|
|
from django.db import connection
|
|
from django.core.cache import cache
|
|
from django.conf import settings
|
|
from django.contrib.auth import get_user_model
|
|
|
|
from monitoring.models import SystemMetric, MetricMeasurement
|
|
|
|
User = get_user_model()
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MetricsCollector:
    """Service for collecting and storing system metrics.

    For each active ``SystemMetric`` this service collects a value via a
    category-specific collector method and persists it as a
    ``MetricMeasurement``.  Several collectors currently return mock values
    (marked below) pending integration with real data sources.
    """

    def __init__(self):
        # Scratch space available to callers; not used internally.
        # Kept for backward compatibility.
        self.collected_metrics = {}

    def collect_all_metrics(self) -> Dict[str, Any]:
        """Collect every active metric and store a measurement for each.

        Returns:
            Mapping of metric name to either
            ``{'value', 'measurement_id', 'timestamp'}`` on success or
            ``{'error': <message>}`` on failure.
        """
        results = {}

        for metric in SystemMetric.objects.filter(is_active=True):
            try:
                value = self._collect_metric_value(metric)
                if value is not None:
                    measurement = MetricMeasurement.objects.create(
                        metric=metric,
                        value=value,
                        tags=self._get_metric_tags(metric),
                        metadata=self._get_metric_metadata(metric),
                    )
                    results[metric.name] = {
                        'value': value,
                        'measurement_id': measurement.id,
                        'timestamp': measurement.timestamp,
                    }
            except Exception as e:
                # One failing metric must not abort the whole collection run.
                logger.error(f"Failed to collect metric {metric.name}: {e}")
                results[metric.name] = {
                    'error': str(e)
                }

        return results

    def _collect_metric_value(self, metric: SystemMetric) -> Optional[float]:
        """Dispatch to the collector for ``metric.category``.

        Returns:
            The collected value, or None if the category is unknown
            (a warning is logged) or the collector has no data.
        """
        # Dispatch table replaces the previous 14-way if/elif chain.
        collectors = {
            'API_RESPONSE_TIME': self._collect_api_response_time,
            'THROUGHPUT': self._collect_throughput,
            'ERROR_RATE': self._collect_error_rate,
            'AVAILABILITY': self._collect_availability,
            'INCIDENT_COUNT': self._collect_incident_count,
            'MTTR': self._collect_mttr,
            'MTTA': self._collect_mtta,
            'SLA_COMPLIANCE': self._collect_sla_compliance,
            'SECURITY_EVENTS': self._collect_security_events,
            'AUTOMATION_SUCCESS': self._collect_automation_success,
            'AI_ACCURACY': self._collect_ai_accuracy,
            'COST_IMPACT': self._collect_cost_impact,
            'USER_ACTIVITY': self._collect_user_activity,
            'SYSTEM_RESOURCES': self._collect_system_resources,
        }

        category = metric.category
        collector = collectors.get(category)
        if collector is None:
            logger.warning(f"Unknown metric category: {category}")
            return None
        return collector(metric)

    def _collect_api_response_time(self, metric: SystemMetric) -> Optional[float]:
        """Collect API response time metrics (milliseconds)."""
        # This would typically come from middleware or APM tools.
        # For now, return a mock value.
        return 150.5  # milliseconds

    def _collect_throughput(self, metric: SystemMetric) -> Optional[float]:
        """Collect throughput metrics (requests per minute)."""
        # This would typically come from access logs or middleware.
        # For now, return a mock value.
        return 120.0  # requests per minute

    def _collect_error_rate(self, metric: SystemMetric) -> Optional[float]:
        """Collect error rate metrics (fraction of requests)."""
        # This would typically come from logs or error tracking.
        # For now, return a mock value.
        return 0.02  # 2% error rate

    def _collect_availability(self, metric: SystemMetric) -> Optional[float]:
        """Collect availability metrics (percentage)."""
        # This would typically come from uptime monitoring.
        # For now, return a mock value.
        return 99.9  # 99.9% availability

    def _collect_incident_count(self, metric: SystemMetric) -> Optional[float]:
        """Count incidents created in the last 24 hours."""
        from incident_intelligence.models import Incident

        since = timezone.now() - timedelta(hours=24)
        return float(Incident.objects.filter(created_at__gte=since).count())

    def _collect_mttr(self, metric: SystemMetric) -> Optional[float]:
        """Mean Time to Resolve, in minutes, over the last 7 days.

        Returns None when no resolved incidents exist in the window.
        """
        from incident_intelligence.models import Incident

        since = timezone.now() - timedelta(days=7)
        resolved_incidents = Incident.objects.filter(
            status__in=['RESOLVED', 'CLOSED'],
            resolved_at__isnull=False,
            resolved_at__gte=since,
        )

        # Guard against rows with a missing created_at; resolved_at is
        # already guaranteed non-null by the filter above.
        durations = [
            (incident.resolved_at - incident.created_at).total_seconds()
            for incident in resolved_incidents
            if incident.resolved_at and incident.created_at
        ]

        if not durations:
            return None
        return sum(durations) / len(durations) / 60  # seconds -> minutes

    def _collect_mtta(self, metric: SystemMetric) -> Optional[float]:
        """Mean Time to Acknowledge, in minutes."""
        # This would require tracking when incidents are first acknowledged.
        # For now, return a mock value.
        return 15.5  # minutes

    def _collect_sla_compliance(self, metric: SystemMetric) -> Optional[float]:
        """Collect SLA compliance percentage.

        Returns None when no SLA instances exist.
        """
        from sla_oncall.models import SLAInstance

        if SLAInstance.objects.count() == 0:
            return None

        # This would require a real SLA compliance calculation.
        # For now, return a mock value.
        return 95.5  # 95.5% SLA compliance

    def _collect_security_events(self, metric: SystemMetric) -> Optional[float]:
        """Count security events in the last hour."""
        # This would come from security logs or audit trails.
        # For now, return a mock value.
        return 3.0  # 3 security events in the last hour

    def _collect_automation_success(self, metric: SystemMetric) -> Optional[float]:
        """Success rate (%) of runbook executions over the last 24 hours.

        Returns None when there were no executions in the window.
        """
        from automation_orchestration.models import RunbookExecution

        since = timezone.now() - timedelta(hours=24)
        executions = RunbookExecution.objects.filter(created_at__gte=since)

        # Single count() instead of exists() + count() (one query fewer).
        total = executions.count()
        if total == 0:
            return None

        successful = executions.filter(status='COMPLETED').count()
        return successful / total * 100

    def _collect_ai_accuracy(self, metric: SystemMetric) -> Optional[float]:
        """Proxy for AI model accuracy: mean classification confidence (%).

        Returns None when no classifications exist.  A true accuracy metric
        would compare predictions against actual outcomes.
        """
        from incident_intelligence.models import IncidentClassification

        # Materialize once instead of exists() + iterate + count() x2.
        scores = [
            c.confidence_score for c in IncidentClassification.objects.all()
        ]
        if not scores:
            return None
        return sum(scores) / len(scores) * 100

    def _collect_cost_impact(self, metric: SystemMetric) -> Optional[float]:
        """Total cost impact over the last 30 days."""
        from analytics_predictive_insights.models import CostImpactAnalysis

        since = timezone.now() - timedelta(days=30)
        cost_analyses = CostImpactAnalysis.objects.filter(created_at__gte=since)
        return sum(float(ca.cost_amount) for ca in cost_analyses)

    def _collect_user_activity(self, metric: SystemMetric) -> Optional[float]:
        """Count active users in the last hour."""
        # This would require user activity tracking.
        # For now, return a mock value.
        return 25.0  # 25 active users in the last hour

    def _collect_system_resources(self, metric: SystemMetric) -> Optional[float]:
        """Current CPU utilization percentage (sampled over 1 second)."""
        import psutil

        return psutil.cpu_percent(interval=1)

    def _get_metric_tags(self, metric: SystemMetric) -> Dict[str, str]:
        """Build the tag dict stored alongside each measurement."""
        tags = {
            'metric_type': metric.metric_type,
            'category': metric.category,
        }
        if metric.related_module:
            tags['module'] = metric.related_module
        return tags

    def _get_metric_metadata(self, metric: SystemMetric) -> Dict[str, Any]:
        """Build the metadata dict stored alongside each measurement."""
        return {
            'unit': metric.unit,
            'aggregation_method': metric.aggregation_method,
            'collection_interval': metric.collection_interval_seconds,
        }
class MetricsAggregator:
    """Service for aggregating metrics over time periods.

    Wraps stored ``MetricMeasurement`` rows with windowed aggregation
    (average, sum, min/max, percentiles) and simple trend detection.
    """

    def __init__(self):
        self.collector = MetricsCollector()

    def aggregate_metrics(self, metric: SystemMetric, start_time: datetime, end_time: datetime) -> Dict[str, Any]:
        """Aggregate stored measurements for *metric* within the window.

        Returns a dict with ``count``, ``values``, ``aggregated_value`` and,
        when measurements exist, the window bounds and unit.
        """
        queryset = (
            MetricMeasurement.objects
            .filter(metric=metric, timestamp__gte=start_time, timestamp__lte=end_time)
            .order_by('timestamp')
        )

        if not queryset.exists():
            return {
                'count': 0,
                'values': [],
                'aggregated_value': None,
            }

        samples = [float(row.value) for row in queryset]

        return {
            'count': len(samples),
            'values': samples,
            'aggregated_value': self._aggregate_values(samples, metric.aggregation_method),
            'start_time': start_time,
            'end_time': end_time,
            'unit': metric.unit,
        }

    def _aggregate_values(self, values: List[float], method: str) -> Optional[float]:
        """Reduce *values* with the named aggregation method.

        Unknown methods fall back to the average; empty input yields None.
        """
        if not values:
            return None

        average = lambda v: sum(v) / len(v)
        reducers = {
            'AVERAGE': average,
            'SUM': sum,
            'COUNT': len,
            'MIN': min,
            'MAX': max,
            'PERCENTILE_95': lambda v: self._calculate_percentile(v, 95),
            'PERCENTILE_99': lambda v: self._calculate_percentile(v, 99),
        }
        return reducers.get(method, average)(values)

    def _calculate_percentile(self, values: List[float], percentile: int) -> float:
        """Return the value at the given percentile of *values*."""
        ordered = sorted(values)
        # Index expression kept identical to preserve exact behavior.
        index = int((percentile / 100) * len(ordered))
        return ordered[min(index, len(ordered) - 1)]

    def get_metric_trends(self, metric: SystemMetric, days: int = 7) -> Dict[str, Any]:
        """Return daily aggregations and an overall trend over *days* days."""
        end_time = timezone.now()
        start_time = end_time - timedelta(days=days)

        daily_data = []
        for offset in range(days):
            window_start = start_time + timedelta(days=offset)
            window_end = window_start + timedelta(days=1)
            bucket = self.aggregate_metrics(metric, window_start, window_end)
            daily_data.append({
                'date': window_start.date(),
                'value': bucket['aggregated_value'],
                'count': bucket['count'],
            })

        observed = [day['value'] for day in daily_data if day['value'] is not None]
        return {
            'metric_name': metric.name,
            'period_days': days,
            'daily_data': daily_data,
            'trend': self._calculate_trend(observed),
        }

    def _calculate_trend(self, values: List[float]) -> str:
        """Classify the direction of *values*.

        Compares the mean of the first half against the second half:
        a change beyond +/-5% is INCREASING/DECREASING, otherwise STABLE.
        """
        if len(values) < 2:
            return 'STABLE'

        midpoint = len(values) // 2
        earlier, later = values[:midpoint], values[midpoint:]
        earlier_avg = sum(earlier) / len(earlier)
        later_avg = sum(later) / len(later)

        # Avoid division by zero when the baseline average is 0.
        if earlier_avg != 0:
            change_percent = ((later_avg - earlier_avg) / earlier_avg) * 100
        else:
            change_percent = 0

        if change_percent > 5:
            return 'INCREASING'
        if change_percent < -5:
            return 'DECREASING'
        return 'STABLE'