373 lines
13 KiB
Python
373 lines
13 KiB
Python
"""
|
|
Health check services for monitoring system components
|
|
"""
|
|
import time
|
|
import requests
|
|
import psutil
|
|
import logging
|
|
from typing import Dict, Any, Optional, Tuple
|
|
from django.conf import settings
|
|
from django.db import connection
|
|
from django.core.cache import cache
|
|
from django.utils import timezone
|
|
from celery import current_app as celery_app
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class BaseHealthCheck:
    """Base class for health checks.

    Subclasses implement :meth:`_perform_check`; :meth:`execute` wraps that
    call with timing, a timestamp and uniform error reporting.
    """

    def __init__(self, target):
        """Store the monitored target and reset the timing markers.

        Args:
            target: Object describing what is checked. This class only reads
                its ``name`` attribute (for the failure log message).
        """
        self.target = target
        self.start_time = None
        self.end_time = None

    def execute(self) -> Dict[str, Any]:
        """Execute the health check and return its result.

        Returns:
            The dict produced by :meth:`_perform_check`, extended with
            ``response_time_ms`` and ``checked_at``. ``error_message`` is
            set to ``None`` only when the check did not supply one, so a
            message reported by the check itself is preserved. If the check
            raises, the exception is logged and a ``CRITICAL`` result holding
            the exception text is returned instead.
        """
        self.start_time = time.time()
        try:
            result = self._perform_check()
            self.end_time = time.time()

            result.update({
                'response_time_ms': self._elapsed_ms(),
                'checked_at': timezone.now(),
            })
            # Keep an error message set by the check; only fill the gap.
            result.setdefault('error_message', None)

            return result
        except Exception as e:
            self.end_time = time.time()
            # Fall back to the target itself so a target without ``name``
            # cannot raise from inside the error handler.
            target_name = getattr(self.target, 'name', self.target)
            logger.exception("Health check failed for %s: %s", target_name, e)
            return {
                'status': 'CRITICAL',
                'response_time_ms': self._elapsed_ms(),
                'checked_at': timezone.now(),
                'error_message': str(e),
            }

    def _elapsed_ms(self) -> int:
        """Return whole milliseconds between ``start_time`` and ``end_time``."""
        return int((self.end_time - self.start_time) * 1000)

    def _perform_check(self) -> Dict[str, Any]:
        """Run the concrete check; subclasses must override this.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError
|
|
|
|
|
|
class HTTPHealthCheck(BaseHealthCheck):
    """Health check that sends a GET request to the target's endpoint URL."""

    def _perform_check(self) -> Dict[str, Any]:
        """Request the endpoint and grade the HTTP status code.

        Reads ``endpoint_url``, ``timeout_seconds`` and
        ``expected_status_codes`` from the target. When no expected codes
        are configured, ``[200]`` is used.

        Returns:
            A dict with ``status`` (``HEALTHY`` for an expected code,
            ``CRITICAL`` for an unexpected code of 500 or above, ``WARNING``
            otherwise), ``status_code`` and the first 1000 characters of the
            body as ``response_body``.

        Raises:
            ValueError: If the target has no endpoint URL. Exceptions raised
                by ``requests.get`` are not caught here either.
        """
        endpoint = self.target.endpoint_url
        if not endpoint:
            raise ValueError("No endpoint URL configured")

        request_timeout = self.target.timeout_seconds
        accepted_codes = self.target.expected_status_codes or [200]

        reply = requests.get(endpoint, timeout=request_timeout)
        code = reply.status_code

        if code in accepted_codes:
            verdict = 'HEALTHY'
        else:
            verdict = 'CRITICAL' if code >= 500 else 'WARNING'

        return {
            'status': verdict,
            'status_code': code,
            # Cap the stored body so large responses are not kept in full.
            'response_body': reply.text[:1000],
        }
|
|
|
|
|
|
class DatabaseHealthCheck(BaseHealthCheck):
    """Health check that runs a trivial query on the Django connection."""

    def _perform_check(self) -> Dict[str, Any]:
        """Run ``SELECT 1`` and confirm that the value 1 comes back.

        Returns:
            ``HEALTHY``/200 when the first column of the fetched row is 1;
            ``CRITICAL``/500 with an ``error_message`` when the row is
            missing or different, or when any exception is raised.
        """
        try:
            with connection.cursor() as cur:
                cur.execute("SELECT 1")
                row = cur.fetchone()

            if not (row and row[0] == 1):
                return {
                    'status': 'CRITICAL',
                    'status_code': 500,
                    'error_message': 'Database query returned unexpected result',
                }

            return {
                'status': 'HEALTHY',
                'status_code': 200,
            }
        except Exception as exc:
            return {
                'status': 'CRITICAL',
                'status_code': 500,
                'error_message': f'Database connection failed: {str(exc)}',
            }
|
|
|
|
|
|
class CacheHealthCheck(BaseHealthCheck):
    """Cache system health check based on a write/read round trip."""

    def _perform_check(self) -> Dict[str, Any]:
        """Write a probe value to the cache, read it back and compare.

        Returns:
            ``HEALTHY``/200 when the value read equals the value written;
            ``CRITICAL``/500 with an ``error_message`` when it differs or
            when any cache operation raises.
        """
        # Local import keeps this block self-contained; uuid is stdlib.
        import uuid

        # A random key per run: a timestamp-based key would be shared by
        # checks running in the same second, letting one run delete the
        # probe before another reads it.
        test_key = f"health_check_{uuid.uuid4().hex}"
        test_value = "health_check_value"

        try:
            cache.set(test_key, test_value, timeout=10)
            retrieved_value = cache.get(test_key)
            # Clean up whether or not the comparison below succeeds.
            cache.delete(test_key)
        except Exception as e:
            return {
                'status': 'CRITICAL',
                'status_code': 500,
                'error_message': f'Cache operation failed: {str(e)}',
            }

        if retrieved_value == test_value:
            return {
                'status': 'HEALTHY',
                'status_code': 200,
            }

        return {
            'status': 'CRITICAL',
            'status_code': 500,
            'error_message': 'Cache read/write test failed',
        }
|
|
|
|
|
|
class CeleryHealthCheck(BaseHealthCheck):
    """Health check that asks Celery for its active workers."""

    def _perform_check(self) -> Dict[str, Any]:
        """Query the Celery inspector and report how many workers answered.

        Returns:
            ``HEALTHY``/200 with the worker count in ``response_body`` when
            ``inspect().active()`` returns a non-empty value;
            ``CRITICAL``/500 with an ``error_message`` when it is empty or
            when any exception is raised.
        """
        try:
            inspector = celery_app.control.inspect()
            workers = inspector.active()

            if not workers:
                return {
                    'status': 'CRITICAL',
                    'status_code': 500,
                    'error_message': 'No active Celery workers found',
                }

            return {
                'status': 'HEALTHY',
                'status_code': 200,
                'response_body': f'Active workers: {len(workers)}',
            }
        except Exception as exc:
            return {
                'status': 'CRITICAL',
                'status_code': 500,
                'error_message': f'Celery health check failed: {str(exc)}',
            }
|
|
|
|
|
|
class SystemResourceHealthCheck(BaseHealthCheck):
    """Health check based on CPU, memory and root-disk usage."""

    def _perform_check(self) -> Dict[str, Any]:
        """Sample resource usage via psutil and grade it against thresholds.

        CPU usage is sampled over a one-second interval, so this call blocks
        for about that long.

        Returns:
            A dict with ``status`` (``CRITICAL`` if any reading exceeds 90,
            ``WARNING`` if any exceeds 80, otherwise ``HEALTHY``),
            ``status_code`` 200, the three usage percentages and a summary
            in ``response_body``. If sampling raises, ``CRITICAL``/500 with
            an ``error_message``.
        """
        try:
            cpu_pct = psutil.cpu_percent(interval=1)
            mem_pct = psutil.virtual_memory().percent
            disk_pct = psutil.disk_usage('/').percent

            readings = (cpu_pct, mem_pct, disk_pct)
            if any(value > 90 for value in readings):
                level = 'CRITICAL'
            elif any(value > 80 for value in readings):
                level = 'WARNING'
            else:
                level = 'HEALTHY'

            return {
                'status': level,
                'status_code': 200,
                'cpu_usage_percent': cpu_pct,
                'memory_usage_percent': mem_pct,
                'disk_usage_percent': disk_pct,
                'response_body': f'CPU: {cpu_pct}%, Memory: {mem_pct}%, Disk: {disk_pct}%',
            }
        except Exception as exc:
            return {
                'status': 'CRITICAL',
                'status_code': 500,
                'error_message': f'System resource check failed: {str(exc)}',
            }
|
|
|
|
|
|
class ModuleHealthCheck(BaseHealthCheck):
    """Django module health check."""

    def _perform_check(self) -> Dict[str, Any]:
        """Check that the target's module imports and is a registered app.

        Reads the module name from ``target.related_module``.

        Returns:
            ``HEALTHY``/200 when the module imports and the Django app
            registry has a config for it; ``WARNING``/200 when it imports
            but the registry lookup fails; ``CRITICAL``/500 with an
            ``error_message`` when no module is configured, the import
            fails, or anything else raises.
        """
        try:
            module_name = self.target.related_module
            if not module_name:
                raise ValueError("No module specified for module health check")

            # Import the module to check that it is accessible.
            __import__(module_name)

            from django.apps import apps

            # get_app_config() signals an unknown app by raising LookupError,
            # not by returning a falsy value, so that is what maps to WARNING.
            try:
                apps.get_app_config(module_name)
            except LookupError:
                return {
                    'status': 'WARNING',
                    'status_code': 200,
                    'error_message': f'Module {module_name} not found in Django apps',
                }

            return {
                'status': 'HEALTHY',
                'status_code': 200,
                'response_body': f'Module {module_name} is accessible',
            }
        except Exception as e:
            return {
                'status': 'CRITICAL',
                'status_code': 500,
                'error_message': f'Module health check failed: {str(e)}',
            }
|
|
|
|
|
|
class HealthCheckFactory:
    """Factory that maps check-type names to health check classes."""

    # NOTE(review): 'CUSTOM' (and any unknown type) resolves to
    # BaseHealthCheck, whose _perform_check raises NotImplementedError, and
    # SystemResourceHealthCheck / ModuleHealthCheck are not registered here.
    # Confirm whether that is intended.
    CHECK_CLASSES = {
        'HTTP': HTTPHealthCheck,
        'DATABASE': DatabaseHealthCheck,
        'CACHE': CacheHealthCheck,
        'QUEUE': CeleryHealthCheck,
        'CUSTOM': BaseHealthCheck,
        'PING': HTTPHealthCheck,  # Use HTTP for ping
        'SSL': HTTPHealthCheck,  # Use HTTP for SSL
    }

    @classmethod
    def create_health_check(cls, target, check_type: str) -> BaseHealthCheck:
        """Instantiate the check class registered for ``check_type``.

        Args:
            target: Passed unchanged to the check class constructor.
            check_type: Key into ``CHECK_CLASSES``.

        Returns:
            An instance of the registered class, or of ``BaseHealthCheck``
            when ``check_type`` is not registered.
        """
        try:
            implementation = cls.CHECK_CLASSES[check_type]
        except KeyError:
            implementation = BaseHealthCheck
        return implementation(target)

    @classmethod
    def get_available_check_types(cls) -> list:
        """Return the registered check-type names in registration order."""
        return list(cls.CHECK_CLASSES)
|
|
|
|
|
|
class HealthCheckService:
    """Service for running health checks and summarising their results."""

    # Maps a target's ``target_type`` to the check type used for it.
    _TARGET_TYPE_TO_CHECK_TYPE = {
        'APPLICATION': 'HTTP',
        'DATABASE': 'DATABASE',
        'CACHE': 'CACHE',
        'QUEUE': 'QUEUE',
        'EXTERNAL_API': 'HTTP',
        'SERVICE': 'HTTP',
        'INFRASTRUCTURE': 'HTTP',
        'MODULE': 'CUSTOM',
    }

    # Check type used when a target type is not in the mapping above.
    _DEFAULT_CHECK_TYPE = 'HTTP'

    def __init__(self):
        """Create the factory used to build health check instances."""
        self.factory = HealthCheckFactory()

    def execute_health_check(self, target, check_type: str) -> Dict[str, Any]:
        """Execute a single health check for a target.

        Args:
            target: The target to check; passed to the factory.
            check_type: Check-type name understood by the factory.

        Returns:
            The result dict from the check's ``execute()``.
        """
        health_check = self.factory.create_health_check(target, check_type)
        return health_check.execute()

    def execute_all_health_checks(self) -> Dict[str, Any]:
        """Execute health checks for all active, check-enabled targets.

        For each target the result is stored as a ``HealthCheck`` row and
        the target's ``last_checked`` / ``last_status`` fields are updated.
        A failure while processing one target is logged and recorded as
        ``CRITICAL`` for that target; the remaining targets still run.

        Returns:
            A dict mapping each target's ``name`` to its result dict.
        """
        # Imported here rather than at module level, as in the original.
        from monitoring.models import MonitoringTarget, HealthCheck

        results = {}
        active_targets = MonitoringTarget.objects.filter(
            status='ACTIVE',
            health_check_enabled=True,
        )

        for target in active_targets:
            try:
                # Determine check type based on target type
                check_type = self._get_check_type_for_target(target)

                # Execute health check
                result = self.execute_health_check(target, check_type)

                # Save result to database
                HealthCheck.objects.create(
                    target=target,
                    check_type=check_type,
                    status=result['status'],
                    response_time_ms=result.get('response_time_ms'),
                    status_code=result.get('status_code'),
                    response_body=result.get('response_body'),
                    error_message=result.get('error_message'),
                    cpu_usage_percent=result.get('cpu_usage_percent'),
                    memory_usage_percent=result.get('memory_usage_percent'),
                    disk_usage_percent=result.get('disk_usage_percent'),
                )

                # Update target status
                target.last_checked = timezone.now()
                target.last_status = result['status']
                target.save(update_fields=['last_checked', 'last_status'])

                results[target.name] = result
            except Exception as e:
                logger.exception(
                    "Failed to execute health check for %s: %s", target.name, e
                )
                results[target.name] = {
                    'status': 'CRITICAL',
                    'error_message': str(e),
                }

        return results

    def _get_check_type_for_target(self, target) -> str:
        """Determine the check type for a target from its ``target_type``.

        Returns:
            The mapped check type, or ``'HTTP'`` for an unmapped target type.
        """
        return self._TARGET_TYPE_TO_CHECK_TYPE.get(
            target.target_type, self._DEFAULT_CHECK_TYPE
        )

    def get_system_health_summary(self) -> Dict[str, Any]:
        """Get the overall system health summary.

        The summary is built from the most recent ``HealthCheck`` of every
        ``ACTIVE`` target.

        Returns:
            A dict with ``overall_status``, per-status target counts,
            ``total_targets``, ``health_percentage`` and ``last_updated``.
            ``overall_status`` is ``CRITICAL`` if any latest check is
            critical, else ``WARNING`` if any is a warning, else ``HEALTHY``
            if every active target's latest check is healthy, else
            ``UNKNOWN``.
        """
        from monitoring.models import HealthCheck, MonitoringTarget

        # One status per target: that of its latest check. The status filter
        # must NOT be applied in the query, because WHERE runs before
        # DISTINCT ON and would select "latest check with that status"
        # instead of "latest check". Ordering by ``target_id`` keeps the
        # DISTINCT ON column aligned with the leading ORDER BY column.
        latest_statuses = list(
            HealthCheck.objects.filter(target__status='ACTIVE')
            .order_by('target_id', '-checked_at')
            .distinct('target_id')
            .values_list('status', flat=True)
        )

        total_targets = MonitoringTarget.objects.filter(status='ACTIVE').count()
        healthy_targets = latest_statuses.count('HEALTHY')
        warning_targets = latest_statuses.count('WARNING')
        critical_targets = latest_statuses.count('CRITICAL')

        # Calculate overall status
        if critical_targets > 0:
            overall_status = 'CRITICAL'
        elif warning_targets > 0:
            overall_status = 'WARNING'
        elif healthy_targets == total_targets:
            overall_status = 'HEALTHY'
        else:
            overall_status = 'UNKNOWN'

        return {
            'overall_status': overall_status,
            'total_targets': total_targets,
            'healthy_targets': healthy_targets,
            'warning_targets': warning_targets,
            'critical_targets': critical_targets,
            'health_percentage': (healthy_targets / total_targets * 100) if total_targets > 0 else 0,
            'last_updated': timezone.now(),
        }
|