"""
|
|
Celery tasks for automated monitoring
|
|
"""
|
|
import logging
|
|
from celery import shared_task
|
|
from django.utils import timezone
|
|
from datetime import timedelta
|
|
|
|
from monitoring.services.health_checks import HealthCheckService
|
|
from monitoring.services.metrics_collector import MetricsCollector
|
|
from monitoring.services.alerting import AlertingService
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
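
# A minimal sketch of how these tasks might be wired up for periodic execution
# with Celery beat. The intervals and the "monitoring.tasks" module path are
# assumptions; the real schedule lives in the project's Celery/Django settings
# (here assuming the common `namespace='CELERY'` settings integration).
#
#     CELERY_BEAT_SCHEDULE = {
#         'execute-health-checks': {
#             'task': 'monitoring.tasks.execute_health_checks',
#             'schedule': 60.0,       # every minute
#         },
#         'collect-metrics': {
#             'task': 'monitoring.tasks.collect_metrics',
#             'schedule': 60.0,
#         },
#         'evaluate-alerts': {
#             'task': 'monitoring.tasks.evaluate_alerts',
#             'schedule': 120.0,
#         },
#         'cleanup-old-data': {
#             'task': 'monitoring.tasks.cleanup_old_data',
#             'schedule': 86400.0,    # daily
#         },
#     }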


@shared_task(bind=True, max_retries=3)
def execute_health_checks(self):
    """Execute health checks for all monitoring targets"""
    try:
        logger.info("Starting health check execution")

        health_service = HealthCheckService()
        results = health_service.execute_all_health_checks()

        logger.info(f"Health checks completed. Results: {len(results)} targets checked")

        return {
            'status': 'success',
            'targets_checked': len(results),
            'results': results
        }

    except Exception as e:
        logger.error(f"Health check execution failed: {e}")

        # Retry with exponential backoff
        if self.request.retries < self.max_retries:
            countdown = 2 ** self.request.retries
            logger.info(f"Retrying health checks in {countdown} seconds")
            raise self.retry(countdown=countdown)

        return {
            'status': 'error',
            'error': str(e)
        }


@shared_task(bind=True, max_retries=3)
def collect_metrics(self):
    """Collect metrics from all configured sources"""
    try:
        logger.info("Starting metrics collection")

        collector = MetricsCollector()
        results = collector.collect_all_metrics()

        successful_metrics = len([r for r in results.values() if 'error' not in r])
        failed_metrics = len([r for r in results.values() if 'error' in r])

        logger.info(f"Metrics collection completed. Success: {successful_metrics}, Failed: {failed_metrics}")

        return {
            'status': 'success',
            'successful_metrics': successful_metrics,
            'failed_metrics': failed_metrics,
            'results': results
        }

    except Exception as e:
        logger.error(f"Metrics collection failed: {e}")

        # Retry with exponential backoff
        if self.request.retries < self.max_retries:
            countdown = 2 ** self.request.retries
            logger.info(f"Retrying metrics collection in {countdown} seconds")
            raise self.retry(countdown=countdown)

        return {
            'status': 'error',
            'error': str(e)
        }


@shared_task(bind=True, max_retries=3)
def evaluate_alerts(self):
    """Evaluate alert rules and send notifications"""
    try:
        logger.info("Starting alert evaluation")

        alerting_service = AlertingService()
        results = alerting_service.run_alert_evaluation()

        logger.info(
            f"Alert evaluation completed. Triggered: {results['triggered_alerts']}, "
            f"Notifications: {results['notifications_sent']}"
        )

        return results

    except Exception as e:
        logger.error(f"Alert evaluation failed: {e}")

        # Retry with exponential backoff
        if self.request.retries < self.max_retries:
            countdown = 2 ** self.request.retries
            logger.info(f"Retrying alert evaluation in {countdown} seconds")
            raise self.retry(countdown=countdown)

        return {
            'status': 'error',
            'error': str(e)
        }
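
# Usage sketch (an assumption, not part of the original workflow): if alert
# evaluation should always follow a fresh metrics collection, the two tasks
# can be chained with immutable signatures so neither receives the other's
# return value.
#
#     from celery import chain
#     chain(collect_metrics.si(), evaluate_alerts.si()).delay()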


@shared_task(bind=True, max_retries=3)
def cleanup_old_data(self):
    """Clean up old monitoring data"""
    try:
        logger.info("Starting data cleanup")

        from monitoring.models import HealthCheck, MetricMeasurement, Alert

        # Clean up old health checks (keep last 7 days)
        cutoff_date = timezone.now() - timedelta(days=7)
        old_health_checks = HealthCheck.objects.filter(checked_at__lt=cutoff_date)
        health_checks_deleted = old_health_checks.count()
        old_health_checks.delete()

        # Clean up old metric measurements (keep last 90 days)
        cutoff_date = timezone.now() - timedelta(days=90)
        old_measurements = MetricMeasurement.objects.filter(timestamp__lt=cutoff_date)
        measurements_deleted = old_measurements.count()
        old_measurements.delete()

        # Clean up resolved alerts older than 30 days
        cutoff_date = timezone.now() - timedelta(days=30)
        old_alerts = Alert.objects.filter(
            status='RESOLVED',
            resolved_at__lt=cutoff_date
        )
        alerts_deleted = old_alerts.count()
        old_alerts.delete()

        logger.info(
            f"Data cleanup completed. Health checks: {health_checks_deleted}, "
            f"Measurements: {measurements_deleted}, Alerts: {alerts_deleted}"
        )

        return {
            'status': 'success',
            'health_checks_deleted': health_checks_deleted,
            'measurements_deleted': measurements_deleted,
            'alerts_deleted': alerts_deleted
        }

    except Exception as e:
        logger.error(f"Data cleanup failed: {e}")

        # Retry with exponential backoff
        if self.request.retries < self.max_retries:
            countdown = 2 ** self.request.retries
            logger.info(f"Retrying data cleanup in {countdown} seconds")
            raise self.retry(countdown=countdown)

        return {
            'status': 'error',
            'error': str(e)
        }


@shared_task(bind=True, max_retries=3)
def generate_system_status_report(self):
    """Generate system status report"""
    try:
        logger.info("Generating system status report")

        from monitoring.models import SystemStatus

        health_service = HealthCheckService()
        health_summary = health_service.get_system_health_summary()

        # Determine overall system status
        if health_summary['critical_targets'] > 0:
            status = 'MAJOR_OUTAGE'
            message = f"Critical issues detected in {health_summary['critical_targets']} systems"
        elif health_summary['warning_targets'] > 0:
            status = 'DEGRADED'
            message = f"Performance issues detected in {health_summary['warning_targets']} systems"
        else:
            status = 'OPERATIONAL'
            message = "All systems operational"

        # Create system status record
        SystemStatus.objects.create(
            status=status,
            message=message,
            affected_services=[]  # Would be populated based on actual issues
        )

        logger.info(f"System status report generated: {status}")

        return {
            'status': 'success',
            'system_status': status,
            'message': message,
            'health_summary': health_summary
        }

    except Exception as e:
        logger.error(f"System status report generation failed: {e}")

        # Retry with exponential backoff
        if self.request.retries < self.max_retries:
            countdown = 2 ** self.request.retries
            logger.info(f"Retrying system status report in {countdown} seconds")
            raise self.retry(countdown=countdown)

        return {
            'status': 'error',
            'error': str(e)
        }


@shared_task(bind=True, max_retries=3)
def monitor_external_integrations(self):
    """Monitor external integrations and services"""
    try:
        logger.info("Starting external integrations monitoring")

        from monitoring.models import MonitoringTarget

        health_service = HealthCheckService()

        # Get external integration targets
        external_targets = MonitoringTarget.objects.filter(
            target_type='EXTERNAL_API',
            status='ACTIVE'
        )

        results = {}
        for target in external_targets:
            try:
                result = health_service.execute_health_check(target, 'HTTP')
                results[target.name] = result

                # Log integration status
                if result['status'] == 'CRITICAL':
                    logger.warning(f"External integration {target.name} is critical")
                elif result['status'] == 'WARNING':
                    logger.info(f"External integration {target.name} has warnings")

            except Exception as check_error:
                logger.error(f"Failed to check external integration {target.name}: {check_error}")
                results[target.name] = {'status': 'CRITICAL', 'error': str(check_error)}

        logger.info(f"External integrations monitoring completed. Checked: {len(results)} integrations")

        return {
            'status': 'success',
            'integrations_checked': len(results),
            'results': results
        }

    except Exception as e:
        logger.error(f"External integrations monitoring failed: {e}")

        # Retry with exponential backoff
        if self.request.retries < self.max_retries:
            countdown = 2 ** self.request.retries
            logger.info(f"Retrying external integrations monitoring in {countdown} seconds")
            raise self.retry(countdown=countdown)

        return {
            'status': 'error',
            'error': str(e)
        }


@shared_task(bind=True, max_retries=3)
def update_monitoring_dashboards(self):
    """Update monitoring dashboards with latest data"""
    try:
        logger.info("Updating monitoring dashboards")

        from monitoring.models import MonitoringDashboard
        from monitoring.services.metrics_collector import MetricsAggregator

        # Aggregator is created but not yet used; the loop below only logs
        # each refresh as a placeholder.
        aggregator = MetricsAggregator()

        # Get active dashboards
        active_dashboards = MonitoringDashboard.objects.filter(is_active=True)

        updated_dashboards = 0
        for dashboard in active_dashboards:
            try:
                # Update dashboard data (this would typically involve caching
                # or real-time updates). For now, just log the update.
                logger.info(f"Updating dashboard: {dashboard.name}")
                updated_dashboards += 1

            except Exception as dashboard_error:
                logger.error(f"Failed to update dashboard {dashboard.name}: {dashboard_error}")

        logger.info(f"Dashboard updates completed. Updated: {updated_dashboards} dashboards")

        return {
            'status': 'success',
            'dashboards_updated': updated_dashboards
        }

    except Exception as e:
        logger.error(f"Dashboard update failed: {e}")

        # Retry with exponential backoff
        if self.request.retries < self.max_retries:
            countdown = 2 ** self.request.retries
            logger.info(f"Retrying dashboard update in {countdown} seconds")
            raise self.retry(countdown=countdown)

        return {
            'status': 'error',
            'error': str(e)
        }