""" Enterprise Monitoring System for ETB-API Advanced monitoring with metrics, alerting, and observability """ import logging import time import psutil import json import os from datetime import datetime, timedelta from typing import Dict, List, Optional, Any, Union from django.http import HttpRequest, JsonResponse from django.conf import settings from django.utils import timezone from django.core.cache import cache from django.db import connection from django.core.management import call_command from rest_framework import status from rest_framework.response import Response from rest_framework.views import APIView from rest_framework.decorators import api_view, permission_classes from rest_framework.permissions import IsAuthenticated from django.core.management.base import BaseCommand import requests import redis from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST from prometheus_client.core import CollectorRegistry import threading import queue logger = logging.getLogger(__name__) class MetricsCollector: """Enterprise metrics collection system""" def __init__(self): self.registry = CollectorRegistry() self.metrics = self._initialize_metrics() self.collection_interval = 60 # seconds self.is_running = False self.collection_thread = None def _initialize_metrics(self): """Initialize Prometheus metrics""" metrics = {} # Application metrics metrics['http_requests_total'] = Counter( 'http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status_code'], registry=self.registry ) metrics['http_request_duration_seconds'] = Histogram( 'http_request_duration_seconds', 'HTTP request duration in seconds', ['method', 'endpoint'], registry=self.registry ) metrics['active_users'] = Gauge( 'active_users', 'Number of active users', registry=self.registry ) metrics['incident_count'] = Gauge( 'incident_count', 'Total number of incidents', ['status', 'priority'], registry=self.registry ) metrics['sla_breach_count'] = Gauge( 'sla_breach_count', 'Number of SLA breaches', ['sla_type'], registry=self.registry ) # System metrics metrics['system_cpu_usage'] = Gauge( 'system_cpu_usage_percent', 'System CPU usage percentage', registry=self.registry ) metrics['system_memory_usage'] = Gauge( 'system_memory_usage_percent', 'System memory usage percentage', registry=self.registry ) metrics['system_disk_usage'] = Gauge( 'system_disk_usage_percent', 'System disk usage percentage', registry=self.registry ) metrics['database_connections'] = Gauge( 'database_connections_active', 'Active database connections', registry=self.registry ) metrics['cache_hit_ratio'] = Gauge( 'cache_hit_ratio', 'Cache hit ratio', registry=self.registry ) # Business metrics metrics['incident_resolution_time'] = Histogram( 'incident_resolution_time_seconds', 'Incident resolution time in seconds', ['priority', 'category'], registry=self.registry ) metrics['automation_success_rate'] = Gauge( 'automation_success_rate', 'Automation success rate', ['automation_type'], registry=self.registry ) metrics['user_satisfaction_score'] = Gauge( 'user_satisfaction_score', 'User satisfaction score', registry=self.registry ) return metrics def start_collection(self): """Start metrics collection in background thread""" if self.is_running: return self.is_running = True self.collection_thread = threading.Thread(target=self._collect_metrics_loop) self.collection_thread.daemon = True self.collection_thread.start() logger.info("Metrics collection started") def stop_collection(self): """Stop metrics collection""" self.is_running = False if self.collection_thread: self.collection_thread.join() logger.info("Metrics collection stopped") def _collect_metrics_loop(self): """Main metrics collection loop""" while self.is_running: try: self._collect_system_metrics() self._collect_application_metrics() self._collect_business_metrics() time.sleep(self.collection_interval) except Exception as e: logger.error(f"Error collecting metrics: {str(e)}") time.sleep(self.collection_interval) def _collect_system_metrics(self): """Collect system-level metrics""" try: # CPU usage cpu_percent = psutil.cpu_percent(interval=1) self.metrics['system_cpu_usage'].set(cpu_percent) # Memory usage memory = psutil.virtual_memory() self.metrics['system_memory_usage'].set(memory.percent) # Disk usage disk = psutil.disk_usage('/') disk_percent = (disk.used / disk.total) * 100 self.metrics['system_disk_usage'].set(disk_percent) # Database connections with connection.cursor() as cursor: cursor.execute("SELECT COUNT(*) FROM pg_stat_activity") db_connections = cursor.fetchone()[0] self.metrics['database_connections'].set(db_connections) # Cache hit ratio cache_stats = cache._cache.get_stats() if cache_stats: hit_ratio = cache_stats.get('hit_ratio', 0) self.metrics['cache_hit_ratio'].set(hit_ratio) except Exception as e: logger.error(f"Error collecting system metrics: {str(e)}") def _collect_application_metrics(self): """Collect application-level metrics""" try: # Active users (from cache) active_users = cache.get('active_users_count', 0) self.metrics['active_users'].set(active_users) # Incident counts from incident_intelligence.models import Incident from django.db import models incident_counts = Incident.objects.values('status', 'priority').annotate( count=models.Count('id') ) for incident in incident_counts: self.metrics['incident_count'].labels( status=incident['status'], priority=incident['priority'] ).set(incident['count']) # SLA breach counts from sla_oncall.models import SLAInstance sla_breaches = SLAInstance.objects.filter( status='breached' ).values('sla_type').annotate( count=models.Count('id') ) for breach in sla_breaches: self.metrics['sla_breach_count'].labels( sla_type=breach['sla_type'] ).set(breach['count']) except Exception as e: logger.error(f"Error collecting application metrics: {str(e)}") def _collect_business_metrics(self): """Collect business-level metrics""" try: # Incident resolution times from incident_intelligence.models import Incident from django.db import models resolved_incidents = Incident.objects.filter( status='resolved', resolved_at__isnull=False ).values('priority', 'category') for incident in resolved_incidents: resolution_time = (incident['resolved_at'] - incident['created_at']).total_seconds() self.metrics['incident_resolution_time'].labels( priority=incident['priority'], category=incident['category'] ).observe(resolution_time) # Automation success rates from automation_orchestration.models import AutomationExecution from django.db import models automation_stats = AutomationExecution.objects.values('automation_type').annotate( total=models.Count('id'), successful=models.Count('id', filter=models.Q(status='success')) ) for stat in automation_stats: success_rate = (stat['successful'] / stat['total']) * 100 if stat['total'] > 0 else 0 self.metrics['automation_success_rate'].labels( automation_type=stat['automation_type'] ).set(success_rate) # User satisfaction score (from feedback) from knowledge_learning.models import UserFeedback from django.db import models feedback_scores = UserFeedback.objects.values('rating').annotate( count=models.Count('id') ) total_feedback = sum(f['count'] for f in feedback_scores) if total_feedback > 0: weighted_score = sum(f['rating'] * f['count'] for f in feedback_scores) / total_feedback self.metrics['user_satisfaction_score'].set(weighted_score) except Exception as e: logger.error(f"Error collecting business metrics: {str(e)}") def record_http_request(self, method: str, endpoint: str, status_code: int, duration: float): """Record HTTP request metrics""" self.metrics['http_requests_total'].labels( method=method, endpoint=endpoint, status_code=str(status_code) ).inc() self.metrics['http_request_duration_seconds'].labels( method=method, endpoint=endpoint ).observe(duration) def get_metrics(self) -> str: """Get metrics in Prometheus format""" return generate_latest(self.registry) class AlertManager: """Enterprise alert management system""" def __init__(self): self.alert_rules = self._load_alert_rules() self.notification_channels = self._load_notification_channels() self.alert_queue = queue.Queue() self.is_running = False self.alert_thread = None def _load_alert_rules(self) -> List[Dict[str, Any]]: """Load alert rules from configuration""" return [ { 'name': 'high_cpu_usage', 'condition': 'system_cpu_usage > 80', 'severity': 'warning', 'duration': 300, # 5 minutes 'enabled': True, }, { 'name': 'high_memory_usage', 'condition': 'system_memory_usage > 85', 'severity': 'warning', 'duration': 300, 'enabled': True, }, { 'name': 'disk_space_low', 'condition': 'system_disk_usage > 90', 'severity': 'critical', 'duration': 60, 'enabled': True, }, { 'name': 'database_connections_high', 'condition': 'database_connections > 50', 'severity': 'warning', 'duration': 300, 'enabled': True, }, { 'name': 'incident_volume_high', 'condition': 'incident_count > 100', 'severity': 'warning', 'duration': 600, 'enabled': True, }, { 'name': 'sla_breach_detected', 'condition': 'sla_breach_count > 0', 'severity': 'critical', 'duration': 0, 'enabled': True, }, ] def _load_notification_channels(self) -> List[Dict[str, Any]]: """Load notification channels""" return [ { 'name': 'email', 'type': 'email', 'enabled': True, 'config': { 'recipients': ['admin@company.com'], 'template': 'alert_email.html', } }, { 'name': 'slack', 'type': 'slack', 'enabled': True, 'config': { 'webhook_url': os.getenv('SLACK_WEBHOOK_URL'), 'channel': '#alerts', } }, { 'name': 'webhook', 'type': 'webhook', 'enabled': True, 'config': { 'url': os.getenv('ALERT_WEBHOOK_URL'), 'headers': {'Authorization': f'Bearer {os.getenv("ALERT_WEBHOOK_TOKEN")}'}, } }, ] def start_monitoring(self): """Start alert monitoring""" if self.is_running: return self.is_running = True self.alert_thread = threading.Thread(target=self._monitor_alerts) self.alert_thread.daemon = True self.alert_thread.start() logger.info("Alert monitoring started") def stop_monitoring(self): """Stop alert monitoring""" self.is_running = False if self.alert_thread: self.alert_thread.join() logger.info("Alert monitoring stopped") def _monitor_alerts(self): """Main alert monitoring loop""" while self.is_running: try: self._check_alert_rules() time.sleep(60) # Check every minute except Exception as e: logger.error(f"Error monitoring alerts: {str(e)}") time.sleep(60) def _check_alert_rules(self): """Check all alert rules""" for rule in self.alert_rules: if not rule['enabled']: continue try: if self._evaluate_rule(rule): self._trigger_alert(rule) except Exception as e: logger.error(f"Error checking rule {rule['name']}: {str(e)}") def _evaluate_rule(self, rule: Dict[str, Any]) -> bool: """Evaluate alert rule condition""" condition = rule['condition'] # Parse condition (simplified) if 'system_cpu_usage' in condition: cpu_usage = psutil.cpu_percent() threshold = float(condition.split('>')[1].strip()) return cpu_usage > threshold elif 'system_memory_usage' in condition: memory = psutil.virtual_memory() threshold = float(condition.split('>')[1].strip()) return memory.percent > threshold elif 'system_disk_usage' in condition: disk = psutil.disk_usage('/') disk_percent = (disk.used / disk.total) * 100 threshold = float(condition.split('>')[1].strip()) return disk_percent > threshold elif 'database_connections' in condition: with connection.cursor() as cursor: cursor.execute("SELECT COUNT(*) FROM pg_stat_activity") connections = cursor.fetchone()[0] threshold = float(condition.split('>')[1].strip()) return connections > threshold elif 'incident_count' in condition: from incident_intelligence.models import Incident count = Incident.objects.count() threshold = float(condition.split('>')[1].strip()) return count > threshold elif 'sla_breach_count' in condition: from sla_oncall.models import SLAInstance count = SLAInstance.objects.filter(status='breached').count() threshold = float(condition.split('>')[1].strip()) return count > threshold return False def _trigger_alert(self, rule: Dict[str, Any]): """Trigger alert for rule violation""" alert = { 'rule_name': rule['name'], 'severity': rule['severity'], 'message': f"Alert: {rule['name']} - {rule['condition']}", 'timestamp': timezone.now().isoformat(), 'metadata': { 'condition': rule['condition'], 'duration': rule['duration'], } } # Send notifications self._send_notifications(alert) # Store alert self._store_alert(alert) logger.warning(f"Alert triggered: {rule['name']}") def _send_notifications(self, alert: Dict[str, Any]): """Send alert notifications""" for channel in self.notification_channels: if not channel['enabled']: continue try: if channel['type'] == 'email': self._send_email_notification(alert, channel) elif channel['type'] == 'slack': self._send_slack_notification(alert, channel) elif channel['type'] == 'webhook': self._send_webhook_notification(alert, channel) except Exception as e: logger.error(f"Error sending notification via {channel['name']}: {str(e)}") def _send_email_notification(self, alert: Dict[str, Any], channel: Dict[str, Any]): """Send email notification""" from django.core.mail import send_mail subject = f"ETB-API Alert: {alert['rule_name']}" message = f""" Alert: {alert['rule_name']} Severity: {alert['severity']} Message: {alert['message']} Time: {alert['timestamp']} """ send_mail( subject=subject, message=message, from_email=settings.DEFAULT_FROM_EMAIL, recipient_list=channel['config']['recipients'], fail_silently=False, ) def _send_slack_notification(self, alert: Dict[str, Any], channel: Dict[str, Any]): """Send Slack notification""" webhook_url = channel['config']['webhook_url'] if not webhook_url: return payload = { 'channel': channel['config']['channel'], 'text': f"🚨 ETB-API Alert: {alert['rule_name']}", 'attachments': [ { 'color': 'danger' if alert['severity'] == 'critical' else 'warning', 'fields': [ {'title': 'Severity', 'value': alert['severity'], 'short': True}, {'title': 'Message', 'value': alert['message'], 'short': False}, {'title': 'Time', 'value': alert['timestamp'], 'short': True}, ] } ] } response = requests.post(webhook_url, json=payload, timeout=10) response.raise_for_status() def _send_webhook_notification(self, alert: Dict[str, Any], channel: Dict[str, Any]): """Send webhook notification""" webhook_url = channel['config']['url'] if not webhook_url: return headers = channel['config'].get('headers', {}) response = requests.post(webhook_url, json=alert, headers=headers, timeout=10) response.raise_for_status() def _store_alert(self, alert: Dict[str, Any]): """Store alert in database""" try: from monitoring.models import Alert Alert.objects.create( rule_name=alert['rule_name'], severity=alert['severity'], message=alert['message'], metadata=alert['metadata'], timestamp=timezone.now(), ) except Exception as e: logger.error(f"Error storing alert: {str(e)}") class PerformanceProfiler: """Enterprise performance profiling system""" def __init__(self): self.profiles = {} self.is_enabled = True def start_profile(self, name: str) -> str: """Start profiling a function or operation""" if not self.is_enabled: return None profile_id = f"{name}_{int(time.time() * 1000)}" self.profiles[profile_id] = { 'name': name, 'start_time': time.time(), 'start_memory': psutil.Process().memory_info().rss, 'start_cpu': psutil.cpu_percent(), } return profile_id def end_profile(self, profile_id: str) -> Dict[str, Any]: """End profiling and return results""" if not profile_id or profile_id not in self.profiles: return None profile = self.profiles.pop(profile_id) end_time = time.time() end_memory = psutil.Process().memory_info().rss end_cpu = psutil.cpu_percent() result = { 'name': profile['name'], 'duration': end_time - profile['start_time'], 'memory_delta': end_memory - profile['start_memory'], 'cpu_delta': end_cpu - profile['start_cpu'], 'timestamp': timezone.now().isoformat(), } # Log slow operations if result['duration'] > 1.0: # 1 second logger.warning(f"Slow operation detected: {result['name']} took {result['duration']:.2f}s") return result def profile_function(self, func): """Decorator to profile function execution""" def wrapper(*args, **kwargs): profile_id = self.start_profile(func.__name__) try: result = func(*args, **kwargs) return result finally: if profile_id: self.end_profile(profile_id) return wrapper # Global instances metrics_collector = MetricsCollector() alert_manager = AlertManager() performance_profiler = PerformanceProfiler() # API Views for monitoring @api_view(['GET']) @permission_classes([IsAuthenticated]) def metrics_endpoint(request): """Prometheus metrics endpoint""" try: metrics_data = metrics_collector.get_metrics() return Response(metrics_data, content_type=CONTENT_TYPE_LATEST) except Exception as e: logger.error(f"Error getting metrics: {str(e)}") return Response( {'error': 'Failed to get metrics'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) @api_view(['GET']) @permission_classes([IsAuthenticated]) def monitoring_dashboard(request): """Get monitoring dashboard data""" try: # Get system metrics system_metrics = { 'cpu_usage': psutil.cpu_percent(), 'memory_usage': psutil.virtual_memory().percent, 'disk_usage': (psutil.disk_usage('/').used / psutil.disk_usage('/').total) * 100, 'load_average': psutil.getloadavg() if hasattr(psutil, 'getloadavg') else [0, 0, 0], } # Get application metrics from incident_intelligence.models import Incident from sla_oncall.models import SLAInstance application_metrics = { 'total_incidents': Incident.objects.count(), 'active_incidents': Incident.objects.filter(status='active').count(), 'resolved_incidents': Incident.objects.filter(status='resolved').count(), 'sla_breaches': SLAInstance.objects.filter(status='breached').count(), 'active_users': cache.get('active_users_count', 0), } # Get recent alerts from monitoring.models import Alert recent_alerts = Alert.objects.filter( timestamp__gte=timezone.now() - timedelta(hours=24) ).order_by('-timestamp')[:10] return Response({ 'system_metrics': system_metrics, 'application_metrics': application_metrics, 'recent_alerts': [ { 'rule_name': alert.rule_name, 'severity': alert.severity, 'message': alert.message, 'timestamp': alert.timestamp.isoformat(), } for alert in recent_alerts ], }) except Exception as e: logger.error(f"Monitoring dashboard error: {str(e)}") return Response( {'error': 'Failed to load monitoring dashboard'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) @api_view(['POST']) @permission_classes([IsAuthenticated]) def test_alert(request): """Test alert notification""" try: test_alert = { 'rule_name': 'test_alert', 'severity': 'info', 'message': 'This is a test alert', 'timestamp': timezone.now().isoformat(), 'metadata': {'test': True}, } alert_manager._send_notifications(test_alert) return Response({ 'message': 'Test alert sent successfully', 'alert': test_alert, }) except Exception as e: logger.error(f"Test alert error: {str(e)}") return Response( {'error': 'Failed to send test alert'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) class MonitoringMiddleware: """Middleware for request monitoring and metrics collection""" def __init__(self, get_response): self.get_response = get_response def __call__(self, request): start_time = time.time() response = self.get_response(request) # Calculate request duration duration = time.time() - start_time # Record metrics metrics_collector.record_http_request( method=request.method, endpoint=request.path, status_code=response.status_code, duration=duration ) # Add performance headers response['X-Response-Time'] = f"{duration:.3f}s" response['X-Request-ID'] = request.META.get('HTTP_X_REQUEST_ID', 'unknown') return response # Management command for starting monitoring services class StartMonitoringCommand(BaseCommand): """Django management command to start monitoring services""" help = 'Start monitoring services (metrics collection and alerting)' def handle(self, *args, **options): self.stdout.write('Starting monitoring services...') # Start metrics collection metrics_collector.start_collection() self.stdout.write(self.style.SUCCESS('Metrics collection started')) # Start alert monitoring alert_manager.start_monitoring() self.stdout.write(self.style.SUCCESS('Alert monitoring started')) self.stdout.write(self.style.SUCCESS('All monitoring services started successfully')) # Keep running try: while True: time.sleep(1) except KeyboardInterrupt: self.stdout.write('Stopping monitoring services...') metrics_collector.stop_collection() alert_manager.stop_monitoring() self.stdout.write(self.style.SUCCESS('Monitoring services stopped'))