ETB/ETB-API/monitoring/enterprise_monitoring.py
"""
Enterprise Monitoring System for ETB-API
Advanced monitoring with metrics, alerting, and observability
"""
import logging
import time
import psutil
import json
import os
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Union
from django.http import HttpRequest, HttpResponse, JsonResponse
from django.conf import settings
from django.utils import timezone
from django.core.cache import cache
from django.db import connection
from django.core.management import call_command
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from django.core.management.base import BaseCommand
import requests
import redis
from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST
from prometheus_client.core import CollectorRegistry
import threading
import queue
logger = logging.getLogger(__name__)
class MetricsCollector:
    """Enterprise metrics collection system"""
    def __init__(self):
        self.registry = CollectorRegistry()
        self.metrics = self._initialize_metrics()
        self.collection_interval = 60  # seconds
        self.is_running = False
        self.collection_thread = None
    def _initialize_metrics(self):
        """Initialize Prometheus metrics"""
        metrics = {}
        # Application metrics
        metrics['http_requests_total'] = Counter(
            'http_requests_total',
            'Total HTTP requests',
            ['method', 'endpoint', 'status_code'],
            registry=self.registry
        )
        metrics['http_request_duration_seconds'] = Histogram(
            'http_request_duration_seconds',
            'HTTP request duration in seconds',
            ['method', 'endpoint'],
            registry=self.registry
        )
        metrics['active_users'] = Gauge(
            'active_users',
            'Number of active users',
            registry=self.registry
        )
        metrics['incident_count'] = Gauge(
            'incident_count',
            'Total number of incidents',
            ['status', 'priority'],
            registry=self.registry
        )
        metrics['sla_breach_count'] = Gauge(
            'sla_breach_count',
            'Number of SLA breaches',
            ['sla_type'],
            registry=self.registry
        )
        # System metrics
        metrics['system_cpu_usage'] = Gauge(
            'system_cpu_usage_percent',
            'System CPU usage percentage',
            registry=self.registry
        )
        metrics['system_memory_usage'] = Gauge(
            'system_memory_usage_percent',
            'System memory usage percentage',
            registry=self.registry
        )
        metrics['system_disk_usage'] = Gauge(
            'system_disk_usage_percent',
            'System disk usage percentage',
            registry=self.registry
        )
        metrics['database_connections'] = Gauge(
            'database_connections_active',
            'Active database connections',
            registry=self.registry
        )
        metrics['cache_hit_ratio'] = Gauge(
            'cache_hit_ratio',
            'Cache hit ratio',
            registry=self.registry
        )
        # Business metrics
        metrics['incident_resolution_time'] = Histogram(
            'incident_resolution_time_seconds',
            'Incident resolution time in seconds',
            ['priority', 'category'],
            registry=self.registry
        )
        metrics['automation_success_rate'] = Gauge(
            'automation_success_rate',
            'Automation success rate',
            ['automation_type'],
            registry=self.registry
        )
        metrics['user_satisfaction_score'] = Gauge(
            'user_satisfaction_score',
            'User satisfaction score',
            registry=self.registry
        )
        return metrics
    def start_collection(self):
        """Start metrics collection in background thread"""
        if self.is_running:
            return
        self.is_running = True
        self.collection_thread = threading.Thread(target=self._collect_metrics_loop)
        self.collection_thread.daemon = True
        self.collection_thread.start()
        logger.info("Metrics collection started")
    def stop_collection(self):
        """Stop metrics collection"""
        self.is_running = False
        if self.collection_thread:
            self.collection_thread.join()
        logger.info("Metrics collection stopped")
    def _collect_metrics_loop(self):
        """Main metrics collection loop"""
        while self.is_running:
            try:
                self._collect_system_metrics()
                self._collect_application_metrics()
                self._collect_business_metrics()
                time.sleep(self.collection_interval)
            except Exception as e:
                logger.error(f"Error collecting metrics: {str(e)}")
                time.sleep(self.collection_interval)
    def _collect_system_metrics(self):
        """Collect system-level metrics"""
        try:
            # CPU usage
            cpu_percent = psutil.cpu_percent(interval=1)
            self.metrics['system_cpu_usage'].set(cpu_percent)
            # Memory usage
            memory = psutil.virtual_memory()
            self.metrics['system_memory_usage'].set(memory.percent)
            # Disk usage
            disk = psutil.disk_usage('/')
            disk_percent = (disk.used / disk.total) * 100
            self.metrics['system_disk_usage'].set(disk_percent)
            # Database connections (PostgreSQL-specific query)
            with connection.cursor() as cursor:
                cursor.execute("SELECT COUNT(*) FROM pg_stat_activity")
                db_connections = cursor.fetchone()[0]
                self.metrics['database_connections'].set(db_connections)
            # Cache hit ratio (only on backends that expose get_stats(), e.g. memcached;
            # other backends are skipped instead of raising AttributeError every cycle)
            backend = getattr(cache, '_cache', None)
            if backend is not None and hasattr(backend, 'get_stats'):
                cache_stats = backend.get_stats()
                if isinstance(cache_stats, dict):
                    self.metrics['cache_hit_ratio'].set(cache_stats.get('hit_ratio', 0))
        except Exception as e:
            logger.error(f"Error collecting system metrics: {str(e)}")
    def _collect_application_metrics(self):
        """Collect application-level metrics"""
        try:
            # Active users (from cache)
            active_users = cache.get('active_users_count', 0)
            self.metrics['active_users'].set(active_users)
            # Incident counts
            from incident_intelligence.models import Incident
            from django.db import models
            incident_counts = Incident.objects.values('status', 'priority').annotate(
                count=models.Count('id')
            )
            for incident in incident_counts:
                self.metrics['incident_count'].labels(
                    status=incident['status'],
                    priority=incident['priority']
                ).set(incident['count'])
            # SLA breach counts
            from sla_oncall.models import SLAInstance
            sla_breaches = SLAInstance.objects.filter(
                status='breached'
            ).values('sla_type').annotate(
                count=models.Count('id')
            )
            for breach in sla_breaches:
                self.metrics['sla_breach_count'].labels(
                    sla_type=breach['sla_type']
                ).set(breach['count'])
        except Exception as e:
            logger.error(f"Error collecting application metrics: {str(e)}")
    def _collect_business_metrics(self):
        """Collect business-level metrics"""
        try:
            # Incident resolution times (resolved_at/created_at must be included in
            # values() so the timestamps are present on each returned row)
            from incident_intelligence.models import Incident
            from django.db import models
            resolved_incidents = Incident.objects.filter(
                status='resolved',
                resolved_at__isnull=False
            ).values('priority', 'category', 'resolved_at', 'created_at')
            for incident in resolved_incidents:
                resolution_time = (incident['resolved_at'] - incident['created_at']).total_seconds()
                self.metrics['incident_resolution_time'].labels(
                    priority=incident['priority'],
                    category=incident['category']
                ).observe(resolution_time)
            # Automation success rates
            from automation_orchestration.models import AutomationExecution
            automation_stats = AutomationExecution.objects.values('automation_type').annotate(
                total=models.Count('id'),
                successful=models.Count('id', filter=models.Q(status='success'))
            )
            for stat in automation_stats:
                success_rate = (stat['successful'] / stat['total']) * 100 if stat['total'] > 0 else 0
                self.metrics['automation_success_rate'].labels(
                    automation_type=stat['automation_type']
                ).set(success_rate)
            # User satisfaction score (from feedback)
            from knowledge_learning.models import UserFeedback
            feedback_scores = UserFeedback.objects.values('rating').annotate(
                count=models.Count('id')
            )
            total_feedback = sum(f['count'] for f in feedback_scores)
            if total_feedback > 0:
                weighted_score = sum(f['rating'] * f['count'] for f in feedback_scores) / total_feedback
                self.metrics['user_satisfaction_score'].set(weighted_score)
        except Exception as e:
            logger.error(f"Error collecting business metrics: {str(e)}")
    def record_http_request(self, method: str, endpoint: str, status_code: int, duration: float):
        """Record HTTP request metrics"""
        self.metrics['http_requests_total'].labels(
            method=method,
            endpoint=endpoint,
            status_code=str(status_code)
        ).inc()
        self.metrics['http_request_duration_seconds'].labels(
            method=method,
            endpoint=endpoint
        ).observe(duration)
    def get_metrics(self) -> bytes:
        """Get metrics in Prometheus exposition format (generate_latest returns bytes)"""
        return generate_latest(self.registry)
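# The collector reads the 'active_users_count' cache key, but nothing in this module
# writes it. A minimal sketch of one way to keep it updated, assuming a login signal
# handler wired up elsewhere in the project (handler name and module are hypothetical,
# not part of this file):
#
#     from django.contrib.auth.signals import user_logged_in
#     from django.core.cache import cache
#     from django.dispatch import receiver
#
#     @receiver(user_logged_in)
#     def track_active_user(sender, request, user, **kwargs):
#         # Naive counter; a production setup would expire or decrement entries.
#         cache.set('active_users_count', cache.get('active_users_count', 0) + 1)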
class AlertManager:
    """Enterprise alert management system"""
    def __init__(self):
        self.alert_rules = self._load_alert_rules()
        self.notification_channels = self._load_notification_channels()
        self.alert_queue = queue.Queue()
        self.is_running = False
        self.alert_thread = None
    def _load_alert_rules(self) -> List[Dict[str, Any]]:
        """Load alert rules from configuration"""
        return [
            {
                'name': 'high_cpu_usage',
                'condition': 'system_cpu_usage > 80',
                'severity': 'warning',
                'duration': 300,  # 5 minutes
                'enabled': True,
            },
            {
                'name': 'high_memory_usage',
                'condition': 'system_memory_usage > 85',
                'severity': 'warning',
                'duration': 300,
                'enabled': True,
            },
            {
                'name': 'disk_space_low',
                'condition': 'system_disk_usage > 90',
                'severity': 'critical',
                'duration': 60,
                'enabled': True,
            },
            {
                'name': 'database_connections_high',
                'condition': 'database_connections > 50',
                'severity': 'warning',
                'duration': 300,
                'enabled': True,
            },
            {
                'name': 'incident_volume_high',
                'condition': 'incident_count > 100',
                'severity': 'warning',
                'duration': 600,
                'enabled': True,
            },
            {
                'name': 'sla_breach_detected',
                'condition': 'sla_breach_count > 0',
                'severity': 'critical',
                'duration': 0,
                'enabled': True,
            },
        ]
    def _load_notification_channels(self) -> List[Dict[str, Any]]:
        """Load notification channels"""
        return [
            {
                'name': 'email',
                'type': 'email',
                'enabled': True,
                'config': {
                    'recipients': ['admin@company.com'],
                    'template': 'alert_email.html',
                }
            },
            {
                'name': 'slack',
                'type': 'slack',
                'enabled': True,
                'config': {
                    'webhook_url': os.getenv('SLACK_WEBHOOK_URL'),
                    'channel': '#alerts',
                }
            },
            {
                'name': 'webhook',
                'type': 'webhook',
                'enabled': True,
                'config': {
                    'url': os.getenv('ALERT_WEBHOOK_URL'),
                    'headers': {'Authorization': f'Bearer {os.getenv("ALERT_WEBHOOK_TOKEN")}'},
                }
            },
        ]
    def start_monitoring(self):
        """Start alert monitoring"""
        if self.is_running:
            return
        self.is_running = True
        self.alert_thread = threading.Thread(target=self._monitor_alerts)
        self.alert_thread.daemon = True
        self.alert_thread.start()
        logger.info("Alert monitoring started")
    def stop_monitoring(self):
        """Stop alert monitoring"""
        self.is_running = False
        if self.alert_thread:
            self.alert_thread.join()
        logger.info("Alert monitoring stopped")
    def _monitor_alerts(self):
        """Main alert monitoring loop"""
        while self.is_running:
            try:
                self._check_alert_rules()
                time.sleep(60)  # Check every minute
            except Exception as e:
                logger.error(f"Error monitoring alerts: {str(e)}")
                time.sleep(60)
    def _check_alert_rules(self):
        """Check all alert rules"""
        for rule in self.alert_rules:
            if not rule['enabled']:
                continue
            try:
                if self._evaluate_rule(rule):
                    self._trigger_alert(rule)
            except Exception as e:
                logger.error(f"Error checking rule {rule['name']}: {str(e)}")
    def _evaluate_rule(self, rule: Dict[str, Any]) -> bool:
        """Evaluate alert rule condition"""
        condition = rule['condition']
        # Parse condition (simplified: '<metric> > <threshold>')
        if 'system_cpu_usage' in condition:
            cpu_usage = psutil.cpu_percent()
            threshold = float(condition.split('>')[1].strip())
            return cpu_usage > threshold
        elif 'system_memory_usage' in condition:
            memory = psutil.virtual_memory()
            threshold = float(condition.split('>')[1].strip())
            return memory.percent > threshold
        elif 'system_disk_usage' in condition:
            disk = psutil.disk_usage('/')
            disk_percent = (disk.used / disk.total) * 100
            threshold = float(condition.split('>')[1].strip())
            return disk_percent > threshold
        elif 'database_connections' in condition:
            with connection.cursor() as cursor:
                cursor.execute("SELECT COUNT(*) FROM pg_stat_activity")
                connections = cursor.fetchone()[0]
            threshold = float(condition.split('>')[1].strip())
            return connections > threshold
        elif 'incident_count' in condition:
            from incident_intelligence.models import Incident
            count = Incident.objects.count()
            threshold = float(condition.split('>')[1].strip())
            return count > threshold
        elif 'sla_breach_count' in condition:
            from sla_oncall.models import SLAInstance
            count = SLAInstance.objects.filter(status='breached').count()
            threshold = float(condition.split('>')[1].strip())
            return count > threshold
        return False
    def _trigger_alert(self, rule: Dict[str, Any]):
        """Trigger alert for rule violation"""
        alert = {
            'rule_name': rule['name'],
            'severity': rule['severity'],
            'message': f"Alert: {rule['name']} - {rule['condition']}",
            'timestamp': timezone.now().isoformat(),
            'metadata': {
                'condition': rule['condition'],
                'duration': rule['duration'],
            }
        }
        # Send notifications
        self._send_notifications(alert)
        # Store alert
        self._store_alert(alert)
        logger.warning(f"Alert triggered: {rule['name']}")
    def _send_notifications(self, alert: Dict[str, Any]):
        """Send alert notifications"""
        for channel in self.notification_channels:
            if not channel['enabled']:
                continue
            try:
                if channel['type'] == 'email':
                    self._send_email_notification(alert, channel)
                elif channel['type'] == 'slack':
                    self._send_slack_notification(alert, channel)
                elif channel['type'] == 'webhook':
                    self._send_webhook_notification(alert, channel)
            except Exception as e:
                logger.error(f"Error sending notification via {channel['name']}: {str(e)}")
    def _send_email_notification(self, alert: Dict[str, Any], channel: Dict[str, Any]):
        """Send email notification"""
        from django.core.mail import send_mail
        subject = f"ETB-API Alert: {alert['rule_name']}"
        message = f"""
Alert: {alert['rule_name']}
Severity: {alert['severity']}
Message: {alert['message']}
Time: {alert['timestamp']}
"""
        send_mail(
            subject=subject,
            message=message,
            from_email=settings.DEFAULT_FROM_EMAIL,
            recipient_list=channel['config']['recipients'],
            fail_silently=False,
        )
    def _send_slack_notification(self, alert: Dict[str, Any], channel: Dict[str, Any]):
        """Send Slack notification"""
        webhook_url = channel['config']['webhook_url']
        if not webhook_url:
            return
        payload = {
            'channel': channel['config']['channel'],
            'text': f"🚨 ETB-API Alert: {alert['rule_name']}",
            'attachments': [
                {
                    'color': 'danger' if alert['severity'] == 'critical' else 'warning',
                    'fields': [
                        {'title': 'Severity', 'value': alert['severity'], 'short': True},
                        {'title': 'Message', 'value': alert['message'], 'short': False},
                        {'title': 'Time', 'value': alert['timestamp'], 'short': True},
                    ]
                }
            ]
        }
        response = requests.post(webhook_url, json=payload, timeout=10)
        response.raise_for_status()
    def _send_webhook_notification(self, alert: Dict[str, Any], channel: Dict[str, Any]):
        """Send webhook notification"""
        webhook_url = channel['config']['url']
        if not webhook_url:
            return
        headers = channel['config'].get('headers', {})
        response = requests.post(webhook_url, json=alert, headers=headers, timeout=10)
        response.raise_for_status()
    def _store_alert(self, alert: Dict[str, Any]):
        """Store alert in database"""
        try:
            from monitoring.models import Alert
            Alert.objects.create(
                rule_name=alert['rule_name'],
                severity=alert['severity'],
                message=alert['message'],
                metadata=alert['metadata'],
                timestamp=timezone.now(),
            )
        except Exception as e:
            logger.error(f"Error storing alert: {str(e)}")
class PerformanceProfiler:
    """Enterprise performance profiling system"""
    def __init__(self):
        self.profiles = {}
        self.is_enabled = True
    def start_profile(self, name: str) -> Optional[str]:
        """Start profiling a function or operation"""
        if not self.is_enabled:
            return None
        profile_id = f"{name}_{int(time.time() * 1000)}"
        self.profiles[profile_id] = {
            'name': name,
            'start_time': time.time(),
            'start_memory': psutil.Process().memory_info().rss,
            'start_cpu': psutil.cpu_percent(),
        }
        return profile_id
    def end_profile(self, profile_id: str) -> Optional[Dict[str, Any]]:
        """End profiling and return results"""
        if not profile_id or profile_id not in self.profiles:
            return None
        profile = self.profiles.pop(profile_id)
        end_time = time.time()
        end_memory = psutil.Process().memory_info().rss
        end_cpu = psutil.cpu_percent()
        result = {
            'name': profile['name'],
            'duration': end_time - profile['start_time'],
            'memory_delta': end_memory - profile['start_memory'],
            'cpu_delta': end_cpu - profile['start_cpu'],
            'timestamp': timezone.now().isoformat(),
        }
        # Log slow operations
        if result['duration'] > 1.0:  # 1 second
            logger.warning(f"Slow operation detected: {result['name']} took {result['duration']:.2f}s")
        return result
    def profile_function(self, func):
        """Decorator to profile function execution"""
        def wrapper(*args, **kwargs):
            profile_id = self.start_profile(func.__name__)
            try:
                return func(*args, **kwargs)
            finally:
                if profile_id:
                    self.end_profile(profile_id)
        return wrapper
# Global instances
metrics_collector = MetricsCollector()
alert_manager = AlertManager()
performance_profiler = PerformanceProfiler()
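# Usage sketch for the profiler instance above: decorate any callable and a warning is
# logged whenever it runs longer than one second (the function name below is hypothetical).
#
#     @performance_profiler.profile_function
#     def rebuild_incident_index():
#         ...
#
# Manual profiling works the same way via start_profile()/end_profile().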
# API Views for monitoring
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def metrics_endpoint(request):
    """Prometheus metrics endpoint"""
    try:
        metrics_data = metrics_collector.get_metrics()
        # generate_latest() returns bytes in the Prometheus exposition format;
        # return them via a plain HttpResponse so DRF does not try to re-render them.
        return HttpResponse(metrics_data, content_type=CONTENT_TYPE_LATEST)
    except Exception as e:
        logger.error(f"Error getting metrics: {str(e)}")
        return Response(
            {'error': 'Failed to get metrics'},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR
        )
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def monitoring_dashboard(request):
    """Get monitoring dashboard data"""
    try:
        # Get system metrics
        system_metrics = {
            'cpu_usage': psutil.cpu_percent(),
            'memory_usage': psutil.virtual_memory().percent,
            'disk_usage': (psutil.disk_usage('/').used / psutil.disk_usage('/').total) * 100,
            'load_average': psutil.getloadavg() if hasattr(psutil, 'getloadavg') else [0, 0, 0],
        }
        # Get application metrics
        from incident_intelligence.models import Incident
        from sla_oncall.models import SLAInstance
        application_metrics = {
            'total_incidents': Incident.objects.count(),
            'active_incidents': Incident.objects.filter(status='active').count(),
            'resolved_incidents': Incident.objects.filter(status='resolved').count(),
            'sla_breaches': SLAInstance.objects.filter(status='breached').count(),
            'active_users': cache.get('active_users_count', 0),
        }
        # Get recent alerts
        from monitoring.models import Alert
        recent_alerts = Alert.objects.filter(
            timestamp__gte=timezone.now() - timedelta(hours=24)
        ).order_by('-timestamp')[:10]
        return Response({
            'system_metrics': system_metrics,
            'application_metrics': application_metrics,
            'recent_alerts': [
                {
                    'rule_name': alert.rule_name,
                    'severity': alert.severity,
                    'message': alert.message,
                    'timestamp': alert.timestamp.isoformat(),
                }
                for alert in recent_alerts
            ],
        })
    except Exception as e:
        logger.error(f"Monitoring dashboard error: {str(e)}")
        return Response(
            {'error': 'Failed to load monitoring dashboard'},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR
        )
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def test_alert(request):
    """Test alert notification"""
    try:
        sample_alert = {
            'rule_name': 'test_alert',
            'severity': 'info',
            'message': 'This is a test alert',
            'timestamp': timezone.now().isoformat(),
            'metadata': {'test': True},
        }
        alert_manager._send_notifications(sample_alert)
        return Response({
            'message': 'Test alert sent successfully',
            'alert': sample_alert,
        })
    except Exception as e:
        logger.error(f"Test alert error: {str(e)}")
        return Response(
            {'error': 'Failed to send test alert'},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR
        )
class MonitoringMiddleware:
    """Middleware for request monitoring and metrics collection"""
    def __init__(self, get_response):
        self.get_response = get_response
    def __call__(self, request):
        start_time = time.time()
        response = self.get_response(request)
        # Calculate request duration
        duration = time.time() - start_time
        # Record metrics
        metrics_collector.record_http_request(
            method=request.method,
            endpoint=request.path,
            status_code=response.status_code,
            duration=duration
        )
        # Add performance headers
        response['X-Response-Time'] = f"{duration:.3f}s"
        response['X-Request-ID'] = request.META.get('HTTP_X_REQUEST_ID', 'unknown')
        return response
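# Wiring sketch for the middleware and the API views in this module. The dotted paths
# below are assumptions based on this file's location; adjust them to the project's
# actual package layout and urls.py:
#
#     # settings.py
#     MIDDLEWARE = [
#         # ... existing middleware ...
#         'monitoring.enterprise_monitoring.MonitoringMiddleware',
#     ]
#
#     # urls.py
#     from django.urls import path
#     from monitoring import enterprise_monitoring as mon
#     urlpatterns += [
#         path('monitoring/metrics/', mon.metrics_endpoint),
#         path('monitoring/dashboard/', mon.monitoring_dashboard),
#         path('monitoring/test-alert/', mon.test_alert),
#     ]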
# Management command for starting monitoring services
class StartMonitoringCommand(BaseCommand):
    """Django management command to start monitoring services"""
    help = 'Start monitoring services (metrics collection and alerting)'
    def handle(self, *args, **options):
        self.stdout.write('Starting monitoring services...')
        # Start metrics collection
        metrics_collector.start_collection()
        self.stdout.write(self.style.SUCCESS('Metrics collection started'))
        # Start alert monitoring
        alert_manager.start_monitoring()
        self.stdout.write(self.style.SUCCESS('Alert monitoring started'))
        self.stdout.write(self.style.SUCCESS('All monitoring services started successfully'))
        # Keep running
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            self.stdout.write('Stopping monitoring services...')
            metrics_collector.stop_collection()
            alert_manager.stop_monitoring()
            self.stdout.write(self.style.SUCCESS('Monitoring services stopped'))
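# Note: Django only discovers management commands defined as a class named Command
# inside <app>/management/commands/<command_name>.py. To expose the class above as
# `python manage.py start_monitoring`, a thin wrapper module (path and name are
# assumptions) could simply subclass it:
#
#     # monitoring/management/commands/start_monitoring.py
#     from monitoring.enterprise_monitoring import StartMonitoringCommand
#
#     class Command(StartMonitoringCommand):
#         pass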