ETB/ETB-API/monitoring/services/metrics_collector.py
Iliyan Angelov 6b247e5b9f Updates
2025-09-19 11:58:53 +03:00

365 lines · 14 KiB · Python

"""
Metrics collection service for system monitoring
"""
import time
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
from django.utils import timezone
from django.db import connection
from django.core.cache import cache
from django.conf import settings
from django.contrib.auth import get_user_model
from monitoring.models import SystemMetric, MetricMeasurement

User = get_user_model()
logger = logging.getLogger(__name__)


class MetricsCollector:
    """Service for collecting and storing system metrics"""

    def __init__(self):
        self.collected_metrics = {}

    def collect_all_metrics(self) -> Dict[str, Any]:
        """Collect all configured metrics"""
        results = {}
        # Get all active metrics
        active_metrics = SystemMetric.objects.filter(is_active=True)
        for metric in active_metrics:
            try:
                value = self._collect_metric_value(metric)
                if value is not None:
                    # Store measurement
                    measurement = MetricMeasurement.objects.create(
                        metric=metric,
                        value=value,
                        tags=self._get_metric_tags(metric),
                        metadata=self._get_metric_metadata(metric)
                    )
                    results[metric.name] = {
                        'value': value,
                        'measurement_id': measurement.id,
                        'timestamp': measurement.timestamp
                    }
            except Exception as e:
                logger.error(f"Failed to collect metric {metric.name}: {e}")
                results[metric.name] = {
                    'error': str(e)
                }
        return results

    def _collect_metric_value(self, metric: SystemMetric) -> Optional[float]:
        """Collect value for a specific metric"""
        category = metric.category
        if category == 'API_RESPONSE_TIME':
            return self._collect_api_response_time(metric)
        elif category == 'THROUGHPUT':
            return self._collect_throughput(metric)
        elif category == 'ERROR_RATE':
            return self._collect_error_rate(metric)
        elif category == 'AVAILABILITY':
            return self._collect_availability(metric)
        elif category == 'INCIDENT_COUNT':
            return self._collect_incident_count(metric)
        elif category == 'MTTR':
            return self._collect_mttr(metric)
        elif category == 'MTTA':
            return self._collect_mtta(metric)
        elif category == 'SLA_COMPLIANCE':
            return self._collect_sla_compliance(metric)
        elif category == 'SECURITY_EVENTS':
            return self._collect_security_events(metric)
        elif category == 'AUTOMATION_SUCCESS':
            return self._collect_automation_success(metric)
        elif category == 'AI_ACCURACY':
            return self._collect_ai_accuracy(metric)
        elif category == 'COST_IMPACT':
            return self._collect_cost_impact(metric)
        elif category == 'USER_ACTIVITY':
            return self._collect_user_activity(metric)
        elif category == 'SYSTEM_RESOURCES':
            return self._collect_system_resources(metric)
        else:
            logger.warning(f"Unknown metric category: {category}")
            return None

    def _collect_api_response_time(self, metric: SystemMetric) -> Optional[float]:
        """Collect API response time metrics"""
        # This would typically come from middleware or APM tools
        # For now, return a mock value
        return 150.5  # milliseconds

    def _collect_throughput(self, metric: SystemMetric) -> Optional[float]:
        """Collect throughput metrics (requests per minute)"""
        # Count requests in the last minute
        # This would typically come from access logs or middleware
        return 120.0  # requests per minute

    def _collect_error_rate(self, metric: SystemMetric) -> Optional[float]:
        """Collect error rate metrics"""
        # Count errors in the last hour
        # This would typically come from logs or error tracking
        return 0.02  # 2% error rate

    def _collect_availability(self, metric: SystemMetric) -> Optional[float]:
        """Collect availability metrics"""
        # Calculate availability percentage
        # This would typically come from uptime monitoring
        return 99.9  # 99.9% availability

    def _collect_incident_count(self, metric: SystemMetric) -> Optional[float]:
        """Collect incident count metrics"""
        from incident_intelligence.models import Incident
        # Count incidents in the last 24 hours
        since = timezone.now() - timedelta(hours=24)
        count = Incident.objects.filter(created_at__gte=since).count()
        return float(count)

    def _collect_mttr(self, metric: SystemMetric) -> Optional[float]:
        """Collect Mean Time to Resolve metrics"""
        from incident_intelligence.models import Incident
        # Calculate MTTR for resolved incidents in the last 7 days
        since = timezone.now() - timedelta(days=7)
        resolved_incidents = Incident.objects.filter(
            status__in=['RESOLVED', 'CLOSED'],
            resolved_at__isnull=False,
            resolved_at__gte=since
        )
        if not resolved_incidents.exists():
            return None
        total_resolution_time = 0
        count = 0
        for incident in resolved_incidents:
            if incident.resolved_at and incident.created_at:
                resolution_time = incident.resolved_at - incident.created_at
                total_resolution_time += resolution_time.total_seconds()
                count += 1
        if count > 0:
            return total_resolution_time / count / 60  # Convert to minutes
        return None

    def _collect_mtta(self, metric: SystemMetric) -> Optional[float]:
        """Collect Mean Time to Acknowledge metrics"""
        # This would require tracking when incidents are first acknowledged
        # For now, return a mock value
        return 15.5  # minutes

    def _collect_sla_compliance(self, metric: SystemMetric) -> Optional[float]:
        """Collect SLA compliance metrics"""
        from sla_oncall.models import SLAInstance
        # Calculate SLA compliance percentage
        total_slas = SLAInstance.objects.count()
        if total_slas == 0:
            return None
        # This would require more complex SLA compliance calculation
        # For now, return a mock value
        return 95.5  # 95.5% SLA compliance

    def _collect_security_events(self, metric: SystemMetric) -> Optional[float]:
        """Collect security events metrics"""
        # Count security events in the last hour
        # This would come from security logs or audit trails
        return 3.0  # 3 security events in the last hour

    def _collect_automation_success(self, metric: SystemMetric) -> Optional[float]:
        """Collect automation success rate metrics"""
        from automation_orchestration.models import RunbookExecution
        # Calculate success rate for runbook executions in the last 24 hours
        since = timezone.now() - timedelta(hours=24)
        executions = RunbookExecution.objects.filter(created_at__gte=since)
        if not executions.exists():
            return None
        successful = executions.filter(status='COMPLETED').count()
        total = executions.count()
        return (successful / total * 100) if total > 0 else None

    def _collect_ai_accuracy(self, metric: SystemMetric) -> Optional[float]:
        """Collect AI model accuracy metrics"""
        from incident_intelligence.models import IncidentClassification
        # Calculate accuracy for AI classifications
        classifications = IncidentClassification.objects.all()
        if not classifications.exists():
            return None
        # This would require comparing predictions with actual outcomes
        # For now, return average confidence score
        total_confidence = sum(c.confidence_score for c in classifications)
        return (total_confidence / classifications.count() * 100) if classifications.count() > 0 else None

    def _collect_cost_impact(self, metric: SystemMetric) -> Optional[float]:
        """Collect cost impact metrics"""
        from analytics_predictive_insights.models import CostImpactAnalysis
        # Calculate total cost impact for the last 30 days
        since = timezone.now() - timedelta(days=30)
        cost_analyses = CostImpactAnalysis.objects.filter(created_at__gte=since)
        total_cost = sum(float(ca.cost_amount) for ca in cost_analyses)
        return total_cost

    def _collect_user_activity(self, metric: SystemMetric) -> Optional[float]:
        """Collect user activity metrics"""
        # Count active users in the last hour
        since = timezone.now() - timedelta(hours=1)
        # This would require user activity tracking
        return 25.0  # 25 active users in the last hour

    def _collect_system_resources(self, metric: SystemMetric) -> Optional[float]:
        """Collect system resource metrics"""
        import psutil
        # Get CPU usage
        cpu_percent = psutil.cpu_percent(interval=1)
        return cpu_percent

    def _get_metric_tags(self, metric: SystemMetric) -> Dict[str, str]:
        """Get tags for a metric measurement"""
        tags = {
            'metric_type': metric.metric_type,
            'category': metric.category,
        }
        if metric.related_module:
            tags['module'] = metric.related_module
        return tags

    def _get_metric_metadata(self, metric: SystemMetric) -> Dict[str, Any]:
        """Get metadata for a metric measurement"""
        return {
            'unit': metric.unit,
            'aggregation_method': metric.aggregation_method,
            'collection_interval': metric.collection_interval_seconds,
        }
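

# --- Example usage (not part of the original module) -------------------------
# A minimal sketch of how MetricsCollector might be driven from a periodic job,
# e.g. a Celery beat task or a cron-invoked management command. The wrapper
# function below is hypothetical; only MetricsCollector.collect_all_metrics()
# comes from this module.
def run_metrics_collection() -> Dict[str, Any]:
    """Hypothetical entry point for scheduled metrics collection."""
    collector = MetricsCollector()
    results = collector.collect_all_metrics()
    # Surface any per-metric failures without interrupting the run
    failed = [name for name, data in results.items() if 'error' in data]
    if failed:
        logger.warning(f"Metrics collection finished with errors: {failed}")
    return results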


class MetricsAggregator:
    """Service for aggregating metrics over time periods"""

    def __init__(self):
        self.collector = MetricsCollector()

    def aggregate_metrics(self, metric: SystemMetric, start_time: datetime, end_time: datetime) -> Dict[str, Any]:
        """Aggregate metrics over a time period"""
        measurements = MetricMeasurement.objects.filter(
            metric=metric,
            timestamp__gte=start_time,
            timestamp__lte=end_time
        ).order_by('timestamp')
        if not measurements.exists():
            return {
                'count': 0,
                'values': [],
                'aggregated_value': None
            }
        values = [float(m.value) for m in measurements]
        aggregated_value = self._aggregate_values(values, metric.aggregation_method)
        return {
            'count': len(values),
            'values': values,
            'aggregated_value': aggregated_value,
            'start_time': start_time,
            'end_time': end_time,
            'unit': metric.unit
        }

    def _aggregate_values(self, values: List[float], method: str) -> Optional[float]:
        """Aggregate a list of values using the specified method"""
        if not values:
            return None
        if method == 'AVERAGE':
            return sum(values) / len(values)
        elif method == 'SUM':
            return sum(values)
        elif method == 'COUNT':
            return len(values)
        elif method == 'MIN':
            return min(values)
        elif method == 'MAX':
            return max(values)
        elif method == 'PERCENTILE_95':
            return self._calculate_percentile(values, 95)
        elif method == 'PERCENTILE_99':
            return self._calculate_percentile(values, 99)
        else:
            return sum(values) / len(values)  # Default to average

    def _calculate_percentile(self, values: List[float], percentile: int) -> float:
        """Calculate percentile of values"""
        sorted_values = sorted(values)
        index = int((percentile / 100) * len(sorted_values))
        return sorted_values[min(index, len(sorted_values) - 1)]

    def get_metric_trends(self, metric: SystemMetric, days: int = 7) -> Dict[str, Any]:
        """Get metric trends over a period"""
        end_time = timezone.now()
        start_time = end_time - timedelta(days=days)
        # Get daily aggregations
        daily_data = []
        for i in range(days):
            day_start = start_time + timedelta(days=i)
            day_end = day_start + timedelta(days=1)
            day_aggregation = self.aggregate_metrics(metric, day_start, day_end)
            daily_data.append({
                'date': day_start.date(),
                'value': day_aggregation['aggregated_value'],
                'count': day_aggregation['count']
            })
        return {
            'metric_name': metric.name,
            'period_days': days,
            'daily_data': daily_data,
            'trend': self._calculate_trend([d['value'] for d in daily_data if d['value'] is not None])
        }

    def _calculate_trend(self, values: List[float]) -> str:
        """Calculate trend direction from values"""
        if len(values) < 2:
            return 'STABLE'
        # Simple linear trend calculation
        first_half = values[:len(values)//2]
        second_half = values[len(values)//2:]
        first_avg = sum(first_half) / len(first_half)
        second_avg = sum(second_half) / len(second_half)
        change_percent = ((second_avg - first_avg) / first_avg) * 100 if first_avg != 0 else 0
        if change_percent > 5:
            return 'INCREASING'
        elif change_percent < -5:
            return 'DECREASING'
        else:
            return 'STABLE'
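

# --- Example usage (not part of the original module) -------------------------
# A minimal sketch showing how MetricsAggregator might be queried for a 24-hour
# summary plus a 7-day trend of a single metric. The helper function and the
# metric name passed to it are assumptions for illustration only; only
# aggregate_metrics() and get_metric_trends() come from this module.
def summarize_metric_last_24h(metric_name: str) -> Optional[Dict[str, Any]]:
    """Hypothetical helper returning a 24-hour aggregation plus 7-day trend."""
    try:
        metric = SystemMetric.objects.get(name=metric_name, is_active=True)
    except SystemMetric.DoesNotExist:
        return None
    aggregator = MetricsAggregator()
    end_time = timezone.now()
    start_time = end_time - timedelta(hours=24)
    summary = aggregator.aggregate_metrics(metric, start_time, end_time)
    summary['trend'] = aggregator.get_metric_trends(metric, days=7)['trend']
    return summary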