412 lines
15 KiB
Python
412 lines
15 KiB
Python
"""
Views for Automation & Orchestration models
"""
|
|
from rest_framework import viewsets, status, permissions
|
|
from rest_framework.decorators import action
|
|
from rest_framework.response import Response
|
|
from django_filters.rest_framework import DjangoFilterBackend
|
|
from rest_framework.filters import SearchFilter, OrderingFilter
|
|
from django.utils import timezone
|
|
from django.db.models import Q
|
|
|
|
from ..models import (
|
|
Runbook,
|
|
RunbookExecution,
|
|
Integration,
|
|
ChatOpsIntegration,
|
|
ChatOpsCommand,
|
|
AutoRemediation,
|
|
AutoRemediationExecution,
|
|
MaintenanceWindow,
|
|
WorkflowTemplate,
|
|
WorkflowExecution,
|
|
)
|
|
from ..serializers.automation import (
|
|
RunbookSerializer,
|
|
RunbookExecutionSerializer,
|
|
IntegrationSerializer,
|
|
ChatOpsIntegrationSerializer,
|
|
ChatOpsCommandSerializer,
|
|
AutoRemediationSerializer,
|
|
AutoRemediationExecutionSerializer,
|
|
MaintenanceWindowSerializer,
|
|
WorkflowTemplateSerializer,
|
|
WorkflowExecutionSerializer,
|
|
)
|
|
|
|
|
|
class RunbookViewSet(viewsets.ModelViewSet):
    """ViewSet for the Runbook model.

    Provides the standard ModelViewSet endpoints plus two extra actions:
    ``execute`` (create a manual execution record for one runbook) and
    ``available_for_trigger`` (list the active runbooks the current user
    may trigger).

    Non-staff users only see runbooks that are public or that they created
    themselves; staff users see every runbook.
    """

    queryset = Runbook.objects.all()
    serializer_class = RunbookSerializer
    permission_classes = [permissions.IsAuthenticated]
    filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
    filterset_fields = ['status', 'trigger_type', 'category', 'is_public']
    search_fields = ['name', 'description', 'category']
    ordering_fields = ['name', 'created_at', 'updated_at', 'execution_count', 'success_rate']
    ordering = ['-created_at']

    def get_queryset(self):
        """Return the runbooks visible to the requesting user."""
        queryset = super().get_queryset()

        # Non-staff users are limited to public runbooks and their own runbooks
        if not self.request.user.is_staff:
            queryset = queryset.filter(
                Q(is_public=True) | Q(created_by=self.request.user)
            )

        return queryset

    def perform_create(self, serializer):
        """Save a new runbook with the requesting user as ``created_by``."""
        serializer.save(created_by=self.request.user)

    def perform_update(self, serializer):
        """Save changes with the requesting user as ``last_modified_by``."""
        serializer.save(last_modified_by=self.request.user)

    @action(detail=True, methods=['post'])
    def execute(self, request, pk=None):
        """Create a MANUAL execution record for this runbook.

        Responds with 403 when ``runbook.can_be_triggered_by`` rejects the
        requesting user, otherwise with 201 and the serialized
        RunbookExecution. Only the record is created here; the execution
        itself is not started yet (see TODO below).
        """
        runbook = self.get_object()

        if not runbook.can_be_triggered_by(request.user):
            return Response(
                {'error': 'You do not have permission to execute this runbook'},
                status=status.HTTP_403_FORBIDDEN
            )

        # Create execution record
        execution = RunbookExecution.objects.create(
            runbook=runbook,
            triggered_by=request.user,
            trigger_type='MANUAL',
            trigger_data=request.data.get('trigger_data', {}),
            # Treat a missing (None) step list as zero steps instead of
            # letting len(None) raise a TypeError.
            total_steps=len(runbook.steps or []),
        )

        # TODO: Start actual execution in background task

        serializer = RunbookExecutionSerializer(execution, context={'request': request})
        return Response(serializer.data, status=status.HTTP_201_CREATED)

    @action(detail=False, methods=['get'])
    def available_for_trigger(self, request):
        """List the ACTIVE runbooks the current user is allowed to trigger.

        The queryset is run through ``filter_queryset`` so that the filter,
        search and ordering backends declared on this class (including the
        default ordering) apply here exactly as they do for the list
        endpoint.
        """
        queryset = self.filter_queryset(self.get_queryset()).filter(status='ACTIVE')

        # can_be_triggered_by is evaluated per runbook in Python
        available_runbooks = [rb for rb in queryset if rb.can_be_triggered_by(request.user)]

        serializer = self.get_serializer(available_runbooks, many=True)
        return Response(serializer.data)
|
|
|
|
|
|
class RunbookExecutionViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only endpoints for RunbookExecution records.

    Staff users see every execution. Everyone else sees only executions
    they triggered, or executions whose incident is assigned to them or
    was reported by them.
    """

    queryset = RunbookExecution.objects.all()
    serializer_class = RunbookExecutionSerializer
    permission_classes = [permissions.IsAuthenticated]

    # Filtering, searching and ordering configuration
    filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
    filterset_fields = ['status', 'trigger_type', 'runbook', 'incident']
    search_fields = ['runbook__name']
    ordering_fields = ['started_at', 'completed_at', 'duration']
    ordering = ['-started_at']

    def get_queryset(self):
        """Return the executions the requesting user is allowed to see."""
        executions = super().get_queryset()
        viewer = self.request.user

        # Staff are not restricted
        if viewer.is_staff:
            return executions

        # Own executions, or executions tied to an incident the user is
        # assigned to or reported
        visible_to_viewer = (
            Q(triggered_by=viewer)
            | Q(incident__assigned_to=viewer)
            | Q(incident__reporter=viewer)
        )
        return executions.filter(visible_to_viewer)
|
|
|
|
|
|
class IntegrationViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for Integration records plus two probe actions.

    ``test_connection`` and ``health_check`` are placeholders: neither
    contacts the integrated system yet (see the TODO notes in each).
    """

    queryset = Integration.objects.all()
    serializer_class = IntegrationSerializer
    permission_classes = [permissions.IsAuthenticated]

    # Filtering, searching and ordering configuration
    filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
    filterset_fields = ['integration_type', 'status', 'health_status']
    search_fields = ['name', 'description']
    ordering_fields = ['name', 'created_at', 'last_used_at']
    ordering = ['name']

    def perform_create(self, serializer):
        """Save a new integration with the requesting user as ``created_by``."""
        creator = self.request.user
        serializer.save(created_by=creator)

    @action(detail=True, methods=['post'])
    def test_connection(self, request, pk=None):
        """Return a mock success payload for a connection test."""
        integration = self.get_object()

        # TODO: Implement actual connection testing
        # For now, just return a mock response
        payload = {
            'status': 'success',
            'message': f'Connection test for {integration.name} completed',
            'health_status': 'HEALTHY',
        }
        return Response(payload)

    @action(detail=True, methods=['post'])
    def health_check(self, request, pk=None):
        """Mark the integration as HEALTHY and stamp the check time.

        Responds with the stored ``health_status`` and ``last_health_check``.
        """
        integration = self.get_object()

        # TODO: Implement actual health check
        # For now, just update the timestamp
        integration.last_health_check = timezone.now()
        integration.health_status = 'HEALTHY'
        integration.save()

        # Values are read back from the instance after it has been saved
        payload = {
            'status': 'success',
            'health_status': integration.health_status,
            'last_health_check': integration.last_health_check,
        }
        return Response(payload)
|
|
|
|
|
|
class ChatOpsIntegrationViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for ChatOpsIntegration records plus a webhook test action."""

    queryset = ChatOpsIntegration.objects.all()
    serializer_class = ChatOpsIntegrationSerializer
    permission_classes = [permissions.IsAuthenticated]

    # Filtering, searching and ordering configuration
    filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
    filterset_fields = ['platform', 'is_active']
    search_fields = ['name']
    ordering_fields = ['name', 'created_at', 'last_activity']
    ordering = ['name']

    def perform_create(self, serializer):
        """Save a new ChatOps integration with the requesting user as ``created_by``."""
        creator = self.request.user
        serializer.save(created_by=creator)

    @action(detail=True, methods=['post'])
    def test_webhook(self, request, pk=None):
        """Return a placeholder success payload for a webhook test."""
        integration = self.get_object()

        # TODO: Implement actual webhook testing
        payload = {
            'status': 'success',
            'message': f'Webhook test for {integration.name} completed',
        }
        return Response(payload)
|
|
|
|
|
|
class ChatOpsCommandViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only endpoints for ChatOpsCommand records.

    No per-user restriction is applied in this class: every authenticated
    user gets the full queryset.
    """

    # Data source and access control
    queryset = ChatOpsCommand.objects.all()
    serializer_class = ChatOpsCommandSerializer
    permission_classes = [permissions.IsAuthenticated]

    # Query-string filtering, free-text search and sorting
    filter_backends = [
        DjangoFilterBackend,
        SearchFilter,
        OrderingFilter,
    ]
    filterset_fields = [
        'status',
        'chatops_integration',
        'command',
    ]
    search_fields = [
        'command',
        'user_id',
    ]
    ordering_fields = [
        'executed_at',
        'completed_at',
    ]

    # Default sort: most recently executed first
    ordering = ['-executed_at']
|
|
|
|
|
|
class AutoRemediationViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for AutoRemediation rules plus a trigger test action."""

    queryset = AutoRemediation.objects.all()
    serializer_class = AutoRemediationSerializer
    permission_classes = [permissions.IsAuthenticated]

    # Filtering, searching and ordering configuration
    filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
    filterset_fields = [
        'remediation_type',
        'trigger_condition_type',
        'is_active',
        'requires_approval',
    ]
    search_fields = ['name', 'description']
    ordering_fields = ['name', 'created_at', 'execution_count', 'success_count']
    ordering = ['name']

    def perform_create(self, serializer):
        """Save a new auto-remediation with the requesting user as ``created_by``."""
        creator = self.request.user
        serializer.save(created_by=creator)

    @action(detail=True, methods=['post'])
    def test_trigger(self, request, pk=None):
        """Return a placeholder result that echoes the stored trigger conditions.

        The conditions are not evaluated yet (see TODO below).
        """
        remediation = self.get_object()

        # TODO: Implement actual trigger testing
        payload = {
            'status': 'success',
            'message': f'Trigger test for {remediation.name} completed',
            'trigger_conditions': remediation.trigger_conditions,
        }
        return Response(payload)
|
|
|
|
|
|
class AutoRemediationExecutionViewSet(viewsets.ReadOnlyModelViewSet):
    """ViewSet for the AutoRemediationExecution model.

    Records are read-only through the standard endpoints; the ``approve``
    and ``reject`` actions change the state of a PENDING execution.
    """

    queryset = AutoRemediationExecution.objects.all()
    serializer_class = AutoRemediationExecutionSerializer
    permission_classes = [permissions.IsAuthenticated]
    filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
    filterset_fields = ['status', 'auto_remediation', 'incident']
    search_fields = ['auto_remediation__name', 'incident__title']
    ordering_fields = ['triggered_at', 'started_at', 'completed_at']
    ordering = ['-triggered_at']

    @staticmethod
    def _is_designated_approver(execution, user):
        """Return True when ``user`` is in the remediation's ``approval_users``.

        Uses an EXISTS query rather than loading every approver just to
        test membership.
        """
        return execution.auto_remediation.approval_users.filter(pk=user.pk).exists()

    @action(detail=True, methods=['post'])
    def approve(self, request, pk=None):
        """Approve a pending auto-remediation execution.

        Responds with 400 when the execution is not PENDING or when its
        remediation does not require approval, and with 403 when the
        requesting user is not one of the remediation's approval users.
        On success the execution is marked APPROVED, the approver, time and
        optional ``approval_notes`` are stored, and the serialized execution
        is returned.
        """
        execution = self.get_object()

        if execution.status != 'PENDING':
            return Response(
                {'error': 'Only pending remediations can be approved'},
                status=status.HTTP_400_BAD_REQUEST
            )

        if not execution.auto_remediation.requires_approval:
            return Response(
                {'error': 'This remediation does not require approval'},
                status=status.HTTP_400_BAD_REQUEST
            )

        if not self._is_designated_approver(execution, request.user):
            return Response(
                {'error': 'You do not have permission to approve this remediation'},
                status=status.HTTP_403_FORBIDDEN
            )

        execution.status = 'APPROVED'
        execution.approved_by = request.user
        execution.approved_at = timezone.now()
        execution.approval_notes = request.data.get('approval_notes', '')
        execution.save()

        # TODO: Start actual remediation execution

        serializer = self.get_serializer(execution)
        return Response(serializer.data)

    @action(detail=True, methods=['post'])
    def reject(self, request, pk=None):
        """Reject a pending auto-remediation execution.

        Responds with 400 when the execution is not PENDING. When the
        remediation requires approval, only its approval users may reject
        it (403 otherwise), mirroring the check in ``approve``. Executions
        of remediations that do not require approval can be rejected by any
        authenticated user, as before. On success the execution is marked
        REJECTED and the rejecting user, time and optional
        ``rejection_notes`` are stored in the approval fields.
        """
        execution = self.get_object()

        if execution.status != 'PENDING':
            return Response(
                {'error': 'Only pending remediations can be rejected'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Without this check any authenticated user could veto a
        # remediation that is gated on a specific set of approvers.
        if (
            execution.auto_remediation.requires_approval
            and not self._is_designated_approver(execution, request.user)
        ):
            return Response(
                {'error': 'You do not have permission to reject this remediation'},
                status=status.HTTP_403_FORBIDDEN
            )

        execution.status = 'REJECTED'
        # The approval fields double as the record of who rejected and when
        execution.approved_by = request.user
        execution.approved_at = timezone.now()
        execution.approval_notes = request.data.get('rejection_notes', '')
        execution.save()

        serializer = self.get_serializer(execution)
        return Response(serializer.data)
|
|
|
|
|
|
class MaintenanceWindowViewSet(viewsets.ModelViewSet):
    """ViewSet for the MaintenanceWindow model.

    Provides the standard ModelViewSet endpoints plus ``active`` and
    ``upcoming`` list actions.
    """

    queryset = MaintenanceWindow.objects.all()
    serializer_class = MaintenanceWindowSerializer
    permission_classes = [permissions.IsAuthenticated]
    filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
    filterset_fields = ['status']
    search_fields = ['name', 'description']
    ordering_fields = ['name', 'start_time', 'end_time', 'created_at']
    ordering = ['start_time']

    def perform_create(self, serializer):
        """Save a new maintenance window with the requesting user as ``created_by``."""
        serializer.save(created_by=self.request.user)

    @action(detail=False, methods=['get'])
    def active(self, request):
        """List windows with status 'ACTIVE' whose time range contains now.

        The queryset goes through ``filter_queryset`` so the class's
        default ordering (``start_time``) and its search/ordering query
        parameters apply to this endpoint as well.
        """
        now = timezone.now()
        active_windows = self.filter_queryset(self.get_queryset()).filter(
            start_time__lte=now,
            end_time__gte=now,
            status='ACTIVE'
        )

        serializer = self.get_serializer(active_windows, many=True)
        return Response(serializer.data)

    @action(detail=False, methods=['get'])
    def upcoming(self, request):
        """List windows with status 'SCHEDULED' that start after now.

        The queryset goes through ``filter_queryset`` so results come back
        in the class's default ``start_time`` order (soonest first) unless
        the client requests another ordering.
        """
        now = timezone.now()
        upcoming_windows = self.filter_queryset(self.get_queryset()).filter(
            start_time__gt=now,
            status='SCHEDULED'
        )

        serializer = self.get_serializer(upcoming_windows, many=True)
        return Response(serializer.data)
|
|
|
|
|
|
class WorkflowTemplateViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for WorkflowTemplate records.

    Staff users see every template; other users see only public templates
    and templates they created.
    """

    queryset = WorkflowTemplate.objects.all()
    serializer_class = WorkflowTemplateSerializer
    permission_classes = [permissions.IsAuthenticated]

    # Filtering, searching and ordering configuration
    filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
    filterset_fields = ['template_type', 'is_public']
    search_fields = ['name', 'description']
    ordering_fields = ['name', 'created_at', 'usage_count']
    ordering = ['name']

    def get_queryset(self):
        """Return the templates the requesting user is allowed to see."""
        templates = super().get_queryset()
        viewer = self.request.user

        # Staff are not restricted
        if viewer.is_staff:
            return templates

        # Everyone else: public templates plus their own
        public_or_own = Q(is_public=True) | Q(created_by=viewer)
        return templates.filter(public_or_own)

    def perform_create(self, serializer):
        """Save a new workflow template with the requesting user as ``created_by``."""
        creator = self.request.user
        serializer.save(created_by=creator)
|
|
|
|
|
|
class WorkflowExecutionViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only endpoints for WorkflowExecution records.

    Staff users see every execution. Everyone else sees only executions
    they triggered, or executions whose related incident is assigned to
    them or was reported by them.
    """

    queryset = WorkflowExecution.objects.all()
    serializer_class = WorkflowExecutionSerializer
    permission_classes = [permissions.IsAuthenticated]

    # Filtering, searching and ordering configuration
    filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
    filterset_fields = ['status', 'workflow_template', 'trigger_type']
    search_fields = ['name', 'workflow_template__name']
    ordering_fields = ['started_at', 'completed_at', 'duration']
    ordering = ['-started_at']

    def get_queryset(self):
        """Return the executions the requesting user is allowed to see."""
        executions = super().get_queryset()
        viewer = self.request.user

        # Staff are not restricted
        if viewer.is_staff:
            return executions

        # Own executions, or executions tied to an incident the user is
        # assigned to or reported
        visible_to_viewer = (
            Q(triggered_by=viewer)
            | Q(related_incident__assigned_to=viewer)
            | Q(related_incident__reporter=viewer)
        )
        return executions.filter(visible_to_viewer)
|