Files
ETB/ETB-API/monitoring/services/health_checks.py
Iliyan Angelov 6b247e5b9f Updates
2025-09-19 11:58:53 +03:00

373 lines
13 KiB
Python

"""
Health check services for monitoring system components
"""
import time
import requests
import psutil
import logging
from typing import Dict, Any, Optional, Tuple
from django.conf import settings
from django.db import connection
from django.core.cache import cache
from django.utils import timezone
from celery import current_app as celery_app
logger = logging.getLogger(__name__)
class BaseHealthCheck:
    """Base class for health checks.

    Subclasses implement ``_perform_check``; ``execute`` wraps that call with
    timing, a ``checked_at`` timestamp and uniform error handling.
    """

    def __init__(self, target):
        """Store the target under test; timing fields are set by ``execute``."""
        self.target = target
        self.start_time = None
        self.end_time = None

    @staticmethod
    def _finalize_result(result: Dict[str, Any], response_time_ms: int,
                         checked_at: Any) -> Dict[str, Any]:
        """Add timing metadata to a check result without losing its error.

        ``error_message`` is only defaulted to ``None`` when the check did not
        provide one, so failure details returned by ``_perform_check`` are
        preserved instead of being overwritten.
        """
        result['response_time_ms'] = response_time_ms
        result['checked_at'] = checked_at
        result.setdefault('error_message', None)
        return result

    def _elapsed_ms(self) -> int:
        """Return the whole milliseconds between ``start_time`` and ``end_time``."""
        return int((self.end_time - self.start_time) * 1000)

    def execute(self) -> Dict[str, Any]:
        """Execute the health check and return its result dictionary.

        Returns:
            The dictionary produced by ``_perform_check`` extended with
            ``response_time_ms``, ``checked_at`` and (if absent)
            ``error_message``. If the check raises, or returns something that
            cannot be finalized, a ``CRITICAL`` result describing the
            exception is returned instead; exceptions never propagate.
        """
        self.start_time = time.time()
        try:
            result = self._perform_check()
            self.end_time = time.time()
            return self._finalize_result(result, self._elapsed_ms(), timezone.now())
        except Exception as e:
            self.end_time = time.time()
            # Fall back to the target itself so the handler cannot fail on a
            # target object that has no ``name`` attribute.
            target_name = getattr(self.target, 'name', self.target)
            logger.error("Health check failed for %s: %s", target_name, e)
            return {
                'status': 'CRITICAL',
                'response_time_ms': self._elapsed_ms(),
                'checked_at': timezone.now(),
                # Some exceptions (e.g. a bare NotImplementedError) stringify
                # to '', so report the exception type rather than nothing.
                'error_message': str(e) or type(e).__name__,
            }

    def _perform_check(self) -> Dict[str, Any]:
        """Run the concrete check; must be overridden by subclasses.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError
class HTTPHealthCheck(BaseHealthCheck):
    """Health check that issues an HTTP GET against the target's endpoint."""

    def _perform_check(self) -> Dict[str, Any]:
        """Request the target's ``endpoint_url`` and classify the status code.

        A status code listed in the target's ``expected_status_codes``
        (``[200]`` when that is empty) is HEALTHY, any other code of 500 or
        above is CRITICAL, and everything else is WARNING.

        Raises:
            ValueError: if the target has no endpoint URL configured.
        """
        endpoint = self.target.endpoint_url
        if not endpoint:
            raise ValueError("No endpoint URL configured")

        request_timeout = self.target.timeout_seconds
        accepted_codes = self.target.expected_status_codes or [200]
        reply = requests.get(endpoint, timeout=request_timeout)
        code = reply.status_code

        if code in accepted_codes:
            verdict = 'HEALTHY'
        else:
            verdict = 'CRITICAL' if code >= 500 else 'WARNING'

        return {
            'status': verdict,
            'status_code': code,
            # Only the first 1000 characters of the body are kept.
            'response_body': reply.text[:1000],
        }
class DatabaseHealthCheck(BaseHealthCheck):
    """Health check that runs a trivial query on the Django DB connection."""

    def _perform_check(self) -> Dict[str, Any]:
        """Execute ``SELECT 1`` and verify that the value 1 comes back.

        Any exception raised while querying is converted into a CRITICAL
        result rather than propagated.
        """
        try:
            with connection.cursor() as db_cursor:
                db_cursor.execute("SELECT 1")
                row = db_cursor.fetchone()

            # Guard clause: anything other than a first column equal to 1
            # is treated as a failure.
            if not (row and row[0] == 1):
                return {
                    'status': 'CRITICAL',
                    'status_code': 500,
                    'error_message': 'Database query returned unexpected result',
                }

            return {'status': 'HEALTHY', 'status_code': 200}
        except Exception as exc:
            return {
                'status': 'CRITICAL',
                'status_code': 500,
                'error_message': f'Database connection failed: {exc!s}',
            }
class CacheHealthCheck(BaseHealthCheck):
    """Health check that performs a write/read round trip on the cache."""

    def _perform_check(self) -> Dict[str, Any]:
        """Write a probe value to the cache, read it back and compare.

        The probe key is unique per invocation. A key derived from the current
        second is shared by every check that runs within that second, so one
        check could delete the key before another reads it and cause a false
        CRITICAL.

        Any exception raised by the cache backend is converted into a
        CRITICAL result rather than propagated.
        """
        import uuid

        try:
            probe_key = f"health_check_{uuid.uuid4().hex}"
            probe_value = "health_check_value"

            cache.set(probe_key, probe_value, timeout=10)
            retrieved_value = cache.get(probe_key)
            # Clean up regardless of the outcome; the 10 second timeout on
            # the key is only a fallback.
            cache.delete(probe_key)

            if retrieved_value == probe_value:
                return {
                    'status': 'HEALTHY',
                    'status_code': 200
                }
            return {
                'status': 'CRITICAL',
                'status_code': 500,
                'error_message': 'Cache read/write test failed'
            }
        except Exception as e:
            return {
                'status': 'CRITICAL',
                'status_code': 500,
                'error_message': f'Cache operation failed: {str(e)}'
            }
class CeleryHealthCheck(BaseHealthCheck):
    """Health check that looks for responding Celery workers."""

    def _perform_check(self) -> Dict[str, Any]:
        """Query the Celery app's inspector for active workers.

        A truthy reply from ``inspect().active()`` is reported as HEALTHY with
        the number of entries in the reply; a falsy reply is CRITICAL. Any
        exception is converted into a CRITICAL result rather than propagated.
        """
        try:
            inspector = celery_app.control.inspect()
            workers = inspector.active()

            if not workers:
                return {
                    'status': 'CRITICAL',
                    'status_code': 500,
                    'error_message': 'No active Celery workers found',
                }

            return {
                'status': 'HEALTHY',
                'status_code': 200,
                'response_body': f'Active workers: {len(workers)}',
            }
        except Exception as exc:
            return {
                'status': 'CRITICAL',
                'status_code': 500,
                'error_message': f'Celery health check failed: {exc!s}',
            }
class SystemResourceHealthCheck(BaseHealthCheck):
    """Health check based on CPU, memory and root-disk utilisation."""

    def _perform_check(self) -> Dict[str, Any]:
        """Sample system utilisation via psutil and grade it by threshold.

        CRITICAL when any reading is above 90 percent, WARNING when any is
        above 80 percent, HEALTHY otherwise. Any exception is converted into
        a CRITICAL result rather than propagated.
        """
        try:
            # cpu_percent is called with interval=1, i.e. a one second sample.
            cpu = psutil.cpu_percent(interval=1)
            mem_pct = psutil.virtual_memory().percent
            disk_pct = psutil.disk_usage('/').percent

            readings = (cpu, mem_pct, disk_pct)
            if any(value > 90 for value in readings):
                level = 'CRITICAL'
            elif any(value > 80 for value in readings):
                level = 'WARNING'
            else:
                level = 'HEALTHY'

            return {
                'status': level,
                'status_code': 200,
                'cpu_usage_percent': cpu,
                'memory_usage_percent': mem_pct,
                'disk_usage_percent': disk_pct,
                'response_body': f'CPU: {cpu}%, Memory: {mem_pct}%, Disk: {disk_pct}%',
            }
        except Exception as exc:
            return {
                'status': 'CRITICAL',
                'status_code': 500,
                'error_message': f'System resource check failed: {exc!s}',
            }
class ModuleHealthCheck(BaseHealthCheck):
    """Health check that verifies a Django module is importable and installed."""

    def _perform_check(self) -> Dict[str, Any]:
        """Import the target's ``related_module`` and look it up in Django apps.

        Results:
            HEALTHY  - the module imports and matches an installed app, either
                       by app label or by full module path.
            WARNING  - the module imports but is not an installed Django app.
            CRITICAL - no module is configured, the import fails, or any other
                       exception occurs (exceptions are not propagated).
        """
        import importlib
        from django.apps import apps

        try:
            module_name = self.target.related_module
            if not module_name:
                raise ValueError("No module specified for module health check")

            # Import the module to check that it is accessible.
            importlib.import_module(module_name)

            # apps.get_app_config() raises LookupError instead of returning a
            # falsy value, which made the WARNING branch unreachable. Scanning
            # the registry avoids that and accepts both the short app label
            # and the dotted module path.
            is_installed_app = any(
                module_name in (app_config.label, app_config.name)
                for app_config in apps.get_app_configs()
            )
            if is_installed_app:
                return {
                    'status': 'HEALTHY',
                    'status_code': 200,
                    'response_body': f'Module {module_name} is accessible'
                }
            return {
                'status': 'WARNING',
                'status_code': 200,
                'error_message': f'Module {module_name} not found in Django apps'
            }
        except Exception as e:
            return {
                'status': 'CRITICAL',
                'status_code': 500,
                'error_message': f'Module health check failed: {str(e)}'
            }
class HealthCheckFactory:
    """Factory for creating health check instances."""

    # Maps a check-type key to the class implementing it.
    CHECK_CLASSES = {
        'HTTP': HTTPHealthCheck,
        'DATABASE': DatabaseHealthCheck,
        'CACHE': CacheHealthCheck,
        'QUEUE': CeleryHealthCheck,
        'CUSTOM': BaseHealthCheck,
        'PING': HTTPHealthCheck,  # Use HTTP for ping
        'SSL': HTTPHealthCheck,  # Use HTTP for SSL
    }

    @classmethod
    def create_health_check(cls, target, check_type: str) -> BaseHealthCheck:
        """Create a health check instance for ``target`` based on ``check_type``.

        Unknown check types fall back to ``BaseHealthCheck``.

        A ``'CUSTOM'`` check for a target that declares a ``related_module``
        is resolved to ``ModuleHealthCheck``. ``BaseHealthCheck`` is abstract:
        its ``_perform_check`` raises ``NotImplementedError``, so running it
        for such a target would always report CRITICAL.
        """
        check_class = cls.CHECK_CLASSES.get(check_type, BaseHealthCheck)
        if (
            check_type == 'CUSTOM'
            and check_class is BaseHealthCheck
            and getattr(target, 'related_module', None)
        ):
            check_class = ModuleHealthCheck
        return check_class(target)

    @classmethod
    def get_available_check_types(cls) -> list:
        """Return the list of check-type keys known to the factory."""
        return list(cls.CHECK_CLASSES.keys())
class HealthCheckService:
    """Service for running health checks and summarising their results."""

    # Maps a target's ``target_type`` to the check-type key handed to the
    # factory. Types not listed here use _DEFAULT_CHECK_TYPE.
    _TARGET_TYPE_TO_CHECK_TYPE = {
        'APPLICATION': 'HTTP',
        'DATABASE': 'DATABASE',
        'CACHE': 'CACHE',
        'QUEUE': 'QUEUE',
        'EXTERNAL_API': 'HTTP',
        'SERVICE': 'HTTP',
        'INFRASTRUCTURE': 'HTTP',
        'MODULE': 'CUSTOM',
    }
    _DEFAULT_CHECK_TYPE = 'HTTP'

    def __init__(self):
        """Create the service with its own ``HealthCheckFactory``."""
        self.factory = HealthCheckFactory()

    def execute_health_check(self, target, check_type: str) -> Dict[str, Any]:
        """Execute a single health check of ``check_type`` for ``target``.

        Returns:
            The result dictionary produced by the check's ``execute()``.
        """
        health_check = self.factory.create_health_check(target, check_type)
        return health_check.execute()

    def execute_all_health_checks(self) -> Dict[str, Any]:
        """Execute health checks for all active, check-enabled targets.

        Each result is stored as a ``HealthCheck`` row and the target's
        ``last_checked`` / ``last_status`` fields are updated. A failure while
        processing one target is logged and recorded as CRITICAL without
        interrupting the remaining targets.

        Returns:
            A dictionary mapping each target's name to its result dictionary.
        """
        from monitoring.models import MonitoringTarget, HealthCheck

        results = {}
        active_targets = MonitoringTarget.objects.filter(
            status='ACTIVE',
            health_check_enabled=True
        )
        for target in active_targets:
            try:
                # Determine check type based on target type
                check_type = self._get_check_type_for_target(target)
                # Execute health check
                result = self.execute_health_check(target, check_type)
                # Save result to database
                HealthCheck.objects.create(
                    target=target,
                    check_type=check_type,
                    status=result['status'],
                    response_time_ms=result.get('response_time_ms'),
                    status_code=result.get('status_code'),
                    response_body=result.get('response_body'),
                    error_message=result.get('error_message'),
                    cpu_usage_percent=result.get('cpu_usage_percent'),
                    memory_usage_percent=result.get('memory_usage_percent'),
                    disk_usage_percent=result.get('disk_usage_percent')
                )
                # Update target status
                target.last_checked = timezone.now()
                target.last_status = result['status']
                target.save(update_fields=['last_checked', 'last_status'])
                results[target.name] = result
            except Exception as e:
                logger.error(
                    "Failed to execute health check for %s: %s", target.name, e
                )
                results[target.name] = {
                    'status': 'CRITICAL',
                    'error_message': str(e)
                }
        return results

    def _get_check_type_for_target(self, target) -> str:
        """Return the check-type key for ``target`` based on its ``target_type``.

        Unmapped target types fall back to ``'HTTP'``.
        """
        return self._TARGET_TYPE_TO_CHECK_TYPE.get(
            target.target_type, self._DEFAULT_CHECK_TYPE
        )

    def get_system_health_summary(self) -> Dict[str, Any]:
        """Summarise overall health from the latest check of each active target.

        Returns:
            A dictionary with ``overall_status``, the total number of active
            targets, per-status target counts, ``health_percentage`` (share of
            active targets whose latest check is HEALTHY) and ``last_updated``.
        """
        from collections import Counter
        from monitoring.models import HealthCheck, MonitoringTarget

        # Latest check per target. Ordering and DISTINCT ON use ``target_id``
        # so they refer to the same column even if the related model declares
        # a default ordering.
        latest_statuses = HealthCheck.objects.filter(
            target__status='ACTIVE'
        ).order_by('target_id', '-checked_at').distinct('target_id').values_list(
            'status', flat=True
        )
        # Count in Python. Adding .filter(status=...) to the DISTINCT ON
        # queryset would apply the filter before the "latest per target"
        # selection and count targets that ever had that status.
        status_counts = Counter(latest_statuses)

        total_targets = MonitoringTarget.objects.filter(status='ACTIVE').count()
        healthy_targets = status_counts.get('HEALTHY', 0)
        warning_targets = status_counts.get('WARNING', 0)
        critical_targets = status_counts.get('CRITICAL', 0)

        # Calculate overall status
        if critical_targets > 0:
            overall_status = 'CRITICAL'
        elif warning_targets > 0:
            overall_status = 'WARNING'
        elif healthy_targets == total_targets:
            overall_status = 'HEALTHY'
        else:
            overall_status = 'UNKNOWN'

        return {
            'overall_status': overall_status,
            'total_targets': total_targets,
            'healthy_targets': healthy_targets,
            'warning_targets': warning_targets,
            'critical_targets': critical_targets,
            'health_percentage': (healthy_targets / total_targets * 100) if total_targets > 0 else 0,
            'last_updated': timezone.now()
        }