""" Health check services for monitoring system components """ import time import requests import psutil import logging from typing import Dict, Any, Optional, Tuple from django.conf import settings from django.db import connection from django.core.cache import cache from django.utils import timezone from celery import current_app as celery_app logger = logging.getLogger(__name__) class BaseHealthCheck: """Base class for health checks""" def __init__(self, target): self.target = target self.start_time = None self.end_time = None def execute(self) -> Dict[str, Any]: """Execute the health check and return results""" self.start_time = time.time() try: result = self._perform_check() self.end_time = time.time() result.update({ 'response_time_ms': int((self.end_time - self.start_time) * 1000), 'checked_at': timezone.now(), 'error_message': None }) return result except Exception as e: self.end_time = time.time() logger.error(f"Health check failed for {self.target.name}: {e}") return { 'status': 'CRITICAL', 'response_time_ms': int((self.end_time - self.start_time) * 1000), 'checked_at': timezone.now(), 'error_message': str(e) } def _perform_check(self) -> Dict[str, Any]: """Override in subclasses to implement specific checks""" raise NotImplementedError class HTTPHealthCheck(BaseHealthCheck): """HTTP-based health check""" def _perform_check(self) -> Dict[str, Any]: url = self.target.endpoint_url if not url: raise ValueError("No endpoint URL configured") timeout = self.target.timeout_seconds expected_codes = self.target.expected_status_codes or [200] response = requests.get(url, timeout=timeout) if response.status_code in expected_codes: status = 'HEALTHY' elif response.status_code >= 500: status = 'CRITICAL' else: status = 'WARNING' return { 'status': status, 'status_code': response.status_code, 'response_body': response.text[:1000] # Limit response body size } class DatabaseHealthCheck(BaseHealthCheck): """Database connection health check""" def _perform_check(self) -> Dict[str, Any]: try: with connection.cursor() as cursor: cursor.execute("SELECT 1") result = cursor.fetchone() if result and result[0] == 1: return { 'status': 'HEALTHY', 'status_code': 200 } else: return { 'status': 'CRITICAL', 'status_code': 500, 'error_message': 'Database query returned unexpected result' } except Exception as e: return { 'status': 'CRITICAL', 'status_code': 500, 'error_message': f'Database connection failed: {str(e)}' } class CacheHealthCheck(BaseHealthCheck): """Cache system health check""" def _perform_check(self) -> Dict[str, Any]: try: # Test cache write/read test_key = f"health_check_{int(time.time())}" test_value = "health_check_value" cache.set(test_key, test_value, timeout=10) retrieved_value = cache.get(test_key) if retrieved_value == test_value: cache.delete(test_key) # Clean up return { 'status': 'HEALTHY', 'status_code': 200 } else: return { 'status': 'CRITICAL', 'status_code': 500, 'error_message': 'Cache read/write test failed' } except Exception as e: return { 'status': 'CRITICAL', 'status_code': 500, 'error_message': f'Cache operation failed: {str(e)}' } class CeleryHealthCheck(BaseHealthCheck): """Celery worker health check""" def _perform_check(self) -> Dict[str, Any]: try: # Check if Celery workers are active inspect = celery_app.control.inspect() active_workers = inspect.active() if active_workers: worker_count = len(active_workers) return { 'status': 'HEALTHY', 'status_code': 200, 'response_body': f'Active workers: {worker_count}' } else: return { 'status': 'CRITICAL', 'status_code': 500, 'error_message': 'No active Celery workers found' } except Exception as e: return { 'status': 'CRITICAL', 'status_code': 500, 'error_message': f'Celery health check failed: {str(e)}' } class SystemResourceHealthCheck(BaseHealthCheck): """System resource health check""" def _perform_check(self) -> Dict[str, Any]: try: # Get system metrics cpu_percent = psutil.cpu_percent(interval=1) memory = psutil.virtual_memory() disk = psutil.disk_usage('/') # Determine status based on thresholds status = 'HEALTHY' if cpu_percent > 90 or memory.percent > 90 or disk.percent > 90: status = 'CRITICAL' elif cpu_percent > 80 or memory.percent > 80 or disk.percent > 80: status = 'WARNING' return { 'status': status, 'status_code': 200, 'cpu_usage_percent': cpu_percent, 'memory_usage_percent': memory.percent, 'disk_usage_percent': disk.percent, 'response_body': f'CPU: {cpu_percent}%, Memory: {memory.percent}%, Disk: {disk.percent}%' } except Exception as e: return { 'status': 'CRITICAL', 'status_code': 500, 'error_message': f'System resource check failed: {str(e)}' } class ModuleHealthCheck(BaseHealthCheck): """Django module health check""" def _perform_check(self) -> Dict[str, Any]: try: module_name = self.target.related_module if not module_name: raise ValueError("No module specified for module health check") # Import the module to check if it's accessible __import__(module_name) # Check if module has required models/views from django.apps import apps app_config = apps.get_app_config(module_name) if app_config: return { 'status': 'HEALTHY', 'status_code': 200, 'response_body': f'Module {module_name} is accessible' } else: return { 'status': 'WARNING', 'status_code': 200, 'error_message': f'Module {module_name} not found in Django apps' } except Exception as e: return { 'status': 'CRITICAL', 'status_code': 500, 'error_message': f'Module health check failed: {str(e)}' } class HealthCheckFactory: """Factory for creating health check instances""" CHECK_CLASSES = { 'HTTP': HTTPHealthCheck, 'DATABASE': DatabaseHealthCheck, 'CACHE': CacheHealthCheck, 'QUEUE': CeleryHealthCheck, 'CUSTOM': BaseHealthCheck, 'PING': HTTPHealthCheck, # Use HTTP for ping 'SSL': HTTPHealthCheck, # Use HTTP for SSL } @classmethod def create_health_check(cls, target, check_type: str) -> BaseHealthCheck: """Create a health check instance based on type""" check_class = cls.CHECK_CLASSES.get(check_type, BaseHealthCheck) return check_class(target) @classmethod def get_available_check_types(cls) -> list: """Get list of available health check types""" return list(cls.CHECK_CLASSES.keys()) class HealthCheckService: """Service for managing health checks""" def __init__(self): self.factory = HealthCheckFactory() def execute_health_check(self, target, check_type: str) -> Dict[str, Any]: """Execute a health check for a target""" health_check = self.factory.create_health_check(target, check_type) return health_check.execute() def execute_all_health_checks(self) -> Dict[str, Any]: """Execute health checks for all active targets""" from monitoring.models import MonitoringTarget, HealthCheck results = {} active_targets = MonitoringTarget.objects.filter( status='ACTIVE', health_check_enabled=True ) for target in active_targets: try: # Determine check type based on target type check_type = self._get_check_type_for_target(target) # Execute health check result = self.execute_health_check(target, check_type) # Save result to database HealthCheck.objects.create( target=target, check_type=check_type, status=result['status'], response_time_ms=result.get('response_time_ms'), status_code=result.get('status_code'), response_body=result.get('response_body'), error_message=result.get('error_message'), cpu_usage_percent=result.get('cpu_usage_percent'), memory_usage_percent=result.get('memory_usage_percent'), disk_usage_percent=result.get('disk_usage_percent') ) # Update target status target.last_checked = timezone.now() target.last_status = result['status'] target.save(update_fields=['last_checked', 'last_status']) results[target.name] = result except Exception as e: logger.error(f"Failed to execute health check for {target.name}: {e}") results[target.name] = { 'status': 'CRITICAL', 'error_message': str(e) } return results def _get_check_type_for_target(self, target) -> str: """Determine the appropriate check type for a target""" target_type_mapping = { 'APPLICATION': 'HTTP', 'DATABASE': 'DATABASE', 'CACHE': 'CACHE', 'QUEUE': 'QUEUE', 'EXTERNAL_API': 'HTTP', 'SERVICE': 'HTTP', 'INFRASTRUCTURE': 'HTTP', 'MODULE': 'CUSTOM', } return target_type_mapping.get(target.target_type, 'HTTP') def get_system_health_summary(self) -> Dict[str, Any]: """Get overall system health summary""" from monitoring.models import HealthCheck, MonitoringTarget # Get latest health check for each target latest_checks = HealthCheck.objects.filter( target__status='ACTIVE' ).order_by('target', '-checked_at').distinct('target') total_targets = MonitoringTarget.objects.filter(status='ACTIVE').count() healthy_targets = latest_checks.filter(status='HEALTHY').count() warning_targets = latest_checks.filter(status='WARNING').count() critical_targets = latest_checks.filter(status='CRITICAL').count() # Calculate overall status if critical_targets > 0: overall_status = 'CRITICAL' elif warning_targets > 0: overall_status = 'WARNING' elif healthy_targets == total_targets: overall_status = 'HEALTHY' else: overall_status = 'UNKNOWN' return { 'overall_status': overall_status, 'total_targets': total_targets, 'healthy_targets': healthy_targets, 'warning_targets': warning_targets, 'critical_targets': critical_targets, 'health_percentage': (healthy_targets / total_targets * 100) if total_targets > 0 else 0, 'last_updated': timezone.now() }