"""
Comprehensive Health Check System for ETB-API
Enterprise-grade health monitoring with detailed diagnostics
"""
import logging
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import psutil
import redis
import requests
from django.conf import settings
from django.core.cache import cache
from django.core.exceptions import ImproperlyConfigured
from django.db import connection
from django.http import JsonResponse
from django.utils import timezone

logger = logging.getLogger(__name__)

# Prefix Django puts on auto-generated, not-for-production secret keys.
_INSECURE_SECRET_KEY_PREFIX = 'django-insecure-'


class HealthCheckService:
    """Enterprise health check service with comprehensive diagnostics.

    Every check is a zero-argument method that returns a dict carrying at
    least ``status`` (``'healthy'``, ``'warning'`` or ``'unhealthy'``),
    ``message`` and ``timestamp``. Checks catch their own exceptions and
    report them as ``'unhealthy'`` results instead of raising.
    """

    def __init__(self):
        # Registry of check name -> bound method; the names are the values
        # accepted by ``perform_health_check(checks=...)``.
        self.checks = {
            'database': self._check_database,
            'cache': self._check_cache,
            'celery': self._check_celery,
            'redis': self._check_redis,
            'disk_space': self._check_disk_space,
            'memory': self._check_memory,
            'cpu': self._check_cpu,
            'external_services': self._check_external_services,
            'modules': self._check_modules,
            'security': self._check_security,
        }

    @staticmethod
    def _failure_result(prefix: str, exc: Exception) -> Dict[str, Any]:
        """Build the standard ``'unhealthy'`` result for a failed check."""
        return {
            'status': 'unhealthy',
            'message': f'{prefix}: {exc}',
            'error': str(exc),
            'timestamp': timezone.now().isoformat(),
        }

    def perform_health_check(self, checks: Optional[List[str]] = None) -> Dict[str, Any]:
        """Run the requested checks and aggregate them into one report.

        Args:
            checks: Names of the checks to run. ``None`` or an empty list
                runs every registered check. Names that are not registered
                are skipped and not counted.

        Returns:
            A dict with the overall ``status`` (``'healthy'``, ``'degraded'``
            when only warnings occurred, ``'unhealthy'`` when any check
            failed), the per-check results under ``checks``, pass/fail
            counters under ``summary`` and process metrics under
            ``performance``.
        """
        start_time = time.time()

        results = {
            'status': 'healthy',
            'timestamp': timezone.now().isoformat(),
            'version': getattr(settings, 'VERSION', '1.0.0'),
            'environment': 'production' if not settings.DEBUG else 'development',
            'checks': {},
            'summary': {
                'total_checks': 0,
                'passed_checks': 0,
                'failed_checks': 0,
                'warning_checks': 0,
            },
            'performance': {
                'response_time_ms': 0,
                'memory_usage_mb': 0,
                'cpu_usage_percent': 0,
            },
        }
        summary = results['summary']

        # Determine which checks to run
        checks_to_run = checks or list(self.checks.keys())

        for check_name in checks_to_run:
            check = self.checks.get(check_name)
            if check is None:
                continue

            try:
                check_result = check()
            except Exception as e:
                logger.exception("Health check %s failed: %s", check_name, e)
                check_result = self._failure_result('Check failed', e)

            # Counting happens once, outside the try, so a malformed result
            # can never be counted twice.
            results['checks'][check_name] = check_result
            summary['total_checks'] += 1

            check_status = check_result.get('status')
            if check_status == 'healthy':
                summary['passed_checks'] += 1
            elif check_status == 'warning':
                summary['warning_checks'] += 1
            else:
                summary['failed_checks'] += 1

        # Calculate performance metrics
        performance = results['performance']
        performance['response_time_ms'] = round((time.time() - start_time) * 1000, 2)
        performance['memory_usage_mb'] = round(
            psutil.Process().memory_info().rss / 1024 / 1024, 2
        )
        # Non-blocking call: the value is relative to the previous
        # cpu_percent() call, so the first call in a process may report 0.0.
        performance['cpu_usage_percent'] = round(psutil.cpu_percent(), 2)

        # Determine overall status
        if summary['failed_checks'] > 0:
            results['status'] = 'unhealthy'
        elif summary['warning_checks'] > 0:
            results['status'] = 'degraded'

        return results

    def _check_database(self) -> Dict[str, Any]:
        """Check database connectivity and performance.

        Runs ``SELECT 1`` and a count of ``django_migrations``. On PostgreSQL
        it also counts queries that have been active for more than 30
        seconds; on other backends ``long_running_queries`` is ``None``
        because ``pg_stat_activity`` does not exist there.
        """
        try:
            start_time = time.time()
            long_queries = None

            with connection.cursor() as cursor:
                # Test basic connectivity
                cursor.execute("SELECT 1")
                cursor.fetchone()

                # Test database performance
                cursor.execute("SELECT COUNT(*) FROM django_migrations")
                migration_count = cursor.fetchone()[0]

                # Check for long-running queries (PostgreSQL-only view)
                if connection.vendor == 'postgresql':
                    cursor.execute("""
                        SELECT COUNT(*)
                        FROM pg_stat_activity
                        WHERE state = 'active'
                        AND query_start < NOW() - INTERVAL '30 seconds'
                    """)
                    long_queries = cursor.fetchone()[0]

            response_time = (time.time() - start_time) * 1000

            status = 'healthy'
            message = 'Database is healthy'

            if response_time > 1000:  # 1 second
                status = 'warning'
                message = 'Database response time is slow'
            elif long_queries is not None and long_queries > 5:
                status = 'warning'
                message = 'Multiple long-running queries detected'

            return {
                'status': status,
                'message': message,
                'response_time_ms': round(response_time, 2),
                'migration_count': migration_count,
                'long_running_queries': long_queries,
                'timestamp': timezone.now().isoformat(),
            }
        except Exception as e:
            return self._failure_result('Database connection failed', e)

    def _check_cache(self) -> Dict[str, Any]:
        """Check cache connectivity and performance via a set/get round trip."""
        try:
            start_time = time.time()

            # Test cache write/read
            test_key = 'health_check_test'
            test_value = f'test_{time.time()}'

            cache.set(test_key, test_value, 30)
            retrieved_value = cache.get(test_key)

            response_time = (time.time() - start_time) * 1000

            if retrieved_value != test_value:
                return {
                    'status': 'unhealthy',
                    'message': 'Cache read/write test failed',
                    'response_time_ms': round(response_time, 2),
                    'timestamp': timezone.now().isoformat(),
                }

            status = 'healthy'
            message = 'Cache is healthy'

            if response_time > 100:  # 100ms
                status = 'warning'
                message = 'Cache response time is slow'

            return {
                'status': status,
                'message': message,
                'response_time_ms': round(response_time, 2),
                'timestamp': timezone.now().isoformat(),
            }
        except Exception as e:
            return self._failure_result('Cache connection failed', e)

    def _check_celery(self) -> Dict[str, Any]:
        """Check Celery worker status and queue health.

        Reports ``'unhealthy'`` when no worker answers the inspect call and
        ``'warning'`` when more than 100 tasks are active or more than 50 are
        scheduled.
        """
        try:
            # Imported lazily so the module loads even without Celery.
            from celery import current_app

            # Get worker status
            inspect = current_app.control.inspect()
            active_workers = inspect.active()
            scheduled_tasks = inspect.scheduled()
            registered_tasks = inspect.registered()

            worker_count = len(active_workers) if active_workers else 0
            total_active_tasks = sum(
                len(tasks) for tasks in (active_workers or {}).values()
            )
            total_scheduled_tasks = sum(
                len(tasks) for tasks in (scheduled_tasks or {}).values()
            )
            # Task count as registered on the first worker that replied.
            registered_count = (
                len(next(iter(registered_tasks.values())))
                if registered_tasks else 0
            )

            status = 'healthy'
            message = 'Celery workers are healthy'

            if worker_count == 0:
                status = 'unhealthy'
                message = 'No active Celery workers found'
            elif total_active_tasks > 100:
                status = 'warning'
                message = 'High number of active tasks detected'
            elif total_scheduled_tasks > 50:
                status = 'warning'
                message = 'High number of scheduled tasks detected'

            return {
                'status': status,
                'message': message,
                'worker_count': worker_count,
                'active_tasks': total_active_tasks,
                'scheduled_tasks': total_scheduled_tasks,
                'registered_tasks': registered_count,
                'timestamp': timezone.now().isoformat(),
            }
        except Exception as e:
            return self._failure_result('Celery check failed', e)

    def _check_redis(self) -> Dict[str, Any]:
        """Check Redis connectivity and performance.

        Connects to ``settings.CELERY_BROKER_URL`` (default
        ``redis://localhost:6379/0``), performs a set/get round trip and
        reads server info.
        """
        try:
            start_time = time.time()

            # Connect to Redis
            redis_url = getattr(settings, 'CELERY_BROKER_URL', 'redis://localhost:6379/0')
            r = redis.from_url(redis_url)

            # Test basic operations
            test_key = 'health_check_redis'
            test_value = f'test_{time.time()}'

            r.set(test_key, test_value, ex=30)
            retrieved_value = r.get(test_key)
            # GET returns bytes by default, str when the client decodes
            # responses, and None when the key is missing; only bytes need
            # decoding before the comparison below.
            if isinstance(retrieved_value, bytes):
                retrieved_value = retrieved_value.decode()

            # Get Redis info
            info = r.info()

            response_time = (time.time() - start_time) * 1000

            if retrieved_value != test_value:
                return {
                    'status': 'unhealthy',
                    'message': 'Redis read/write test failed',
                    'response_time_ms': round(response_time, 2),
                    'timestamp': timezone.now().isoformat(),
                }

            status = 'healthy'
            message = 'Redis is healthy'

            if response_time > 50:  # 50ms
                status = 'warning'
                message = 'Redis response time is slow'

            return {
                'status': status,
                'message': message,
                'response_time_ms': round(response_time, 2),
                'redis_version': info.get('redis_version'),
                'used_memory_human': info.get('used_memory_human'),
                'connected_clients': info.get('connected_clients'),
                'timestamp': timezone.now().isoformat(),
            }
        except Exception as e:
            return self._failure_result('Redis connection failed', e)

    def _check_disk_space(self) -> Dict[str, Any]:
        """Check disk space usage of the root filesystem (``/``)."""
        try:
            disk_usage = psutil.disk_usage('/')

            total_gb = disk_usage.total / (1024**3)
            used_gb = disk_usage.used / (1024**3)
            free_gb = disk_usage.free / (1024**3)
            usage_percent = (used_gb / total_gb) * 100

            status = 'healthy'
            message = 'Disk space is healthy'

            if usage_percent > 90:
                status = 'unhealthy'
                message = 'Disk space critically low'
            elif usage_percent > 80:
                status = 'warning'
                message = 'Disk space usage is high'

            return {
                'status': status,
                'message': message,
                'total_gb': round(total_gb, 2),
                'used_gb': round(used_gb, 2),
                'free_gb': round(free_gb, 2),
                'usage_percent': round(usage_percent, 2),
                'timestamp': timezone.now().isoformat(),
            }
        except Exception as e:
            return self._failure_result('Disk space check failed', e)

    def _check_memory(self) -> Dict[str, Any]:
        """Check system memory usage as reported by psutil."""
        try:
            memory = psutil.virtual_memory()

            total_gb = memory.total / (1024**3)
            used_gb = memory.used / (1024**3)
            available_gb = memory.available / (1024**3)
            usage_percent = memory.percent

            status = 'healthy'
            message = 'Memory usage is healthy'

            if usage_percent > 90:
                status = 'unhealthy'
                message = 'Memory usage critically high'
            elif usage_percent > 80:
                status = 'warning'
                message = 'Memory usage is high'

            return {
                'status': status,
                'message': message,
                'total_gb': round(total_gb, 2),
                'used_gb': round(used_gb, 2),
                'available_gb': round(available_gb, 2),
                'usage_percent': round(usage_percent, 2),
                'timestamp': timezone.now().isoformat(),
            }
        except Exception as e:
            return self._failure_result('Memory check failed', e)

    def _check_cpu(self) -> Dict[str, Any]:
        """Check CPU usage and load averages.

        Note: ``cpu_percent(interval=1)`` blocks for one second while it
        samples, so this check adds about a second to the overall report.
        """
        try:
            cpu_percent = psutil.cpu_percent(interval=1)
            cpu_count = psutil.cpu_count()
            # Fall back to zeros when this psutil build lacks getloadavg.
            load_avg = psutil.getloadavg() if hasattr(psutil, 'getloadavg') else [0, 0, 0]

            status = 'healthy'
            message = 'CPU usage is healthy'

            if cpu_percent > 90:
                status = 'unhealthy'
                message = 'CPU usage critically high'
            elif cpu_percent > 80:
                status = 'warning'
                message = 'CPU usage is high'

            return {
                'status': status,
                'message': message,
                'cpu_percent': round(cpu_percent, 2),
                'cpu_count': cpu_count,
                'load_avg_1min': round(load_avg[0], 2),
                'load_avg_5min': round(load_avg[1], 2),
                'load_avg_15min': round(load_avg[2], 2),
                'timestamp': timezone.now().isoformat(),
            }
        except Exception as e:
            return self._failure_result('CPU check failed', e)

    def _check_external_services(self) -> Dict[str, Any]:
        """Check external service dependencies.

        NOTE: this is currently a placeholder. No connection is attempted;
        each configured service is reported as healthy with a response time
        of 0.
        """
        try:
            services = {
                'email': {
                    'url': f'smtp://{getattr(settings, "EMAIL_HOST", "localhost")}:{getattr(settings, "EMAIL_PORT", "587")}',
                    'timeout': 5,
                },
            }

            results = {}
            overall_status = 'healthy'

            for service_name, config in services.items():
                try:
                    # This is a simplified check - in production, you'd
                    # implement actual service checks using ``config``.
                    results[service_name] = {
                        'status': 'healthy',
                        'message': f'{service_name} service is accessible',
                        'response_time_ms': 0,
                    }
                except Exception as e:
                    results[service_name] = {
                        'status': 'unhealthy',
                        'message': f'{service_name} service check failed: {str(e)}',
                        'error': str(e),
                    }
                    overall_status = 'unhealthy'

            return {
                'status': overall_status,
                'message': 'External services check completed',
                'services': results,
                'timestamp': timezone.now().isoformat(),
            }
        except Exception as e:
            return self._failure_result('External services check failed', e)

    def _check_modules(self) -> Dict[str, Any]:
        """Check Django modules and apps.

        For every installed app this records the number of models and the
        number of applied migrations. An app whose inspection raises is
        recorded with status ``'warning'``, and the overall result becomes
        ``'warning'`` if any app is affected.
        """
        try:
            from django.apps import apps

            installed_apps = []
            module_status = {}

            for app_config in apps.get_app_configs():
                app_name = app_config.name
                installed_apps.append(app_name)

                try:
                    # get_models() yields models lazily, so count by
                    # iterating rather than calling len() on it.
                    model_count = sum(1 for _ in app_config.get_models())

                    # django_migrations.app stores the app *label*, which
                    # can differ from the last component of the dotted name.
                    with connection.cursor() as cursor:
                        cursor.execute(
                            "SELECT COUNT(*) FROM django_migrations WHERE app = %s",
                            [app_config.label],
                        )
                        migration_count = cursor.fetchone()[0]

                    module_status[app_name] = {
                        'status': 'healthy',
                        'model_count': model_count,
                        'migration_count': migration_count,
                    }
                except Exception as e:
                    module_status[app_name] = {
                        'status': 'warning',
                        'error': str(e),
                    }

            problem_apps = [
                name for name, info in module_status.items()
                if info['status'] != 'healthy'
            ]

            status = 'healthy'
            message = 'All modules are healthy'
            if problem_apps:
                status = 'warning'
                message = f'Module checks failed for: {", ".join(problem_apps)}'

            return {
                'status': status,
                'message': message,
                'installed_apps': installed_apps,
                'module_status': module_status,
                'total_apps': len(installed_apps),
                'timestamp': timezone.now().isoformat(),
            }
        except Exception as e:
            return self._failure_result('Module check failed', e)

    def _check_security(self) -> Dict[str, Any]:
        """Check security-related configurations.

        Each entry in ``security_checks`` is ``True`` when the setting is in
        its secure state; any ``False`` entry downgrades the result to
        ``'warning'``.
        """
        try:
            secret_key = settings.SECRET_KEY

            security_checks = {
                'debug_mode': not settings.DEBUG,
                # Generated development keys start with this prefix followed
                # by random characters, so compare by prefix, not equality.
                'secret_key_set': bool(secret_key)
                and not str(secret_key).startswith(_INSECURE_SECRET_KEY_PREFIX),
                'https_enabled': getattr(settings, 'SECURE_SSL_REDIRECT', False),
                'hsts_enabled': getattr(settings, 'SECURE_HSTS_SECONDS', 0) > 0,
                'csrf_protection': True,  # Django default
                'session_secure': getattr(settings, 'SESSION_COOKIE_SECURE', False),
            }

            failed_checks = [
                check for check, passed in security_checks.items() if not passed
            ]

            status = 'healthy'
            message = 'Security configuration is healthy'

            if failed_checks:
                status = 'warning'
                message = f'Security issues detected: {", ".join(failed_checks)}'

            return {
                'status': status,
                'message': message,
                'security_checks': security_checks,
                'failed_checks': failed_checks,
                'timestamp': timezone.now().isoformat(),
            }
        except Exception as e:
            return self._failure_result('Security check failed', e)


def health_check_view(request):
    """Django view for health check endpoint.

    Accepts repeated ``checks`` query parameters to restrict which checks
    run. Responds with HTTP 200 for ``healthy`` and ``degraded`` reports and
    HTTP 503 otherwise.
    """
    try:
        service = HealthCheckService()
        checks = request.GET.getlist('checks')
        result = service.perform_health_check(checks if checks else None)

        # 'degraded' still counts as operational.
        status_code = 200 if result['status'] in ('healthy', 'degraded') else 503

        return JsonResponse(result, status=status_code)
    except Exception as e:
        logger.exception("Health check view failed: %s", e)
        return JsonResponse({
            'status': 'unhealthy',
            'message': f'Health check failed: {str(e)}',
            'error': str(e),
            'timestamp': timezone.now().isoformat(),
        }, status=503)


def readiness_check_view(request):
    """Django view for readiness check (simplified health check).

    Ready means the database answers ``SELECT 1`` and the cache accepts a
    set/get without raising; any exception yields HTTP 503.
    """
    try:
        # Quick checks for readiness
        with connection.cursor() as cursor:
            cursor.execute("SELECT 1")

        cache.set('readiness_check', 'ok', 10)
        cache.get('readiness_check')

        return JsonResponse({
            'status': 'ready',
            'timestamp': timezone.now().isoformat(),
        })
    except Exception as e:
        logger.exception("Readiness check failed: %s", e)
        return JsonResponse({
            'status': 'not_ready',
            'message': str(e),
            'timestamp': timezone.now().isoformat(),
        }, status=503)


def liveness_check_view(request):
    """Django view for liveness check (basic application check)."""
    return JsonResponse({
        'status': 'alive',
        'timestamp': timezone.now().isoformat(),
    })