""" Celery configuration for ETB-API Enterprise-grade task queue with comprehensive monitoring and error handling """ import os from celery import Celery from celery.schedules import crontab from django.conf import settings # Set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings') app = Celery('etb_api') # Using a string here means the worker doesn't have to serialize # the configuration object to child processes. app.config_from_object('django.conf:settings', namespace='CELERY') # Load task modules from all registered Django apps. app.autodiscover_tasks() # Celery Beat Schedule for Enterprise Tasks app.conf.beat_schedule = { # Health Checks - Every minute 'health-checks': { 'task': 'monitoring.tasks.execute_health_checks', 'schedule': 60.0, 'options': { 'priority': 9, 'expires': 300, } }, # Metrics Collection - Every 5 minutes 'metrics-collection': { 'task': 'monitoring.tasks.collect_metrics', 'schedule': 300.0, 'options': { 'priority': 7, 'expires': 600, } }, # Alert Evaluation - Every minute 'alert-evaluation': { 'task': 'monitoring.tasks.evaluate_alerts', 'schedule': 60.0, 'options': { 'priority': 8, 'expires': 300, } }, # Data Cleanup - Daily at 2 AM 'data-cleanup': { 'task': 'monitoring.tasks.cleanup_old_data', 'schedule': crontab(hour=2, minute=0), 'options': { 'priority': 5, 'expires': 3600, } }, # System Status Report - Every 5 minutes 'system-status-report': { 'task': 'monitoring.tasks.generate_system_status_report', 'schedule': 300.0, 'options': { 'priority': 6, 'expires': 600, } }, # Backup Tasks - Daily at 3 AM 'database-backup': { 'task': 'core.tasks.backup_database', 'schedule': crontab(hour=3, minute=0), 'options': { 'priority': 4, 'expires': 7200, } }, # AI Model Retraining - Weekly on Sunday at 4 AM 'ai-model-retraining': { 'task': 'analytics_predictive_insights.tasks.retrain_predictive_models', 'schedule': crontab(hour=4, minute=0, day_of_week=0), 'options': { 'priority': 3, 'expires': 14400, } }, # SLA Monitoring - Every 30 seconds 'sla-monitoring': { 'task': 'sla_oncall.tasks.monitor_sla_breaches', 'schedule': 30.0, 'options': { 'priority': 9, 'expires': 60, } }, # Incident Correlation - Every 2 minutes 'incident-correlation': { 'task': 'incident_intelligence.tasks.correlate_incidents', 'schedule': 120.0, 'options': { 'priority': 7, 'expires': 300, } }, # Security Audit - Every hour 'security-audit': { 'task': 'security.tasks.perform_security_audit', 'schedule': 3600.0, 'options': { 'priority': 6, 'expires': 1800, } }, # Compliance Check - Daily at 5 AM 'compliance-check': { 'task': 'compliance_governance.tasks.perform_compliance_check', 'schedule': crontab(hour=5, minute=0), 'options': { 'priority': 5, 'expires': 3600, } }, # Knowledge Base Update - Every 6 hours 'knowledge-base-update': { 'task': 'knowledge_learning.tasks.update_knowledge_base', 'schedule': 21600.0, 'options': { 'priority': 4, 'expires': 1800, } }, # Performance Optimization - Every 15 minutes 'performance-optimization': { 'task': 'monitoring.tasks.optimize_performance', 'schedule': 900.0, 'options': { 'priority': 5, 'expires': 600, } }, } # Celery Configuration app.conf.update( # Task execution settings task_serializer='json', accept_content=['json'], result_serializer='json', timezone='UTC', enable_utc=True, # Worker settings worker_prefetch_multiplier=1, task_acks_late=True, worker_max_tasks_per_child=1000, worker_disable_rate_limits=False, # Result backend settings result_expires=3600, result_persistent=True, # Task routing 
    task_routes={
        'monitoring.tasks.*': {'queue': 'monitoring'},
        'security.tasks.*': {'queue': 'security'},
        'incident_intelligence.tasks.*': {'queue': 'incidents'},
        'analytics_predictive_insights.tasks.*': {'queue': 'analytics'},
        'sla_oncall.tasks.*': {'queue': 'sla'},
        'compliance_governance.tasks.*': {'queue': 'compliance'},
        'knowledge_learning.tasks.*': {'queue': 'knowledge'},
        'automation_orchestration.tasks.*': {'queue': 'automation'},
        'collaboration_war_rooms.tasks.*': {'queue': 'collaboration'},
        'core.tasks.*': {'queue': 'core'},
    },

    # Queue configuration: every queue uses a same-named exchange and
    # routing key (dict form, equivalent to a list of kombu.Queue objects).
    task_default_queue='default',
    task_queues={
        name: {'exchange': name, 'routing_key': name}
        for name in (
            'default', 'monitoring', 'security', 'incidents', 'analytics',
            'sla', 'compliance', 'knowledge', 'automation', 'collaboration',
            'core',
        )
    },

    # Error handling
    task_reject_on_worker_lost=True,
    task_ignore_result=False,

    # Monitoring: emit events so Flower / `celery events` can observe tasks
    worker_send_task_events=True,
    task_send_sent_event=True,

    # Retry settings. Celery has no global `task_max_retries` option, so the
    # default retry cap is applied to every task via task_annotations.
    task_default_retry_delay=60,
    task_annotations={'*': {'max_retries': 3}},

    # Logging
    worker_hijack_root_logger=False,
    worker_log_color=False,
)


# Connectivity check: prints the request context of the worker that ran it.
@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')


@app.task(bind=True, autoretry_for=(Exception,),
          retry_kwargs={'max_retries': 3, 'countdown': 60})
def error_handler(self, exc, task_id, args, kwargs, einfo):
    """Global error handler for failed tasks (mirrors the Task.on_failure signature)."""
    import logging

    logger = logging.getLogger(__name__)
    logger.error(f'Task {task_id} failed: {exc}')
    logger.error(f'Args: {args}, Kwargs: {kwargs}')
    logger.error(f'Exception info: {einfo}')

    # Send an alert for failures that persist after the retry limit; the
    # retry count is exposed on the task's request context.
    if self.request.retries >= 3:
        from monitoring.tasks import send_critical_alert

        send_critical_alert.delay(
            title=f'Critical Task Failure: {self.name}',
            message=f'Task {task_id} failed after {self.request.retries} retries: {exc}',
            severity='CRITICAL',
        )


@app.task(bind=True)
def monitor_task_health(self):
    """Monitor the health of all task queues."""
    from django.core.cache import cache
    from django.utils import timezone

    from celery import current_app

    # Query workers for active, scheduled (ETA), and registered tasks.
    inspect = current_app.control.inspect()
    health_data = {
        'active_tasks': inspect.active(),
        'scheduled_tasks': inspect.scheduled(),
        'registered_tasks': inspect.registered(),
        'timestamp': timezone.now().isoformat(),
    }

    # Cache the snapshot for 5 minutes so dashboards can read it cheaply.
    cache.set('celery_health', health_data, 300)
    return health_data
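
# Optional sketch (an assumption, not part of the original setup): connecting
# the task_failure signal routes every failure through error_handler without
# each task declaring an explicit errback. The exception and traceback are
# stringified because the JSON serializer configured above cannot carry
# exception objects.
from celery.signals import task_failure


@task_failure.connect
def _forward_failure_to_handler(sender=None, task_id=None, exception=None,
                                args=None, kwargs=None, einfo=None, **extra):
    """Queue the global error_handler for any failed task."""
    # Guard against recursion if error_handler itself fails.
    if sender is not None and getattr(sender, 'name', None) == error_handler.name:
        return
    error_handler.delay(repr(exception), task_id, args, kwargs, str(einfo))


# Example invocations, assuming the standard Django layout where this module
# lives at core/celery.py and core/__init__.py re-exports `app`:
#
#   celery -A core worker -l info -Q default,core,monitoring,security
#   celery -A core worker -l info -Q incidents,analytics,sla,compliance
#   celery -A core worker -l info -Q knowledge,automation,collaboration
#   celery -A core beat -l info
#
# Splitting queues across workers keeps the high-priority SLA and health-check
# queues responsive while heavier analytics and retraining jobs run elsewhere.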