307 lines
8.3 KiB
Python
307 lines
8.3 KiB
Python
"""
Celery configuration for ETB-API

Enterprise-grade task queue with comprehensive monitoring and error handling
"""
|
|
import os
|
|
from celery import Celery
|
|
from celery.schedules import crontab
|
|
from django.conf import settings
|
|
|
|
# Point Django at the project settings before the Celery app is created.
# setdefault() leaves an already-exported DJANGO_SETTINGS_MODULE untouched.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings')

app = Celery('etb_api')

# The settings object is referenced by its dotted path (a string), so the
# worker does not have to serialize the configuration object to child
# processes. namespace='CELERY' restricts the lookup to CELERY_-prefixed
# settings.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Discover task modules from all registered Django apps.
app.autodiscover_tasks()
|
|
|
|
# Celery Beat schedule for the periodic enterprise tasks.
#
# Every row below is (entry name, task path, schedule, priority, expires).
# A numeric schedule is an interval in seconds; `expires` is the number of
# seconds after which an undelivered run is discarded.
app.conf.beat_schedule = {
    entry_name: {
        'task': task_path,
        'schedule': run_every,
        'options': {
            'priority': priority,
            'expires': expires_after,
        },
    }
    for entry_name, task_path, run_every, priority, expires_after in (
        # Health checks - every minute
        ('health-checks', 'monitoring.tasks.execute_health_checks', 60.0, 9, 300),
        # Metrics collection - every 5 minutes
        ('metrics-collection', 'monitoring.tasks.collect_metrics', 300.0, 7, 600),
        # Alert evaluation - every minute
        ('alert-evaluation', 'monitoring.tasks.evaluate_alerts', 60.0, 8, 300),
        # Data cleanup - daily at 2 AM
        (
            'data-cleanup',
            'monitoring.tasks.cleanup_old_data',
            crontab(hour=2, minute=0),
            5,
            3600,
        ),
        # System status report - every 5 minutes
        (
            'system-status-report',
            'monitoring.tasks.generate_system_status_report',
            300.0,
            6,
            600,
        ),
        # Database backup - daily at 3 AM
        (
            'database-backup',
            'core.tasks.backup_database',
            crontab(hour=3, minute=0),
            4,
            7200,
        ),
        # AI model retraining - weekly on Sunday at 4 AM
        (
            'ai-model-retraining',
            'analytics_predictive_insights.tasks.retrain_predictive_models',
            crontab(hour=4, minute=0, day_of_week=0),
            3,
            14400,
        ),
        # SLA monitoring - every 30 seconds
        ('sla-monitoring', 'sla_oncall.tasks.monitor_sla_breaches', 30.0, 9, 60),
        # Incident correlation - every 2 minutes
        (
            'incident-correlation',
            'incident_intelligence.tasks.correlate_incidents',
            120.0,
            7,
            300,
        ),
        # Security audit - every hour
        ('security-audit', 'security.tasks.perform_security_audit', 3600.0, 6, 1800),
        # Compliance check - daily at 5 AM
        (
            'compliance-check',
            'compliance_governance.tasks.perform_compliance_check',
            crontab(hour=5, minute=0),
            5,
            3600,
        ),
        # Knowledge base update - every 6 hours
        (
            'knowledge-base-update',
            'knowledge_learning.tasks.update_knowledge_base',
            21600.0,
            4,
            1800,
        ),
        # Performance optimization - every 15 minutes
        (
            'performance-optimization',
            'monitoring.tasks.optimize_performance',
            900.0,
            5,
            600,
        ),
    )
}
|
|
|
|
# Celery runtime configuration. Values set here are applied on top of
# whatever was loaded from the Django settings.
app.conf.update(
    # Serialization and time handling: JSON only, UTC everywhere.
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,

    # Worker behaviour.
    worker_prefetch_multiplier=1,
    task_acks_late=True,
    worker_max_tasks_per_child=1000,
    worker_disable_rate_limits=False,

    # Result backend.
    result_expires=3600,
    result_persistent=True,

    # Routing: every task module pattern is sent to its dedicated queue.
    task_routes={
        task_pattern: {'queue': queue_name}
        for task_pattern, queue_name in (
            ('monitoring.tasks.*', 'monitoring'),
            ('security.tasks.*', 'security'),
            ('incident_intelligence.tasks.*', 'incidents'),
            ('analytics_predictive_insights.tasks.*', 'analytics'),
            ('sla_oncall.tasks.*', 'sla'),
            ('compliance_governance.tasks.*', 'compliance'),
            ('knowledge_learning.tasks.*', 'knowledge'),
            ('automation_orchestration.tasks.*', 'automation'),
            ('collaboration_war_rooms.tasks.*', 'collaboration'),
            ('core.tasks.*', 'core'),
        )
    },

    # Queues: each queue uses an exchange and routing key named after itself.
    task_default_queue='default',
    task_queues={
        queue_name: {'exchange': queue_name, 'routing_key': queue_name}
        for queue_name in (
            'default',
            'monitoring',
            'security',
            'incidents',
            'analytics',
            'sla',
            'compliance',
            'knowledge',
            'automation',
            'collaboration',
            'core',
        )
    },

    # Error handling.
    task_reject_on_worker_lost=True,
    task_ignore_result=False,

    # Monitoring events.
    worker_send_task_events=True,
    task_send_sent_event=True,

    # Retry settings.
    # NOTE(review): these two names do not appear to be documented Celery
    # settings (retry delay / max retries are normally Task attributes) --
    # confirm that they actually take effect.
    task_default_retry_delay=60,
    task_max_retries=3,

    # Logging: leave the root logger alone and emit uncoloured log output.
    worker_hijack_root_logger=False,
    worker_log_color=False,
)
|
|
|
|
# Diagnostic task
@app.task(bind=True)
def debug_task(self):
    """Print the repr of the request this task is running under."""
    request = self.request
    print(f'Request: {request!r}')
|
|
|
|
# Global error handler
@app.task(
    bind=True,
    autoretry_for=(Exception,),
    retry_kwargs={'max_retries': 3, 'countdown': 60},
)
def error_handler(self, exc, task_id, args, kwargs, einfo):
    """Log a task failure and dispatch a critical alert after repeated retries.

    Args:
        exc: The exception describing the failure.
        task_id: Identifier of the task that failed.
        args: Positional arguments of the failed task.
        kwargs: Keyword arguments of the failed task.
        einfo: Exception/traceback information for the failure.

    The task is declared with ``autoretry_for=(Exception,)``, so any exception
    raised in this body (including a failure to import or enqueue the alert
    task) causes it to be retried up to 3 times, 60 seconds apart.
    """
    import logging

    logger = logging.getLogger(__name__)

    # Lazy %-style arguments: formatting only happens if the record is emitted.
    logger.error('Task %s failed: %s', task_id, exc)
    logger.error('Args: %s, Kwargs: %s', args, kwargs)
    logger.error('Exception info: %s', einfo)

    # A bound task exposes its retry counter on the request context
    # (self.request.retries); the task object itself has no `retries`
    # attribute, so the former hasattr(self, 'retries') guard was never true
    # and the alert below could not be sent.
    # NOTE(review): this is the retry count of this handler's own request; the
    # failed task's retry count is not among the arguments -- confirm that
    # this is the intended trigger.
    retries = getattr(self.request, 'retries', 0) or 0

    # Send alert for critical failures
    if retries >= 3:
        # Imported lazily so this module does not depend on monitoring.tasks
        # at import time.
        from monitoring.tasks import send_critical_alert

        send_critical_alert.delay(
            title=f'Critical Task Failure: {self.name}',
            message=f'Task {task_id} failed after {retries} retries: {exc}',
            severity='CRITICAL',
        )
|
|
|
|
# Task monitoring
@app.task(bind=True)
def monitor_task_health(self):
    """Snapshot worker task state, cache it, and return it.

    Queries the workers for their active, scheduled and registered tasks,
    stores the snapshot in the Django cache under ``'celery_health'`` for
    300 seconds, and returns the same dictionary.

    Returns:
        dict: ``active_tasks``, ``scheduled_tasks`` and ``registered_tasks``
        hold the raw replies of the inspect calls (presumably ``None`` when no
        worker replies -- verify against the Celery version in use);
        ``timestamp`` is the ISO-8601 UTC time of the snapshot and ``task_id``
        is the id of the request that produced it.
    """
    from datetime import datetime, timezone

    from celery import current_app
    from django.core.cache import cache

    # Named `inspector` so the stdlib `inspect` module name is not shadowed.
    inspector = current_app.control.inspect()

    health_data = {
        'active_tasks': inspector.active(),
        'scheduled_tasks': inspector.scheduled(),
        'registered_tasks': inspector.registered(),
        # Previously this key held self.request.id, which is a task id rather
        # than a time. It now carries a real timestamp (an ISO string, so the
        # result stays JSON-serializable); the id is kept under 'task_id'.
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'task_id': self.request.id,
    }

    cache.set('celery_health', health_data, 300)  # 5 minutes

    return health_data
|