"""
Alerting service for monitoring system
"""
import logging
import operator
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta

from django.utils import timezone
from django.core.mail import send_mail
from django.conf import settings
from django.contrib.auth import get_user_model

from monitoring.models import AlertRule, Alert, SystemMetric, MetricMeasurement, MonitoringTarget

User = get_user_model()
logger = logging.getLogger(__name__)

# Comparison functions for THRESHOLD conditions, keyed by the operator symbol
# stored in the rule's condition dict.
_THRESHOLD_OPERATORS = {
    '>': operator.gt,
    '>=': operator.ge,
    '<': operator.lt,
    '<=': operator.le,
    '==': operator.eq,
    '!=': operator.ne,
}


class AlertEvaluator:
    """Service for evaluating alert conditions"""

    def __init__(self):
        self.aggregator = None  # Will be imported to avoid circular imports

    def evaluate_alert_rules(self) -> List[Dict[str, Any]]:
        """Evaluate all active, enabled alert rules.

        Returns:
            One summary dict (see ``_create_alert``) per rule whose condition
            was met. A rule that raises during evaluation is logged and
            skipped so that it cannot block the remaining rules.
        """
        triggered_alerts = []
        active_rules = AlertRule.objects.filter(
            status='ACTIVE',
            is_enabled=True
        )

        for rule in active_rules:
            try:
                if self._evaluate_rule(rule):
                    triggered_alerts.append(self._create_alert(rule))
            except Exception as e:
                logger.exception("Failed to evaluate alert rule %s: %s", rule.name, e)

        return triggered_alerts

    def _evaluate_rule(self, rule: AlertRule) -> bool:
        """Dispatch a rule to the evaluator matching its condition type.

        Returns:
            True when the rule's condition is met; False otherwise, including
            when the condition type is unknown or missing.
        """
        # A null condition is treated like an empty one instead of crashing.
        condition = rule.condition or {}
        condition_type = condition.get('type')

        evaluators = {
            'THRESHOLD': self._evaluate_threshold_condition,
            'ANOMALY': self._evaluate_anomaly_condition,
            'AVAILABILITY': self._evaluate_availability_condition,
            'PATTERN': self._evaluate_pattern_condition,
        }
        evaluate = evaluators.get(condition_type)
        if evaluate is None:
            logger.warning("Unknown condition type: %s", condition_type)
            return False
        return evaluate(rule, condition)

    def _evaluate_threshold_condition(self, rule: AlertRule, condition: Dict[str, Any]) -> bool:
        """Compare the latest measurement of the rule's metric to a threshold.

        Reads ``condition['threshold']`` and ``condition['operator']``
        (default ``'>'``).

        Returns:
            The comparison result, or False when the rule has no metric, no
            measurement exists, the threshold is missing/non-numeric, or the
            operator is not recognised.
        """
        if not rule.metric:
            return False

        # Get latest metric value
        latest_measurement = MetricMeasurement.objects.filter(
            metric=rule.metric
        ).order_by('-timestamp').first()
        if not latest_measurement:
            return False

        raw_threshold = condition.get('threshold')
        try:
            # Coerce so thresholds stored as strings/Decimals compare cleanly
            # against the float measurement value.
            threshold_value = float(raw_threshold)
        except (TypeError, ValueError):
            logger.warning(
                "Invalid threshold %r for alert rule %s", raw_threshold, rule.name
            )
            return False

        operator_symbol = condition.get('operator', '>')
        try:
            compare = _THRESHOLD_OPERATORS.get(operator_symbol)
        except TypeError:
            # Unhashable operator value (e.g. a list) from malformed config.
            compare = None
        if compare is None:
            logger.warning(f"Unknown operator: {operator_symbol}")
            return False

        current_value = float(latest_measurement.value)
        return compare(current_value, threshold_value)

    def _evaluate_anomaly_condition(self, rule: AlertRule, condition: Dict[str, Any]) -> bool:
        """Flag the latest measurement if it deviates strongly from the mean.

        Simple statistical check over up to 100 measurements from the last
        24 hours: the newest value is anomalous when it lies more than
        ``condition['threshold']`` (default 2.0) population standard
        deviations away from the mean of that window.

        Returns:
            True when the latest value is anomalous; False when the rule has
            no metric, fewer than 10 data points exist, or the data has zero
            variance.
        """
        # This would integrate with anomaly detection models
        # For now, implement a simple statistical anomaly detection
        if not rule.metric:
            return False

        # Get recent measurements, newest first
        since = timezone.now() - timedelta(hours=24)
        measurements = MetricMeasurement.objects.filter(
            metric=rule.metric,
            timestamp__gte=since
        ).order_by('-timestamp')[:100]  # Last 100 measurements

        values = [float(m.value) for m in measurements]
        if len(values) < 10:  # Need minimum data points
            return False

        # Calculate mean and (population) standard deviation
        mean = sum(values) / len(values)
        variance = sum((x - mean) ** 2 for x in values) / len(values)
        std_dev = variance ** 0.5

        # A perfectly flat series has no spread to measure deviations against.
        if std_dev == 0:
            return False

        # Newest value is first because of the descending timestamp order.
        latest_value = values[0]
        anomaly_threshold = condition.get('threshold', 2.0)  # Default 2 sigma

        return abs(latest_value - mean) > (anomaly_threshold * std_dev)

    def _evaluate_availability_condition(self, rule: AlertRule, condition: Dict[str, Any]) -> bool:
        """Return True when the rule's target reports a CRITICAL last status.

        Returns False when the rule has no target.
        """
        if not rule.target:
            return False

        # Check if target is in critical state
        return rule.target.last_status == 'CRITICAL'

    def _evaluate_pattern_condition(self, rule: AlertRule, condition: Dict[str, Any]) -> bool:
        """Placeholder for pattern-based conditions; currently never triggers."""
        # This would integrate with pattern detection algorithms
        # For now, return False as placeholder
        return False

    def _create_alert(self, rule: AlertRule) -> Dict[str, Any]:
        """Persist an ``Alert`` for a triggered rule and return its summary.

        Returns:
            A dict with ``alert_id``, ``rule_id``, ``rule_name``,
            ``severity``, ``title``, ``description``, ``current_value`` and
            ``threshold_value``. ``rule_id`` is included because
            ``NotificationService.send_alert_notifications`` looks the rule
            up by that key.
        """
        # Get current value for context
        current_value = None
        threshold_value = None

        if rule.metric:
            latest_measurement = MetricMeasurement.objects.filter(
                metric=rule.metric
            ).order_by('-timestamp').first()
            if latest_measurement:
                current_value = float(latest_measurement.value)
                # NOTE(review): this records the metric's critical threshold,
                # not condition['threshold'] used during evaluation - confirm
                # which value is intended.
                threshold_value = rule.metric.critical_threshold

        # Create alert
        alert = Alert.objects.create(
            rule=rule,
            title=f"{rule.name} - {rule.severity}",
            description=self._generate_alert_description(rule, current_value, threshold_value),
            severity=rule.severity,
            triggered_value=current_value,
            threshold_value=threshold_value,
            context_data={
                'rule_id': str(rule.id),
                'metric_name': rule.metric.name if rule.metric else None,
                'target_name': rule.target.name if rule.target else None,
                'condition': rule.condition
            }
        )

        return {
            'alert_id': str(alert.id),
            'rule_id': str(rule.id),
            'rule_name': rule.name,
            'severity': rule.severity,
            'title': alert.title,
            'description': alert.description,
            'current_value': current_value,
            'threshold_value': threshold_value
        }

    def _generate_alert_description(self, rule: AlertRule, current_value: Optional[float], threshold_value: Optional[float]) -> str:
        """Build the multi-line, human-readable description for an alert.

        Value, threshold and target lines are included only when the
        corresponding data is available; severity and time are always added.
        """
        description = f"Alert rule '{rule.name}' has been triggered.\n"

        if rule.metric and current_value is not None:
            description += f"Current value: {current_value} {rule.metric.unit}\n"

        if threshold_value is not None:
            description += f"Threshold: {threshold_value} {rule.metric.unit if rule.metric else ''}\n"

        if rule.target:
            description += f"Target: {rule.target.name}\n"

        description += f"Severity: {rule.severity}\n"
        description += f"Time: {timezone.now().strftime('%Y-%m-%d %H:%M:%S')}"

        return description


class NotificationService:
    """Service for sending alert notifications"""

    def __init__(self):
        self.evaluator = AlertEvaluator()

    def send_alert_notifications(self, alert_data: Dict[str, Any]) -> Dict[str, Any]:
        """Send an alert through every channel configured on its rule.

        Args:
            alert_data: Alert summary dict; must contain ``rule_id``.

        Returns:
            ``{'error': <message>}`` when the rule id is missing or the rule
            does not exist; otherwise a dict mapping each channel type to
            that channel's result dict. Results are keyed by type, so when
            several channels share a type only the last result is kept.
        """
        results = {}

        # Get alert rule to determine notification channels
        rule_id = alert_data.get('rule_id')
        if not rule_id:
            return {'error': 'No rule ID provided'}

        try:
            rule = AlertRule.objects.get(id=rule_id)
        except AlertRule.DoesNotExist:
            return {'error': 'Alert rule not found'}

        senders = {
            'EMAIL': self._send_email_notification,
            'SLACK': self._send_slack_notification,
            'WEBHOOK': self._send_webhook_notification,
        }

        for channel in rule.notification_channels or []:
            # Read the type defensively: a malformed channel entry must not
            # raise again inside the error handler below and abort the loop.
            channel_type = channel.get('type') if isinstance(channel, dict) else None
            result_key = channel_type if isinstance(channel_type, str) else 'UNKNOWN'
            try:
                sender = senders.get(channel_type)
                if sender is None:
                    result = {'error': f'Unknown notification channel type: {channel_type}'}
                else:
                    result = sender(alert_data, channel)
                results[result_key] = result
            except Exception as e:
                logger.exception("Failed to send %s notification: %s", result_key, e)
                results[result_key] = {'error': str(e)}

        return results

    def _send_email_notification(self, alert_data: Dict[str, Any], channel: Dict[str, Any]) -> Dict[str, Any]:
        """Email the alert to ``channel['recipients']``.

        Returns:
            ``{'status': 'sent', 'recipients': [...]}`` on success, or
            ``{'error': <message>}`` when no recipients are configured or
            sending fails.
        """
        try:
            recipients = channel.get('recipients', [])
            if not recipients:
                return {'error': 'No email recipients configured'}

            subject = f"[{alert_data.get('severity', 'ALERT')}] {alert_data.get('title', 'System Alert')}"
            message = alert_data.get('description', '')

            send_mail(
                subject=subject,
                message=message,
                from_email=settings.DEFAULT_FROM_EMAIL,
                recipient_list=recipients,
                fail_silently=False
            )

            return {'status': 'sent', 'recipients': recipients}
        except Exception as e:
            return {'error': str(e)}

    def _send_slack_notification(self, alert_data: Dict[str, Any], channel: Dict[str, Any]) -> Dict[str, Any]:
        """Build the Slack payload for an alert.

        The payload is constructed but not yet posted anywhere; see the TODO
        below.

        Returns:
            ``{'status': 'sent', 'channel': ...}`` when a webhook URL is
            configured, otherwise ``{'error': <message>}``.
        """
        try:
            webhook_url = channel.get('webhook_url')
            if not webhook_url:
                return {'error': 'No Slack webhook URL configured'}

            # Create Slack message
            color = self._get_slack_color(alert_data.get('severity', 'MEDIUM'))

            slack_message = {
                "text": alert_data.get('title', 'System Alert'),
                "attachments": [
                    {
                        "color": color,
                        "fields": [
                            {
                                "title": "Description",
                                "value": alert_data.get('description', ''),
                                "short": False
                            },
                            {
                                "title": "Severity",
                                "value": alert_data.get('severity', 'UNKNOWN'),
                                "short": True
                            },
                            {
                                "title": "Time",
                                "value": timezone.now().strftime('%Y-%m-%d %H:%M:%S'),
                                "short": True
                            }
                        ]
                    }
                ]
            }

            # TODO: actually POST slack_message (as JSON) to webhook_url.
            # Until then the 'sent' status below does not reflect a delivery.

            return {'status': 'sent', 'channel': channel.get('channel', '#alerts')}
        except Exception as e:
            return {'error': str(e)}

    def _send_webhook_notification(self, alert_data: Dict[str, Any], channel: Dict[str, Any]) -> Dict[str, Any]:
        """Build the generic webhook payload for an alert.

        The payload is constructed but not yet posted anywhere; see the TODO
        below.

        Returns:
            ``{'status': 'sent', 'url': ...}`` when ``channel['url']`` is
            configured, otherwise ``{'error': <message>}``.
        """
        try:
            webhook_url = channel.get('url')
            if not webhook_url:
                return {'error': 'No webhook URL configured'}

            # Prepare webhook payload
            payload = {
                'alert': alert_data,
                'timestamp': timezone.now().isoformat(),
                'source': 'ETB-API-Monitoring'
            }

            # TODO: actually POST payload (as JSON) to webhook_url.
            # Until then the 'sent' status below does not reflect a delivery.

            return {'status': 'sent', 'url': webhook_url}
        except Exception as e:
            return {'error': str(e)}

    def _get_slack_color(self, severity: str) -> str:
        """Map an alert severity to a Slack attachment colour.

        Unknown severities fall back to ``'warning'``.
        """
        color_map = {
            'LOW': 'good',
            'MEDIUM': 'warning',
            'HIGH': 'danger',
            'CRITICAL': 'danger'
        }
        return color_map.get(severity, 'warning')


class AlertingService:
    """Main alerting service that coordinates alert evaluation and notification"""

    def __init__(self):
        self.evaluator = AlertEvaluator()
        self.notification_service = NotificationService()

    def run_alert_evaluation(self) -> Dict[str, Any]:
        """Evaluate all rules and notify for every triggered alert.

        Returns:
            A dict with:
            ``evaluated_rules`` - number of active, enabled rules;
            ``triggered_alerts`` - number of alerts created;
            ``notifications_sent`` - number of alerts for which at least one
            channel reported ``'sent'``;
            ``errors`` - messages for every failure encountered.
        """
        results = {
            'evaluated_rules': 0,
            'triggered_alerts': 0,
            'notifications_sent': 0,
            'errors': []
        }

        try:
            # Evaluate all alert rules
            triggered_alerts = self.evaluator.evaluate_alert_rules()
            results['triggered_alerts'] = len(triggered_alerts)

            # Send notifications for triggered alerts
            for alert_data in triggered_alerts:
                try:
                    notification_results = self.notification_service.send_alert_notifications(alert_data)
                except Exception as e:
                    logger.exception(
                        "Failed to send notifications for alert %s: %s",
                        alert_data.get('alert_id'), e
                    )
                    results['errors'].append(str(e))
                    continue

                # A string under 'error' is the service's top-level failure
                # (missing rule id / rule not found), not a channel result.
                top_level_error = notification_results.get('error')
                if isinstance(top_level_error, str):
                    results['errors'].append(top_level_error)
                    continue

                delivered = False
                for channel_type, outcome in notification_results.items():
                    if not isinstance(outcome, dict):
                        continue
                    if outcome.get('status') == 'sent':
                        delivered = True
                    elif 'error' in outcome:
                        results['errors'].append(f"{channel_type}: {outcome['error']}")

                # Only count alerts that actually reached some channel.
                if delivered:
                    results['notifications_sent'] += 1

            # Count evaluated rules
            results['evaluated_rules'] = AlertRule.objects.filter(
                status='ACTIVE',
                is_enabled=True
            ).count()

        except Exception as e:
            logger.exception("Alert evaluation failed: %s", e)
            results['errors'].append(str(e))

        return results

    def acknowledge_alert(self, alert_id: str, user: User) -> Dict[str, Any]:
        """Mark an alert as ACKNOWLEDGED by ``user``.

        Returns:
            ``{'status': 'success', 'message': ...}`` on success, or
            ``{'status': 'error', 'message': ...}`` when the alert does not
            exist or saving fails.
        """
        try:
            alert = Alert.objects.get(id=alert_id)
            alert.status = 'ACKNOWLEDGED'
            alert.acknowledged_by = user
            alert.acknowledged_at = timezone.now()
            alert.save()

            return {
                'status': 'success',
                'message': f'Alert {alert_id} acknowledged by {user.username}'
            }
        except Alert.DoesNotExist:
            return {
                'status': 'error',
                'message': f'Alert {alert_id} not found'
            }
        except Exception as e:
            logger.exception("Failed to acknowledge alert %s: %s", alert_id, e)
            return {
                'status': 'error',
                'message': str(e)
            }

    def resolve_alert(self, alert_id: str, user: User) -> Dict[str, Any]:
        """Mark an alert as RESOLVED by ``user``.

        Returns:
            ``{'status': 'success', 'message': ...}`` on success, or
            ``{'status': 'error', 'message': ...}`` when the alert does not
            exist or saving fails.
        """
        try:
            alert = Alert.objects.get(id=alert_id)
            alert.status = 'RESOLVED'
            alert.resolved_by = user
            alert.resolved_at = timezone.now()
            alert.save()

            return {
                'status': 'success',
                'message': f'Alert {alert_id} resolved by {user.username}'
            }
        except Alert.DoesNotExist:
            return {
                'status': 'error',
                'message': f'Alert {alert_id} not found'
            }
        except Exception as e:
            logger.exception("Failed to resolve alert %s: %s", alert_id, e)
            return {
                'status': 'error',
                'message': str(e)
            }

    def get_active_alerts(self, severity: Optional[str] = None) -> List[Dict[str, Any]]:
        """List alerts in TRIGGERED status, newest first.

        Args:
            severity: When given, only alerts with this severity are returned.

        Returns:
            One dict per alert with ``id``, ``title``, ``description``,
            ``severity``, ``triggered_at``, ``rule_name``, ``current_value``
            and ``threshold_value`` (the last two are floats, or None when
            unset).
        """
        # select_related avoids one extra query per alert for alert.rule.
        alerts = Alert.objects.filter(status='TRIGGERED').select_related('rule')

        if severity:
            alerts = alerts.filter(severity=severity)

        return [
            {
                'id': str(alert.id),
                'title': alert.title,
                'description': alert.description,
                'severity': alert.severity,
                'triggered_at': alert.triggered_at,
                'rule_name': alert.rule.name,
                # Compare against None so a legitimate value of 0 is kept.
                'current_value': float(alert.triggered_value) if alert.triggered_value is not None else None,
                'threshold_value': float(alert.threshold_value) if alert.threshold_value is not None else None
            }
            for alert in alerts.order_by('-triggered_at')
        ]