"""
Views for incident intelligence API endpoints
"""
from rest_framework import viewsets, status, filters
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from django_filters.rest_framework import DjangoFilterBackend
from django.db.models import Q, Count, Avg, DurationField, ExpressionWrapper, F
from django.db.models.functions import Extract
from django.utils import timezone
from datetime import timedelta
import logging
from ..models import (
    Incident, IncidentClassification, SeveritySuggestion, IncidentCorrelation,
    DuplicationDetection, IncidentPattern, AIProcessingLog
)
from ..serializers.incident import (
    IncidentSerializer, IncidentCreateSerializer, IncidentUpdateSerializer,
    IncidentAnalysisSerializer, IncidentSearchSerializer, IncidentStatsSerializer,
    IncidentCorrelationSerializer, DuplicationDetectionSerializer, IncidentPatternSerializer
)
from ..ai.classification import IncidentClassifier, SeverityAnalyzer
from ..ai.correlation import IncidentCorrelationEngine
from ..ai.duplication import DuplicationDetector
from ..security import IncidentAccessControl, log_incident_operation, get_user_accessible_incidents

logger = logging.getLogger(__name__)


class IncidentViewSet(viewsets.ModelViewSet):
    """
    ViewSet for managing incidents with AI intelligence
    """

    queryset = Incident.objects.all()
    permission_classes = [IsAuthenticated]
    filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
    filterset_fields = ['status', 'severity', 'category', 'assigned_to', 'reporter', 'ai_processed', 'is_duplicate']
    search_fields = ['title', 'description', 'free_text']
    ordering_fields = ['created_at', 'updated_at', 'severity', 'status']
    ordering = ['-created_at']

    def get_serializer_class(self):
        """Return the appropriate serializer for the current action"""
        if self.action == 'create':
            return IncidentCreateSerializer
        elif self.action in ['update', 'partial_update']:
            return IncidentUpdateSerializer
        elif self.action == 'analysis':
            return IncidentAnalysisSerializer
        return IncidentSerializer

    def get_queryset(self):
        """Filter the queryset by user permissions and search parameters"""
        # Start with the incidents this user is allowed to see
        queryset = get_user_accessible_incidents(self.request.user)

        # Apply validated search filters from the query string
        search_serializer = IncidentSearchSerializer(data=self.request.query_params)
        if search_serializer.is_valid():
            params = search_serializer.validated_data
            if params.get('query'):
                queryset = queryset.filter(
                    Q(title__icontains=params['query']) |
                    Q(description__icontains=params['query']) |
                    Q(free_text__icontains=params['query'])
                )
            if params.get('category'):
                queryset = queryset.filter(category=params['category'])
            if params.get('severity'):
                queryset = queryset.filter(severity=params['severity'])
            if params.get('status'):
                queryset = queryset.filter(status=params['status'])
            if params.get('assigned_to'):
                queryset = queryset.filter(assigned_to_id=params['assigned_to'])
            if params.get('reporter'):
                queryset = queryset.filter(reporter_id=params['reporter'])
            if params.get('date_from'):
                queryset = queryset.filter(created_at__gte=params['date_from'])
            if params.get('date_to'):
                queryset = queryset.filter(created_at__lte=params['date_to'])
            if params.get('has_ai_analysis') is not None:
                queryset = queryset.filter(ai_processed=params['has_ai_analysis'])
            if params.get('is_duplicate') is not None:
                queryset = queryset.filter(is_duplicate=params['is_duplicate'])
        return queryset
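
    # Illustrative search request (the /api/incidents/ prefix is an assumption;
    # the real path depends on how the router is registered):
    #
    #   GET /api/incidents/?query=database&severity=HIGH&status=OPEN
    #
    # Note that if IncidentSearchSerializer rejects the query string, the whole
    # filter block is skipped and only the permission filtering applies.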

    def perform_create(self, serializer):
        """Create an incident and trigger AI processing"""
        # Check create permissions before saving
        IncidentAccessControl.check_incident_access(self.request.user, None, 'create')
        incident = serializer.save()

        # Log the creation for the audit trail
        log_incident_operation(self.request.user, incident, 'create')

        # Queue AI processing; creation still succeeds if queuing fails
        try:
            from ..tasks import process_incident_ai
            process_incident_ai.delay(incident.id)
        except Exception as e:
            logger.error(f"Failed to trigger AI processing for incident {incident.id}: {e}")

    @action(detail=True, methods=['post'])
    def analyze(self, request, pk=None):
        """Trigger AI analysis for a specific incident"""
        incident = self.get_object()

        # Check analyze permissions
        IncidentAccessControl.check_incident_access(request.user, incident, 'analyze')

        try:
            from ..tasks import process_incident_ai
            process_incident_ai.delay(incident.id)

            # Log the analysis trigger
            log_incident_operation(request.user, incident, 'analyze')

            return Response({
                'message': 'AI analysis triggered successfully',
                'incident_id': str(incident.id)
            }, status=status.HTTP_202_ACCEPTED)
        except Exception as e:
            logger.error(f"Failed to trigger AI analysis for incident {incident.id}: {e}")
            return Response({
                'error': 'Failed to trigger AI analysis',
                'details': str(e)
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
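
    # Illustrative exchange (URL prefix and ID format are assumptions):
    #
    #   POST /api/incidents/<pk>/analyze/
    #   202 {"message": "AI analysis triggered successfully", "incident_id": "<pk>"}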

    @action(detail=True, methods=['get'])
    def analysis(self, request, pk=None):
        """Get the comprehensive AI analysis for an incident"""
        incident = self.get_object()
        serializer = self.get_serializer(incident)
        return Response(serializer.data)

    @action(detail=True, methods=['get'])
    def correlations(self, request, pk=None):
        """Get correlations for a specific incident"""
        incident = self.get_object()

        # Correlations where this incident is the primary or the related side
        primary_correlations = incident.correlations_as_primary.all()
        related_correlations = incident.correlations_as_related.all()

        # Combine and serialize
        all_correlations = list(primary_correlations) + list(related_correlations)
        serializer = IncidentCorrelationSerializer(all_correlations, many=True)
        return Response(serializer.data)

    @action(detail=True, methods=['get'])
    def duplicates(self, request, pk=None):
        """Get potential duplicates for a specific incident"""
        incident = self.get_object()

        # Duplications where this incident is on either side of the pair
        duplications_a = incident.duplication_as_a.all()
        duplications_b = incident.duplication_as_b.all()

        # Combine and serialize
        all_duplications = list(duplications_a) + list(duplications_b)
        serializer = DuplicationDetectionSerializer(all_duplications, many=True)
        return Response(serializer.data)

    @action(detail=True, methods=['get'])
    def patterns(self, request, pk=None):
        """Get patterns associated with a specific incident"""
        incident = self.get_object()
        patterns = incident.patterns.all()
        serializer = IncidentPatternSerializer(patterns, many=True)
        return Response(serializer.data)

    @action(detail=False, methods=['get'])
    def stats(self, request):
        """Get incident statistics"""
        queryset = self.get_queryset()

        # Basic counts
        total_incidents = queryset.count()
        open_incidents = queryset.filter(status__in=['OPEN', 'IN_PROGRESS']).count()
        resolved_incidents = queryset.filter(status__in=['RESOLVED', 'CLOSED']).count()

        # Severity counts
        critical_incidents = queryset.filter(severity='CRITICAL').count()
        high_incidents = queryset.filter(severity='HIGH').count()
        medium_incidents = queryset.filter(severity='MEDIUM').count()
        low_incidents = queryset.filter(severity='LOW').count()

        # Average resolution time; the datetime subtraction must be wrapped so
        # the database returns a DurationField instead of raising a mixed-type error
        resolved_with_time = queryset.filter(
            status__in=['RESOLVED', 'CLOSED'],
            resolved_at__isnull=False
        ).annotate(
            resolution_time=ExpressionWrapper(
                F('resolved_at') - F('created_at'),
                output_field=DurationField()
            )
        )
        avg_resolution_time = resolved_with_time.aggregate(
            avg_time=Avg('resolution_time')
        )['avg_time']

        # Category, severity, and status distributions
        incidents_by_category = dict(
            queryset.values('category').annotate(count=Count('id')).values_list('category', 'count')
        )
        incidents_by_severity = dict(
            queryset.values('severity').annotate(count=Count('id')).values_list('severity', 'count')
        )
        incidents_by_status = dict(
            queryset.values('status').annotate(count=Count('id')).values_list('status', 'count')
        )

        # AI processing stats
        ai_processed_count = queryset.filter(ai_processed=True).count()
        duplicate_count = queryset.filter(is_duplicate=True).count()
        correlation_count = IncidentCorrelation.objects.count()
        pattern_count = IncidentPattern.objects.count()

        stats_data = {
            'total_incidents': total_incidents,
            'open_incidents': open_incidents,
            'resolved_incidents': resolved_incidents,
            'critical_incidents': critical_incidents,
            'high_incidents': high_incidents,
            'medium_incidents': medium_incidents,
            'low_incidents': low_incidents,
            'average_resolution_time': avg_resolution_time,
            'incidents_by_category': incidents_by_category,
            'incidents_by_severity': incidents_by_severity,
            'incidents_by_status': incidents_by_status,
            'ai_processed_count': ai_processed_count,
            'duplicate_count': duplicate_count,
            'correlation_count': correlation_count,
            'pattern_count': pattern_count
        }
        serializer = IncidentStatsSerializer(stats_data)
        return Response(serializer.data)
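
    # Illustrative response shape (all values are invented examples; the exact
    # rendering, e.g. of average_resolution_time, is up to IncidentStatsSerializer):
    #
    #   {
    #     "total_incidents": 128, "open_incidents": 40, "resolved_incidents": 88,
    #     "critical_incidents": 4, "average_resolution_time": "1 02:15:00",
    #     "incidents_by_severity": {"HIGH": 22, "MEDIUM": 70, ...},
    #     ...
    #   }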

    @action(detail=False, methods=['post'])
    def batch_analyze(self, request):
        """Trigger AI analysis for multiple incidents"""
        incident_ids = request.data.get('incident_ids', [])
        if not incident_ids:
            return Response({
                'error': 'No incident IDs provided'
            }, status=status.HTTP_400_BAD_REQUEST)

        try:
            from ..tasks import batch_process_incidents_ai
            batch_process_incidents_ai.delay(incident_ids)
            return Response({
                'message': f'AI analysis triggered for {len(incident_ids)} incidents',
                'incident_ids': incident_ids
            }, status=status.HTTP_202_ACCEPTED)
        except Exception as e:
            logger.error(f"Failed to trigger batch AI analysis: {e}")
            return Response({
                'error': 'Failed to trigger batch AI analysis',
                'details': str(e)
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
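
    # Illustrative request (the IDs are hypothetical):
    #
    #   POST /api/incidents/batch_analyze/
    #   {"incident_ids": ["9b2f...", "c41a..."]}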

    @action(detail=False, methods=['post'])
    def detect_duplicates(self, request):
        """Detect duplicates across all incidents"""
        try:
            from ..tasks import detect_all_duplicates
            detect_all_duplicates.delay()
            return Response({
                'message': 'Duplicate detection process started'
            }, status=status.HTTP_202_ACCEPTED)
        except Exception as e:
            logger.error(f"Failed to trigger duplicate detection: {e}")
            return Response({
                'error': 'Failed to trigger duplicate detection',
                'details': str(e)
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    @action(detail=False, methods=['post'])
    def correlate_incidents(self, request):
        """Run correlation analysis on all incidents"""
        try:
            from ..tasks import correlate_all_incidents
            correlate_all_incidents.delay()
            return Response({
                'message': 'Correlation analysis process started'
            }, status=status.HTTP_202_ACCEPTED)
        except Exception as e:
            logger.error(f"Failed to trigger correlation analysis: {e}")
            return Response({
                'error': 'Failed to trigger correlation analysis',
                'details': str(e)
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)


class IncidentCorrelationViewSet(viewsets.ReadOnlyModelViewSet):
    """
    ViewSet for viewing incident correlations
    """

    queryset = IncidentCorrelation.objects.all()
    serializer_class = IncidentCorrelationSerializer
    permission_classes = [IsAuthenticated]
    filter_backends = [DjangoFilterBackend, filters.OrderingFilter]
    filterset_fields = ['correlation_type', 'correlation_strength', 'is_problem_indicator']
    ordering_fields = ['confidence_score', 'created_at']
    ordering = ['-confidence_score']

    @action(detail=False, methods=['get'])
    def problem_indicators(self, request):
        """Get correlations that indicate larger problems"""
        queryset = self.get_queryset().filter(is_problem_indicator=True)
        serializer = self.get_serializer(queryset, many=True)
        return Response(serializer.data)


class DuplicationDetectionViewSet(viewsets.ReadOnlyModelViewSet):
    """
    ViewSet for viewing duplication detection results
    """

    queryset = DuplicationDetection.objects.all()
    serializer_class = DuplicationDetectionSerializer
    permission_classes = [IsAuthenticated]
    filter_backends = [DjangoFilterBackend, filters.OrderingFilter]
    filterset_fields = ['duplication_type', 'status', 'recommended_action']
    ordering_fields = ['confidence_score', 'similarity_score', 'created_at']
    ordering = ['-confidence_score']

    @action(detail=True, methods=['post'])
    def approve_merge(self, request, pk=None):
        """Approve merging of duplicate incidents"""
        duplication = self.get_object()
        if duplication.status != 'DETECTED':
            return Response({
                'error': 'Only detected duplications can be approved for merging'
            }, status=status.HTTP_400_BAD_REQUEST)

        try:
            # Mark as reviewed and approved
            duplication.status = 'REVIEWED'
            duplication.reviewed_by = request.user
            duplication.reviewed_at = timezone.now()
            duplication.save()

            # Trigger the merge process asynchronously
            from ..tasks import merge_duplicate_incidents
            merge_duplicate_incidents.delay(duplication.id)

            return Response({
                'message': 'Merge approved and queued for processing'
            }, status=status.HTTP_200_OK)
        except Exception as e:
            logger.error(f"Failed to approve merge for duplication {duplication.id}: {e}")
            return Response({
                'error': 'Failed to approve merge',
                'details': str(e)
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    @action(detail=True, methods=['post'])
    def reject_merge(self, request, pk=None):
        """Reject merging of duplicate incidents"""
        duplication = self.get_object()
        if duplication.status != 'DETECTED':
            return Response({
                'error': 'Only detected duplications can be rejected'
            }, status=status.HTTP_400_BAD_REQUEST)

        try:
            duplication.status = 'REJECTED'
            duplication.reviewed_by = request.user
            duplication.reviewed_at = timezone.now()
            duplication.save()
            return Response({
                'message': 'Merge rejected'
            }, status=status.HTTP_200_OK)
        except Exception as e:
            logger.error(f"Failed to reject merge for duplication {duplication.id}: {e}")
            return Response({
                'error': 'Failed to reject merge',
                'details': str(e)
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
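
    # Review lifecycle implied by the two actions above:
    #
    #   DETECTED --approve_merge--> REVIEWED  (merge task queued)
    #   DETECTED --reject_merge--> REJECTED
    #
    # Any other starting status gets a 400 response from both actions.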


class IncidentPatternViewSet(viewsets.ReadOnlyModelViewSet):
    """
    ViewSet for viewing incident patterns
    """

    queryset = IncidentPattern.objects.all()
    serializer_class = IncidentPatternSerializer
    permission_classes = [IsAuthenticated]
    filter_backends = [DjangoFilterBackend, filters.OrderingFilter]
    filterset_fields = ['pattern_type', 'is_active', 'is_resolved']
    ordering_fields = ['confidence_score', 'incident_count', 'created_at']
    ordering = ['-confidence_score']

    @action(detail=False, methods=['get'])
    def active_patterns(self, request):
        """Get active patterns that need attention"""
        queryset = self.get_queryset().filter(is_active=True, is_resolved=False)
        serializer = self.get_serializer(queryset, many=True)
        return Response(serializer.data)

    @action(detail=True, methods=['post'])
    def resolve_pattern(self, request, pk=None):
        """Mark a pattern as resolved"""
        pattern = self.get_object()
        try:
            pattern.is_resolved = True
            pattern.is_active = False
            pattern.save()
            return Response({
                'message': 'Pattern marked as resolved'
            }, status=status.HTTP_200_OK)
        except Exception as e:
            logger.error(f"Failed to resolve pattern {pattern.id}: {e}")
            return Response({
                'error': 'Failed to resolve pattern',
                'details': str(e)
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
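

# A minimal sketch of how these viewsets might be exposed, assuming a DRF
# DefaultRouter in the app's urls.py (module path and prefixes are illustrative):
#
#   from rest_framework.routers import DefaultRouter
#   from .views.incident import (
#       IncidentViewSet, IncidentCorrelationViewSet,
#       DuplicationDetectionViewSet, IncidentPatternViewSet,
#   )
#
#   router = DefaultRouter()
#   router.register(r'incidents', IncidentViewSet, basename='incident')
#   router.register(r'correlations', IncidentCorrelationViewSet)
#   router.register(r'duplications', DuplicationDetectionViewSet)
#   router.register(r'patterns', IncidentPatternViewSet)
#   urlpatterns = router.urls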