Files
ETB/ETB-API/collaboration_war_rooms/views/collaboration.py
Iliyan Angelov 6b247e5b9f Updates
2025-09-19 11:58:53 +03:00

647 lines
24 KiB
Python

"""
Views for Collaboration & War Rooms module
"""
from rest_framework import viewsets, status, permissions
from rest_framework.decorators import action
from rest_framework.response import Response
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework.filters import SearchFilter, OrderingFilter
from django.db.models import Q
from django.utils import timezone
from ..models import (
WarRoom, ConferenceBridge, IncidentCommandRole,
TimelineEvent, WarRoomMessage, IncidentDecision,
MessageReaction, ChatFile, ChatCommand, ChatBot
)
from ..serializers.collaboration import (
WarRoomSerializer, ConferenceBridgeSerializer, IncidentCommandRoleSerializer,
TimelineEventSerializer, WarRoomMessageSerializer, IncidentDecisionSerializer,
WarRoomSummarySerializer, TimelineEventSummarySerializer,
MessageReactionSerializer, ChatFileSerializer, ChatCommandSerializer, ChatBotSerializer
)
from incident_intelligence.models import Incident
class WarRoomViewSet(viewsets.ModelViewSet):
"""ViewSet for WarRoom model"""
queryset = WarRoom.objects.all()
serializer_class = WarRoomSerializer
permission_classes = [permissions.IsAuthenticated]
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_fields = ['status', 'privacy_level', 'incident__severity']
search_fields = ['name', 'description', 'incident__title']
ordering_fields = ['created_at', 'last_activity', 'message_count']
ordering = ['-created_at']
def get_queryset(self):
"""Filter war rooms based on user access"""
queryset = super().get_queryset()
user = self.request.user
# Filter by user access
accessible_war_rooms = []
for war_room in queryset:
if war_room.can_user_access(user):
accessible_war_rooms.append(war_room.id)
return queryset.filter(id__in=accessible_war_rooms)
def get_serializer_class(self):
"""Return appropriate serializer based on action"""
if self.action == 'list':
return WarRoomSummarySerializer
return WarRoomSerializer
@action(detail=True, methods=['post'])
def add_participant(self, request, pk=None):
"""Add a participant to the war room"""
war_room = self.get_object()
user_id = request.data.get('user_id')
if not user_id:
return Response(
{'error': 'user_id is required'},
status=status.HTTP_400_BAD_REQUEST
)
try:
from django.contrib.auth import get_user_model
User = get_user_model()
user = User.objects.get(id=user_id)
if war_room.can_user_access(user):
war_room.add_participant(user)
return Response({'message': 'Participant added successfully'})
else:
return Response(
{'error': 'User does not have access to this war room'},
status=status.HTTP_403_FORBIDDEN
)
except User.DoesNotExist:
return Response(
{'error': 'User not found'},
status=status.HTTP_404_NOT_FOUND
)
@action(detail=True, methods=['post'])
def remove_participant(self, request, pk=None):
"""Remove a participant from the war room"""
war_room = self.get_object()
user_id = request.data.get('user_id')
if not user_id:
return Response(
{'error': 'user_id is required'},
status=status.HTTP_400_BAD_REQUEST
)
try:
from django.contrib.auth import get_user_model
User = get_user_model()
user = User.objects.get(id=user_id)
war_room.remove_participant(user)
return Response({'message': 'Participant removed successfully'})
except User.DoesNotExist:
return Response(
{'error': 'User not found'},
status=status.HTTP_404_NOT_FOUND
)
@action(detail=True, methods=['get'])
def messages(self, request, pk=None):
"""Get messages for a war room"""
war_room = self.get_object()
messages = war_room.messages.all().order_by('created_at')
# Apply pagination
page = self.paginate_queryset(messages)
if page is not None:
serializer = WarRoomMessageSerializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = WarRoomMessageSerializer(messages, many=True)
return Response(serializer.data)
@action(detail=True, methods=['get'])
def pinned_messages(self, request, pk=None):
"""Get pinned messages for a war room"""
war_room = self.get_object()
pinned_messages = war_room.messages.filter(is_pinned=True).order_by('-pinned_at')
serializer = WarRoomMessageSerializer(pinned_messages, many=True)
return Response(serializer.data)
@action(detail=True, methods=['post'])
def create_chat_room(self, request, pk=None):
"""Auto-create chat room for incident"""
incident = self.get_object()
# Check if war room already exists for this incident
existing_war_room = WarRoom.objects.filter(incident=incident).first()
if existing_war_room:
return Response(
{'message': 'War room already exists for this incident', 'war_room_id': existing_war_room.id},
status=status.HTTP_200_OK
)
# Create new war room
war_room = WarRoom.objects.create(
name=f"Incident Chat - {incident.title}",
description=f"Chat room for incident: {incident.title}",
incident=incident,
created_by=request.user,
privacy_level='PRIVATE'
)
# Add incident reporter and assignee to war room
if incident.reporter:
war_room.add_participant(incident.reporter)
if incident.assigned_to:
war_room.add_participant(incident.assigned_to)
# Create timeline event
TimelineEvent.create_system_event(
incident=incident,
event_type='WAR_ROOM_CREATED',
title='War Room Created',
description=f'War room "{war_room.name}" was automatically created for incident collaboration',
event_data={'war_room_id': str(war_room.id)}
)
serializer = WarRoomSerializer(war_room)
return Response(serializer.data, status=status.HTTP_201_CREATED)
class ConferenceBridgeViewSet(viewsets.ModelViewSet):
"""ViewSet for ConferenceBridge model"""
queryset = ConferenceBridge.objects.all()
serializer_class = ConferenceBridgeSerializer
permission_classes = [permissions.IsAuthenticated]
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_fields = ['bridge_type', 'status', 'incident__severity']
search_fields = ['name', 'description', 'incident__title']
ordering_fields = ['scheduled_start', 'created_at']
ordering = ['-scheduled_start']
def get_queryset(self):
"""Filter conference bridges based on user access"""
queryset = super().get_queryset()
user = self.request.user
# Filter by user access
accessible_conferences = []
for conference in queryset:
if conference.can_user_join(user):
accessible_conferences.append(conference.id)
return queryset.filter(id__in=accessible_conferences)
@action(detail=True, methods=['post'])
def join_conference(self, request, pk=None):
"""Join a conference"""
conference = self.get_object()
user = request.user
if conference.can_user_join(user):
conference.add_participant(user)
return Response({'message': 'Successfully joined conference'})
else:
return Response(
{'error': 'You do not have permission to join this conference'},
status=status.HTTP_403_FORBIDDEN
)
@action(detail=True, methods=['post'])
def start_conference(self, request, pk=None):
"""Start a conference"""
conference = self.get_object()
if conference.status == 'SCHEDULED':
conference.status = 'ACTIVE'
conference.actual_start = timezone.now()
conference.save()
return Response({'message': 'Conference started successfully'})
else:
return Response(
{'error': 'Conference cannot be started in current status'},
status=status.HTTP_400_BAD_REQUEST
)
@action(detail=True, methods=['post'])
def end_conference(self, request, pk=None):
"""End a conference"""
conference = self.get_object()
if conference.status == 'ACTIVE':
conference.status = 'ENDED'
conference.actual_end = timezone.now()
conference.save()
return Response({'message': 'Conference ended successfully'})
else:
return Response(
{'error': 'Conference cannot be ended in current status'},
status=status.HTTP_400_BAD_REQUEST
)
class IncidentCommandRoleViewSet(viewsets.ModelViewSet):
"""ViewSet for IncidentCommandRole model"""
queryset = IncidentCommandRole.objects.all()
serializer_class = IncidentCommandRoleSerializer
permission_classes = [permissions.IsAuthenticated]
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_fields = ['role_type', 'status', 'incident__severity']
search_fields = ['incident__title', 'assigned_user__username']
ordering_fields = ['assigned_at', 'created_at']
ordering = ['-assigned_at']
def get_queryset(self):
"""Filter command roles based on user access"""
queryset = super().get_queryset()
user = self.request.user
# Filter by incident access
accessible_incidents = []
for role in queryset:
if role.incident.is_accessible_by_user(user):
accessible_incidents.append(role.incident.id)
return queryset.filter(incident_id__in=accessible_incidents)
@action(detail=True, methods=['post'])
def reassign_role(self, request, pk=None):
"""Reassign a command role to a new user"""
command_role = self.get_object()
new_user_id = request.data.get('new_user_id')
notes = request.data.get('notes', '')
if not new_user_id:
return Response(
{'error': 'new_user_id is required'},
status=status.HTTP_400_BAD_REQUEST
)
try:
from django.contrib.auth import get_user_model
User = get_user_model()
new_user = User.objects.get(id=new_user_id)
command_role.reassign_role(new_user, request.user, notes)
return Response({'message': 'Role reassigned successfully'})
except User.DoesNotExist:
return Response(
{'error': 'User not found'},
status=status.HTTP_404_NOT_FOUND
)
class TimelineEventViewSet(viewsets.ReadOnlyModelViewSet):
"""ViewSet for TimelineEvent model (read-only)"""
queryset = TimelineEvent.objects.all()
serializer_class = TimelineEventSerializer
permission_classes = [permissions.IsAuthenticated]
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_fields = ['event_type', 'source_type', 'is_critical_event', 'incident__severity']
search_fields = ['title', 'description', 'incident__title']
ordering_fields = ['event_time', 'created_at']
ordering = ['event_time']
def get_serializer_class(self):
"""Return appropriate serializer based on action"""
if self.action == 'list':
return TimelineEventSummarySerializer
return TimelineEventSerializer
def get_queryset(self):
"""Filter timeline events based on user access"""
queryset = super().get_queryset()
user = self.request.user
# Filter by incident access
accessible_incidents = []
for event in queryset:
if event.incident.is_accessible_by_user(user):
accessible_incidents.append(event.incident.id)
return queryset.filter(incident_id__in=accessible_incidents)
@action(detail=False, methods=['get'])
def critical_events(self, request):
"""Get critical events for postmortem analysis"""
queryset = self.get_queryset().filter(is_critical_event=True)
# Apply pagination
page = self.paginate_queryset(queryset)
if page is not None:
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
class WarRoomMessageViewSet(viewsets.ModelViewSet):
"""ViewSet for WarRoomMessage model"""
queryset = WarRoomMessage.objects.all()
serializer_class = WarRoomMessageSerializer
permission_classes = [permissions.IsAuthenticated]
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_fields = ['message_type', 'war_room', 'sender', 'is_pinned']
search_fields = ['content', 'sender_name']
ordering_fields = ['created_at', 'pinned_at']
ordering = ['created_at']
def get_queryset(self):
"""Filter messages based on war room access"""
queryset = super().get_queryset()
user = self.request.user
# Filter by war room access
accessible_war_rooms = []
for message in queryset:
if message.war_room.can_user_access(user):
accessible_war_rooms.append(message.war_room.id)
return queryset.filter(war_room_id__in=accessible_war_rooms)
@action(detail=True, methods=['post'])
def pin_message(self, request, pk=None):
"""Pin a message"""
message = self.get_object()
message.pin_message(request.user)
# Create timeline event
TimelineEvent.create_user_event(
incident=message.war_room.incident,
user=request.user,
event_type='MANUAL_EVENT',
title='Message Pinned',
description=f'Message "{message.content[:50]}..." was pinned by {request.user.username}',
event_data={'message_id': str(message.id), 'action': 'pinned'}
)
return Response({'message': 'Message pinned successfully'})
@action(detail=True, methods=['post'])
def unpin_message(self, request, pk=None):
"""Unpin a message"""
message = self.get_object()
message.unpin_message()
return Response({'message': 'Message unpinned successfully'})
@action(detail=True, methods=['post'])
def add_reaction(self, request, pk=None):
"""Add a reaction to a message"""
message = self.get_object()
emoji = request.data.get('emoji')
if not emoji:
return Response(
{'error': 'emoji is required'},
status=status.HTTP_400_BAD_REQUEST
)
reaction = message.add_reaction(request.user, emoji)
serializer = MessageReactionSerializer(reaction)
return Response(serializer.data)
@action(detail=True, methods=['post'])
def remove_reaction(self, request, pk=None):
"""Remove a reaction from a message"""
message = self.get_object()
emoji = request.data.get('emoji')
if not emoji:
return Response(
{'error': 'emoji is required'},
status=status.HTTP_400_BAD_REQUEST
)
message.remove_reaction(request.user, emoji)
return Response({'message': 'Reaction removed successfully'})
@action(detail=True, methods=['post'])
def execute_command(self, request, pk=None):
"""Execute a ChatOps command"""
message = self.get_object()
command_text = request.data.get('command_text')
if not command_text:
return Response(
{'error': 'command_text is required'},
status=status.HTTP_400_BAD_REQUEST
)
# Parse command
command_type = self._parse_command_type(command_text)
parameters = self._parse_command_parameters(command_text)
# Create chat command
chat_command = ChatCommand.objects.create(
message=message,
command_type=command_type,
command_text=command_text,
parameters=parameters
)
# Execute command
result = chat_command.execute_command(request.user)
serializer = ChatCommandSerializer(chat_command)
return Response(serializer.data)
def _parse_command_type(self, command_text):
"""Parse command type from command text"""
command_text = command_text.lower().strip()
if command_text.startswith('/status'):
return 'STATUS'
elif command_text.startswith('/runbook'):
return 'RUNBOOK'
elif command_text.startswith('/escalate'):
return 'ESCALATE'
elif command_text.startswith('/assign'):
return 'ASSIGN'
elif command_text.startswith('/update'):
return 'UPDATE'
else:
return 'CUSTOM'
def _parse_command_parameters(self, command_text):
"""Parse command parameters from command text"""
parts = command_text.split()
if len(parts) > 1:
return {'args': parts[1:]}
return {}
class IncidentDecisionViewSet(viewsets.ModelViewSet):
"""ViewSet for IncidentDecision model"""
queryset = IncidentDecision.objects.all()
serializer_class = IncidentDecisionSerializer
permission_classes = [permissions.IsAuthenticated]
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_fields = ['decision_type', 'status', 'incident__severity']
search_fields = ['title', 'description', 'incident__title']
ordering_fields = ['created_at', 'approved_at', 'implemented_at']
ordering = ['-created_at']
def get_queryset(self):
"""Filter decisions based on incident access"""
queryset = super().get_queryset()
user = self.request.user
# Filter by incident access
accessible_incidents = []
for decision in queryset:
if decision.incident.is_accessible_by_user(user):
accessible_incidents.append(decision.incident.id)
return queryset.filter(incident_id__in=accessible_incidents)
@action(detail=True, methods=['post'])
def approve_decision(self, request, pk=None):
"""Approve a decision"""
decision = self.get_object()
if decision.status == 'PENDING':
decision.approve(request.user)
return Response({'message': 'Decision approved successfully'})
else:
return Response(
{'error': 'Decision cannot be approved in current status'},
status=status.HTTP_400_BAD_REQUEST
)
@action(detail=True, methods=['post'])
def implement_decision(self, request, pk=None):
"""Mark a decision as implemented"""
decision = self.get_object()
notes = request.data.get('notes', '')
if decision.status == 'APPROVED':
decision.implement(request.user, notes)
return Response({'message': 'Decision implemented successfully'})
else:
return Response(
{'error': 'Decision must be approved before implementation'},
status=status.HTTP_400_BAD_REQUEST
)
class MessageReactionViewSet(viewsets.ModelViewSet):
"""ViewSet for MessageReaction model"""
queryset = MessageReaction.objects.all()
serializer_class = MessageReactionSerializer
permission_classes = [permissions.IsAuthenticated]
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_fields = ['message', 'user', 'emoji']
ordering_fields = ['created_at']
ordering = ['created_at']
class ChatFileViewSet(viewsets.ModelViewSet):
"""ViewSet for ChatFile model"""
queryset = ChatFile.objects.all()
serializer_class = ChatFileSerializer
permission_classes = [permissions.IsAuthenticated]
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_fields = ['message', 'file_type', 'data_classification']
search_fields = ['filename', 'original_filename']
ordering_fields = ['uploaded_at', 'file_size']
ordering = ['-uploaded_at']
def get_queryset(self):
"""Filter files based on message access"""
queryset = super().get_queryset()
user = self.request.user
# Filter by message access
accessible_messages = []
for file_obj in queryset:
if file_obj.message.war_room.can_user_access(user):
accessible_messages.append(file_obj.message.id)
return queryset.filter(message_id__in=accessible_messages)
@action(detail=True, methods=['post'])
def log_access(self, request, pk=None):
"""Log file access for audit trail"""
file_obj = self.get_object()
file_obj.log_access(request.user)
return Response({'message': 'Access logged successfully'})
class ChatCommandViewSet(viewsets.ReadOnlyModelViewSet):
"""ViewSet for ChatCommand model (read-only)"""
queryset = ChatCommand.objects.all()
serializer_class = ChatCommandSerializer
permission_classes = [permissions.IsAuthenticated]
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_fields = ['command_type', 'execution_status', 'executed_by']
search_fields = ['command_text']
ordering_fields = ['executed_at', 'created_at']
ordering = ['-executed_at']
def get_queryset(self):
"""Filter commands based on message access"""
queryset = super().get_queryset()
user = self.request.user
# Filter by message access
accessible_messages = []
for command in queryset:
if command.message.war_room.can_user_access(user):
accessible_messages.append(command.message.id)
return queryset.filter(message_id__in=accessible_messages)
class ChatBotViewSet(viewsets.ModelViewSet):
"""ViewSet for ChatBot model"""
queryset = ChatBot.objects.all()
serializer_class = ChatBotSerializer
permission_classes = [permissions.IsAuthenticated]
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_fields = ['bot_type', 'is_active']
search_fields = ['name', 'description']
ordering_fields = ['name', 'created_at']
ordering = ['name']
@action(detail=True, methods=['post'])
def generate_response(self, request, pk=None):
"""Generate AI response to a message"""
bot = self.get_object()
message_id = request.data.get('message_id')
if not message_id:
return Response(
{'error': 'message_id is required'},
status=status.HTTP_400_BAD_REQUEST
)
try:
message = WarRoomMessage.objects.get(id=message_id)
response = bot.generate_response(message, request.data.get('context', {}))
return Response(response)
except WarRoomMessage.DoesNotExist:
return Response(
{'error': 'Message not found'},
status=status.HTTP_404_NOT_FOUND
)