733 lines
27 KiB
Python
733 lines
27 KiB
Python
"""
|
|
Views for Compliance & Governance API endpoints
|
|
"""
|
|
from rest_framework import viewsets, status, permissions
|
|
from rest_framework.decorators import action
|
|
from rest_framework.response import Response
|
|
from rest_framework.pagination import PageNumberPagination
|
|
from django_filters.rest_framework import DjangoFilterBackend
|
|
from django_filters import rest_framework as filters
|
|
from django.db.models import Q, Count, Avg
|
|
from django.utils import timezone
|
|
from datetime import timedelta
|
|
|
|
from ..models import (
|
|
RegulatoryFramework,
|
|
ComplianceRequirement,
|
|
RegulatoryWorkflow,
|
|
WorkflowInstance,
|
|
EvidenceCollection,
|
|
RetentionPolicy,
|
|
ExportRequest,
|
|
ComplianceReport,
|
|
LegalHold,
|
|
)
|
|
from ..serializers.compliance import (
|
|
RegulatoryFrameworkSerializer,
|
|
ComplianceRequirementSerializer,
|
|
ComplianceRequirementDetailSerializer,
|
|
RegulatoryWorkflowSerializer,
|
|
WorkflowInstanceSerializer,
|
|
WorkflowInstanceDetailSerializer,
|
|
EvidenceCollectionSerializer,
|
|
EvidenceCollectionDetailSerializer,
|
|
RetentionPolicySerializer,
|
|
ExportRequestSerializer,
|
|
ExportRequestDetailSerializer,
|
|
ComplianceReportSerializer,
|
|
ComplianceReportDetailSerializer,
|
|
LegalHoldSerializer,
|
|
LegalHoldDetailSerializer,
|
|
)
|
|
|
|
|
|
class CompliancePagination(PageNumberPagination):
    """Page-number pagination shared by the compliance endpoints.

    Pages hold 20 items unless the client supplies a ``page_size`` query
    parameter; the requested size is capped at 100.
    """

    # Default number of items per page.
    page_size = 20
    # Name of the query parameter that lets a client override the page size.
    page_size_query_param = 'page_size'
    # Upper bound applied to client-requested page sizes.
    max_page_size = 100
|
|
|
|
|
|
class RegulatoryFrameworkFilter(filters.FilterSet):
    """Query-string filters available on the regulatory framework list.

    Supports filtering by framework type, active flag, an effective-date
    range, and a free-text ``search`` term.
    """

    framework_type = filters.ChoiceFilter(
        choices=RegulatoryFramework.FRAMEWORK_TYPES,
    )
    is_active = filters.BooleanFilter()
    # Inclusive lower / upper bounds on ``effective_date``.
    effective_date_after = filters.DateFilter(
        field_name='effective_date',
        lookup_expr='gte',
    )
    effective_date_before = filters.DateFilter(
        field_name='effective_date',
        lookup_expr='lte',
    )
    search = filters.CharFilter(method='filter_search')

    class Meta:
        model = RegulatoryFramework
        fields = [
            'framework_type',
            'is_active',
            'effective_date_after',
            'effective_date_before',
            'search',
        ]

    def filter_search(self, queryset, name, value):
        """Keep rows where any searchable field contains ``value``.

        The match is a case-insensitive substring test over name,
        description, applicable regions and industry sectors.
        """
        matches = (
            Q(name__icontains=value)
            | Q(description__icontains=value)
            | Q(applicable_regions__icontains=value)
            | Q(industry_sectors__icontains=value)
        )
        return queryset.filter(matches)
|
|
|
|
|
|
class RegulatoryFrameworkViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for regulatory frameworks, plus per-framework reports.

    Access requires an authenticated user. List responses are paginated
    with ``CompliancePagination`` and filtered by
    ``RegulatoryFrameworkFilter``.
    """

    queryset = RegulatoryFramework.objects.all()
    serializer_class = RegulatoryFrameworkSerializer
    pagination_class = CompliancePagination
    filter_backends = [DjangoFilterBackend]
    filterset_class = RegulatoryFrameworkFilter
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return frameworks with creator and requirements loaded eagerly."""
        return RegulatoryFramework.objects.select_related('created_by').prefetch_related('requirements')

    @action(detail=True, methods=['get'])
    def requirements(self, request, pk=None):
        """Return the requirements attached to one framework.

        Optional query parameters ``compliance_status`` and ``priority``
        narrow the result by exact match. The response is the full,
        unpaginated list.
        """
        framework = self.get_object()
        requirements = framework.requirements.all()

        # Apply filters if provided
        status_filter = request.query_params.get('compliance_status')
        if status_filter:
            requirements = requirements.filter(compliance_status=status_filter)

        priority_filter = request.query_params.get('priority')
        if priority_filter:
            requirements = requirements.filter(priority=priority_filter)

        serializer = ComplianceRequirementSerializer(requirements, many=True)
        return Response(serializer.data)

    @action(detail=True, methods=['get'])
    def compliance_summary(self, request, pk=None):
        """Return requirement counts and a compliance percentage.

        The response contains the total number of requirements, one count
        per compliance status, the number implemented, the number whose
        next assessment date is already past, and ``compliance_percentage``
        (compliant / assessed * 100, or 0 when nothing has been assessed).
        """
        framework = self.get_object()
        today = timezone.now().date()

        def matching(**conditions):
            # Conditional COUNT: counts only rows satisfying the conditions.
            return Count('pk', filter=Q(**conditions))

        # A single aggregate query replaces one COUNT query per metric.
        summary = framework.requirements.all().aggregate(
            total_requirements=Count('pk'),
            compliant=matching(compliance_status='COMPLIANT'),
            partially_compliant=matching(compliance_status='PARTIALLY_COMPLIANT'),
            non_compliant=matching(compliance_status='NON_COMPLIANT'),
            not_assessed=matching(compliance_status='NOT_ASSESSED'),
            implemented=matching(is_implemented=True),
            overdue_assessments=matching(next_assessment_date__lt=today),
        )

        # Calculate compliance percentage over assessed requirements only;
        # NOT_ASSESSED rows are excluded from the denominator.
        assessed_count = summary['compliant'] + summary['partially_compliant'] + summary['non_compliant']
        if assessed_count > 0:
            summary['compliance_percentage'] = (summary['compliant'] / assessed_count) * 100
        else:
            summary['compliance_percentage'] = 0

        return Response(summary)
|
|
|
|
|
|
class ComplianceRequirementFilter(filters.FilterSet):
    """Query-string filters available on the compliance requirement list."""

    framework = filters.UUIDFilter()
    compliance_status = filters.ChoiceFilter(
        choices=ComplianceRequirement._meta.get_field('compliance_status').choices,
    )
    requirement_type = filters.ChoiceFilter(
        choices=ComplianceRequirement.REQUIREMENT_TYPES,
    )
    priority = filters.ChoiceFilter(
        choices=ComplianceRequirement.PRIORITY_LEVELS,
    )
    is_implemented = filters.BooleanFilter()
    assigned_to = filters.UUIDFilter()
    overdue = filters.BooleanFilter(method='filter_overdue')
    search = filters.CharFilter(method='filter_search')

    class Meta:
        model = ComplianceRequirement
        fields = [
            'framework',
            'compliance_status',
            'requirement_type',
            'priority',
            'is_implemented',
            'assigned_to',
            'overdue',
            'search',
        ]

    def filter_overdue(self, queryset, name, value):
        """Restrict to requirements whose next assessment date has passed.

        A falsy ``value`` leaves the queryset untouched; it does not select
        the non-overdue rows.
        """
        if not value:
            return queryset
        today = timezone.now().date()
        return queryset.filter(next_assessment_date__lt=today)

    def filter_search(self, queryset, name, value):
        """Keep rows where any searchable field contains ``value``.

        Case-insensitive substring match over title, description,
        requirement id and responsible team.
        """
        matches = (
            Q(title__icontains=value)
            | Q(description__icontains=value)
            | Q(requirement_id__icontains=value)
            | Q(responsible_team__icontains=value)
        )
        return queryset.filter(matches)
|
|
|
|
|
|
class ComplianceRequirementViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for compliance requirements plus status actions.

    Access requires an authenticated user. The ``retrieve`` action uses the
    detail serializer; every other action uses the compact serializer.
    """

    queryset = ComplianceRequirement.objects.all()
    serializer_class = ComplianceRequirementSerializer
    pagination_class = CompliancePagination
    filter_backends = [DjangoFilterBackend]
    filterset_class = ComplianceRequirementFilter
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return requirements with their related rows loaded eagerly."""
        return ComplianceRequirement.objects.select_related(
            'framework', 'assigned_to'
        ).prefetch_related('evidence_collection', 'workflow_instances')

    def get_serializer_class(self):
        """Pick the detail serializer for ``retrieve``, the compact one otherwise."""
        if self.action == 'retrieve':
            return ComplianceRequirementDetailSerializer
        return ComplianceRequirementSerializer

    @action(detail=True, methods=['post'])
    def update_status(self, request, pk=None):
        """Set the requirement's compliance status and stamp the assessment date.

        Expects ``compliance_status`` in the request body. Responds with
        HTTP 400 when the value is not one of the field's declared choices;
        otherwise saves the requirement with ``last_assessment_date`` set
        to today and returns its serialized form.
        """
        requirement = self.get_object()
        new_status = request.data.get('compliance_status')
        valid_statuses = dict(ComplianceRequirement._meta.get_field('compliance_status').choices)

        # The isinstance guard keeps an unhashable JSON payload (list/object)
        # from raising TypeError in the membership test below.
        if not isinstance(new_status, str) or new_status not in valid_statuses:
            return Response(
                {'error': 'Invalid compliance status'},
                status=status.HTTP_400_BAD_REQUEST
            )

        requirement.compliance_status = new_status
        requirement.last_assessment_date = timezone.now().date()
        requirement.save()

        serializer = self.get_serializer(requirement)
        return Response(serializer.data)

    @action(detail=True, methods=['post'])
    def schedule_assessment(self, request, pk=None):
        """Set the date of the requirement's next assessment.

        Expects ``assessment_date`` in the request body as an ISO date
        (``YYYY-MM-DD``). Responds with HTTP 400 when the value is missing
        or cannot be parsed as a date; otherwise saves the requirement and
        returns its serialized form.
        """
        # Local import: keeps this edit self-contained without touching the
        # module's import block.
        from django.utils.dateparse import parse_date

        requirement = self.get_object()
        assessment_date = request.data.get('assessment_date')

        if not assessment_date:
            return Response(
                {'error': 'Assessment date is required'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Validate before saving: storing the raw request value lets a
        # malformed date fail inside save() and surface as a server error.
        try:
            parsed_date = parse_date(assessment_date)
        except (TypeError, ValueError):
            # TypeError: non-string payload; ValueError: well-formed but
            # impossible date such as 2024-02-30.
            parsed_date = None

        if parsed_date is None:
            return Response(
                {'error': 'Invalid assessment date; expected YYYY-MM-DD'},
                status=status.HTTP_400_BAD_REQUEST
            )

        requirement.next_assessment_date = parsed_date
        requirement.save()

        serializer = self.get_serializer(requirement)
        return Response(serializer.data)
|
|
|
|
|
|
class RegulatoryWorkflowFilter(filters.FilterSet):
    """Query-string filters available on the regulatory workflow list."""

    workflow_type = filters.ChoiceFilter(
        choices=RegulatoryWorkflow.WORKFLOW_TYPES,
    )
    status = filters.ChoiceFilter(
        choices=RegulatoryWorkflow.STATUS_CHOICES,
    )
    is_template = filters.BooleanFilter()
    applicable_frameworks = filters.UUIDFilter(
        field_name='applicable_frameworks',
    )
    search = filters.CharFilter(method='filter_search')

    class Meta:
        model = RegulatoryWorkflow
        fields = [
            'workflow_type',
            'status',
            'is_template',
            'applicable_frameworks',
            'search',
        ]

    def filter_search(self, queryset, name, value):
        """Keep rows whose name or description contains ``value`` (case-insensitive)."""
        matches = Q(name__icontains=value) | Q(description__icontains=value)
        return queryset.filter(matches)
|
|
|
|
|
|
class RegulatoryWorkflowViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for regulatory workflows plus instance creation.

    Access requires an authenticated user.
    """

    queryset = RegulatoryWorkflow.objects.all()
    serializer_class = RegulatoryWorkflowSerializer
    pagination_class = CompliancePagination
    filter_backends = [DjangoFilterBackend]
    filterset_class = RegulatoryWorkflowFilter
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return workflows with creator, frameworks and instances loaded eagerly."""
        workflows = RegulatoryWorkflow.objects.select_related('created_by')
        return workflows.prefetch_related('applicable_frameworks', 'instances')

    @action(detail=True, methods=['post'])
    def create_instance(self, request, pk=None):
        """Create a workflow instance from this workflow.

        Responds with HTTP 400 when the workflow's status is not
        ``ACTIVE`` or when the submitted data fails serializer validation;
        responds with HTTP 201 and the serialized instance on success.
        """
        workflow = self.get_object()
        if workflow.status != 'ACTIVE':
            return Response(
                {'error': 'Cannot create instance from inactive workflow'},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # Work on a copy so the incoming request data is left untouched;
        # the workflow and creator are always taken from the server side.
        payload = request.data.copy()
        payload['workflow'] = workflow.id
        payload['created_by'] = request.user.id

        serializer = WorkflowInstanceSerializer(data=payload)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        created = serializer.save()
        return Response(
            WorkflowInstanceSerializer(created).data,
            status=status.HTTP_201_CREATED,
        )
|
|
|
|
|
|
class WorkflowInstanceFilter(filters.FilterSet):
    """Query-string filters available on the workflow instance list."""

    workflow = filters.UUIDFilter()
    status = filters.ChoiceFilter(
        choices=WorkflowInstance.STATUS_CHOICES,
    )
    assigned_to = filters.UUIDFilter()
    related_incident = filters.UUIDFilter()
    overdue = filters.BooleanFilter(method='filter_overdue')
    search = filters.CharFilter(method='filter_search')

    class Meta:
        model = WorkflowInstance
        fields = [
            'workflow',
            'status',
            'assigned_to',
            'related_incident',
            'overdue',
            'search',
        ]

    def filter_overdue(self, queryset, name, value):
        """Restrict to unfinished instances whose due date has passed.

        Only instances in ``PENDING`` or ``IN_PROGRESS`` status count as
        overdue. A falsy ``value`` leaves the queryset untouched.
        """
        if not value:
            return queryset
        return queryset.filter(
            due_date__lt=timezone.now(),
            status__in=['PENDING', 'IN_PROGRESS'],
        )

    def filter_search(self, queryset, name, value):
        """Keep rows where title, description or current step contains ``value``."""
        matches = (
            Q(title__icontains=value)
            | Q(description__icontains=value)
            | Q(current_step__icontains=value)
        )
        return queryset.filter(matches)
|
|
|
|
|
|
class WorkflowInstanceViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for workflow instances plus step/stakeholder actions.

    Access requires an authenticated user. The ``retrieve`` action uses the
    detail serializer; every other action uses the compact serializer.
    """

    queryset = WorkflowInstance.objects.all()
    serializer_class = WorkflowInstanceSerializer
    pagination_class = CompliancePagination
    filter_backends = [DjangoFilterBackend]
    filterset_class = WorkflowInstanceFilter
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return instances with their related rows loaded eagerly."""
        return WorkflowInstance.objects.select_related(
            'workflow', 'assigned_to', 'created_by', 'related_incident', 'related_requirement'
        ).prefetch_related('stakeholders', 'evidence_collection')

    def get_serializer_class(self):
        """Pick the detail serializer for ``retrieve``, the compact one otherwise."""
        if self.action == 'retrieve':
            return WorkflowInstanceDetailSerializer
        return WorkflowInstanceSerializer

    @action(detail=True, methods=['post'])
    def advance_step(self, request, pk=None):
        """Move the instance to ``next_step`` and record the step just left.

        Request body:
            next_step: required identifier of the step to move to.
            step_data: optional mapping merged into ``execution_data``.

        Responds with HTTP 400 when ``next_step`` is missing or
        ``step_data`` cannot be interpreted as a mapping. When ``next_step``
        is listed under ``end_steps`` in the workflow definition, the
        instance is also marked ``COMPLETED``.
        """
        instance = self.get_object()
        next_step = request.data.get('next_step')

        if not next_step:
            return Response(
                {'error': 'Next step is required'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Validate step_data before mutating the instance. dict() accepts
        # exactly what dict.update() accepts, so valid payloads behave as
        # before; a JSON null is treated as "no data".
        try:
            step_data = dict(request.data.get('step_data') or {})
        except (TypeError, ValueError):
            return Response(
                {'error': 'Step data must be an object'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Add current step to completed steps
        if instance.current_step:
            user_id = request.user.id
            # NOTE(review): completed_steps looks like JSON-stored data; ids
            # that are not JSON primitives (e.g. UUID) are stringified so
            # the save cannot fail on serialisation. Confirm the field type.
            if not isinstance(user_id, (int, str, type(None))):
                user_id = str(user_id)
            instance.completed_steps.append({
                'step': instance.current_step,
                'completed_at': timezone.now().isoformat(),
                'completed_by': user_id
            })

        # Update current step and execution data
        instance.current_step = next_step
        instance.execution_data.update(step_data)
        instance.updated_at = timezone.now()

        # Check if workflow is complete; an empty/null definition simply
        # declares no end steps.
        workflow_definition = instance.workflow.workflow_definition or {}
        if next_step in workflow_definition.get('end_steps', []):
            instance.status = 'COMPLETED'
            instance.completed_at = timezone.now()

        instance.save()

        serializer = self.get_serializer(instance)
        return Response(serializer.data)

    @action(detail=True, methods=['post'])
    def assign_stakeholder(self, request, pk=None):
        """Add a user to the instance's stakeholders.

        Expects ``stakeholder_id`` in the request body. Responds with HTTP
        400 when it is missing or malformed and HTTP 404 when no user has
        that id.
        """
        instance = self.get_object()
        stakeholder_id = request.data.get('stakeholder_id')

        if not stakeholder_id:
            return Response(
                {'error': 'Stakeholder ID is required'},
                status=status.HTTP_400_BAD_REQUEST
            )

        from django.contrib.auth import get_user_model
        from django.core.exceptions import ValidationError
        User = get_user_model()

        # Only the lookup sits in the try body so that errors raised while
        # adding the stakeholder are not mistaken for a bad id.
        try:
            stakeholder = User.objects.get(id=stakeholder_id)
        except User.DoesNotExist:
            return Response(
                {'error': 'User not found'},
                status=status.HTTP_404_NOT_FOUND
            )
        except (TypeError, ValueError, ValidationError):
            # A value that cannot be coerced to the primary-key type (for
            # example a malformed UUID) is a client error, not a crash.
            return Response(
                {'error': 'Invalid stakeholder ID'},
                status=status.HTTP_400_BAD_REQUEST
            )

        instance.stakeholders.add(stakeholder)
        return Response({'message': 'Stakeholder assigned successfully'})
|
|
|
|
|
|
class EvidenceCollectionFilter(filters.FilterSet):
    """Query-string filters available on the evidence collection list."""

    incident = filters.UUIDFilter()
    evidence_type = filters.ChoiceFilter(
        choices=EvidenceCollection.EVIDENCE_TYPES,
    )
    status = filters.ChoiceFilter(
        choices=EvidenceCollection.STATUS_CHOICES,
    )
    collected_by = filters.UUIDFilter()
    compliance_requirement = filters.UUIDFilter()
    search = filters.CharFilter(method='filter_search')

    class Meta:
        model = EvidenceCollection
        fields = [
            'incident',
            'evidence_type',
            'status',
            'collected_by',
            'compliance_requirement',
            'search',
        ]

    def filter_search(self, queryset, name, value):
        """Keep rows where title, description or collection notes contains ``value``."""
        matches = (
            Q(title__icontains=value)
            | Q(description__icontains=value)
            | Q(collection_notes__icontains=value)
        )
        return queryset.filter(matches)
|
|
|
|
|
|
class EvidenceCollectionViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for evidence plus custody and verification actions.

    Access requires an authenticated user. The ``retrieve`` action uses the
    detail serializer; every other action uses the compact serializer.
    """

    queryset = EvidenceCollection.objects.all()
    serializer_class = EvidenceCollectionSerializer
    pagination_class = CompliancePagination
    filter_backends = [DjangoFilterBackend]
    filterset_class = EvidenceCollectionFilter
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return evidence rows with their foreign-key relations joined in."""
        joined_relations = (
            'incident',
            'collected_by',
            'verified_by',
            'workflow_instance',
            'compliance_requirement',
        )
        return EvidenceCollection.objects.select_related(*joined_relations)

    def get_serializer_class(self):
        """Pick the detail serializer for ``retrieve``, the compact one otherwise."""
        wants_detail = self.action == 'retrieve'
        return EvidenceCollectionDetailSerializer if wants_detail else EvidenceCollectionSerializer

    @action(detail=True, methods=['post'])
    def add_custody_record(self, request, pk=None):
        """Append a custody record to the evidence on behalf of the caller.

        Expects ``action`` (required) and ``notes`` (optional, defaults to
        an empty string) in the request body. Responds with HTTP 400 when
        ``action`` is missing.
        """
        evidence = self.get_object()
        # Named custody_action so the local does not shadow the imported
        # ``action`` decorator.
        custody_action = request.data.get('action')
        custody_notes = request.data.get('notes', '')

        if not custody_action:
            return Response(
                {'error': 'Action is required'},
                status=status.HTTP_400_BAD_REQUEST,
            )

        evidence.add_custody_record(request.user, custody_action, custody_notes)
        return Response(self.get_serializer(evidence).data)

    @action(detail=True, methods=['post'])
    def verify_evidence(self, request, pk=None):
        """Mark the evidence as verified by the caller.

        Sets the status to ``VERIFIED``, records the verifying user, saves,
        and then adds a ``VERIFIED`` custody record carrying the optional
        ``verification_notes`` from the request body.
        """
        evidence = self.get_object()
        notes = request.data.get('verification_notes', '')

        evidence.status = 'VERIFIED'
        evidence.verified_by = request.user
        evidence.updated_at = timezone.now()
        evidence.save()

        # The custody record is written after the status change is saved.
        evidence.add_custody_record(request.user, 'VERIFIED', notes)

        return Response(self.get_serializer(evidence).data)
|
|
|
|
|
|
class RetentionPolicyFilter(filters.FilterSet):
    """Query-string filters available on the retention policy list."""

    policy_type = filters.ChoiceFilter(
        choices=RetentionPolicy.POLICY_TYPES,
    )
    is_active = filters.BooleanFilter()
    applicable_frameworks = filters.UUIDFilter(
        field_name='applicable_frameworks',
    )
    search = filters.CharFilter(method='filter_search')

    class Meta:
        model = RetentionPolicy
        fields = [
            'policy_type',
            'is_active',
            'applicable_frameworks',
            'search',
        ]

    def filter_search(self, queryset, name, value):
        """Keep rows whose name or description contains ``value`` (case-insensitive)."""
        matches = Q(name__icontains=value) | Q(description__icontains=value)
        return queryset.filter(matches)
|
|
|
|
|
|
class RetentionPolicyViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for retention policies.

    Access requires an authenticated user. List responses are paginated
    with ``CompliancePagination`` and filtered by ``RetentionPolicyFilter``.
    """

    queryset = RetentionPolicy.objects.all()
    serializer_class = RetentionPolicySerializer
    pagination_class = CompliancePagination
    filter_backends = [DjangoFilterBackend]
    filterset_class = RetentionPolicyFilter
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return policies with creator and applicable frameworks loaded eagerly."""
        policies = RetentionPolicy.objects.select_related('created_by')
        return policies.prefetch_related('applicable_frameworks')
|
|
|
|
|
|
class ExportRequestFilter(filters.FilterSet):
    """Query-string filters available on the export request list."""

    request_type = filters.ChoiceFilter(
        choices=ExportRequest.REQUEST_TYPES,
    )
    status = filters.ChoiceFilter(
        choices=ExportRequest.STATUS_CHOICES,
    )
    # Case-insensitive substring match on the requester's e-mail address.
    requester_email = filters.CharFilter(lookup_expr='icontains')
    overdue = filters.BooleanFilter(method='filter_overdue')
    search = filters.CharFilter(method='filter_search')

    class Meta:
        model = ExportRequest
        fields = [
            'request_type',
            'status',
            'requester_email',
            'overdue',
            'search',
        ]

    def filter_overdue(self, queryset, name, value):
        """Restrict to open requests whose due date has passed.

        Only requests in ``PENDING``, ``APPROVED`` or ``IN_PROGRESS``
        status count as overdue. A falsy ``value`` leaves the queryset
        untouched.
        """
        if not value:
            return queryset
        return queryset.filter(
            due_date__lt=timezone.now(),
            status__in=['PENDING', 'APPROVED', 'IN_PROGRESS'],
        )

    def filter_search(self, queryset, name, value):
        """Keep rows where any searchable field contains ``value``.

        Case-insensitive substring match over title, description,
        requester name and requester organization.
        """
        matches = (
            Q(title__icontains=value)
            | Q(description__icontains=value)
            | Q(requester_name__icontains=value)
            | Q(requester_organization__icontains=value)
        )
        return queryset.filter(matches)
|
|
|
|
|
|
class ExportRequestViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for export requests plus approve/execute actions.

    Access requires an authenticated user. The ``retrieve`` action uses the
    detail serializer; every other action uses the compact serializer.
    """

    queryset = ExportRequest.objects.all()
    serializer_class = ExportRequestSerializer
    pagination_class = CompliancePagination
    filter_backends = [DjangoFilterBackend]
    filterset_class = ExportRequestFilter
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return export requests with user relations and frameworks loaded eagerly."""
        export_requests = ExportRequest.objects.select_related(
            'approved_by',
            'executed_by',
            'created_by',
        )
        return export_requests.prefetch_related('applicable_frameworks')

    def get_serializer_class(self):
        """Pick the detail serializer for ``retrieve``, the compact one otherwise."""
        wants_detail = self.action == 'retrieve'
        return ExportRequestDetailSerializer if wants_detail else ExportRequestSerializer

    @action(detail=True, methods=['post'])
    def approve(self, request, pk=None):
        """Approve a pending export request on behalf of the caller.

        Responds with HTTP 400 unless the request's status is ``PENDING``.
        On success records the approver, the approval time and the optional
        ``approval_notes`` from the request body.
        """
        export_request = self.get_object()
        notes = request.data.get('approval_notes', '')

        if export_request.status != 'PENDING':
            return Response(
                {'error': 'Only pending requests can be approved'},
                status=status.HTTP_400_BAD_REQUEST,
            )

        export_request.status = 'APPROVED'
        export_request.approved_by = request.user
        export_request.approved_at = timezone.now()
        export_request.approval_notes = notes
        export_request.save()

        return Response(self.get_serializer(export_request).data)

    @action(detail=True, methods=['post'])
    def execute(self, request, pk=None):
        """Start execution of an approved export request.

        Responds with HTTP 400 unless the request's status is ``APPROVED``.
        On success the status becomes ``IN_PROGRESS`` and the caller is
        recorded as the executor; no export is generated here yet.
        """
        export_request = self.get_object()

        if export_request.status != 'APPROVED':
            return Response(
                {'error': 'Only approved requests can be executed'},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # This would typically trigger an async task to generate the export
        export_request.status = 'IN_PROGRESS'
        export_request.executed_by = request.user
        export_request.save()

        # TODO: Implement actual export generation logic

        return Response(self.get_serializer(export_request).data)
|
|
|
|
|
|
class ComplianceReportFilter(filters.FilterSet):
    """Query-string filters available on the compliance report list."""

    framework = filters.UUIDFilter()
    report_type = filters.ChoiceFilter(
        choices=ComplianceReport.REPORT_TYPES,
    )
    status = filters.ChoiceFilter(
        choices=ComplianceReport.STATUS_CHOICES,
    )
    prepared_by = filters.UUIDFilter()
    search = filters.CharFilter(method='filter_search')

    class Meta:
        model = ComplianceReport
        fields = [
            'framework',
            'report_type',
            'status',
            'prepared_by',
            'search',
        ]

    def filter_search(self, queryset, name, value):
        """Keep rows where title, description or executive summary contains ``value``."""
        matches = (
            Q(title__icontains=value)
            | Q(description__icontains=value)
            | Q(executive_summary__icontains=value)
        )
        return queryset.filter(matches)
|
|
|
|
|
|
class ComplianceReportViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for compliance reports.

    Access requires an authenticated user. The ``retrieve`` action uses the
    detail serializer; every other action uses the compact serializer.
    """

    queryset = ComplianceReport.objects.all()
    serializer_class = ComplianceReportSerializer
    pagination_class = CompliancePagination
    filter_backends = [DjangoFilterBackend]
    filterset_class = ComplianceReportFilter
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return reports with framework, user relations and requirements loaded eagerly."""
        reports = ComplianceReport.objects.select_related(
            'framework',
            'prepared_by',
            'reviewed_by',
            'approved_by',
        )
        return reports.prefetch_related('applicable_requirements')

    def get_serializer_class(self):
        """Pick the detail serializer for ``retrieve``, the compact one otherwise."""
        wants_detail = self.action == 'retrieve'
        return ComplianceReportDetailSerializer if wants_detail else ComplianceReportSerializer
|
|
|
|
|
|
class LegalHoldFilter(filters.FilterSet):
    """Query-string filters available on the legal hold list."""

    status = filters.ChoiceFilter(
        choices=LegalHold.STATUS_CHOICES,
    )
    active = filters.BooleanFilter(method='filter_active')
    search = filters.CharFilter(method='filter_search')

    class Meta:
        model = LegalHold
        fields = ['status', 'active', 'search']

    def filter_active(self, queryset, name, value):
        """Restrict to holds in ``ACTIVE`` status that expire after today.

        A falsy ``value`` leaves the queryset untouched; it does not select
        the inactive holds.
        """
        if not value:
            return queryset
        today = timezone.now().date()
        return queryset.filter(status='ACTIVE', expiration_date__gt=today)

    def filter_search(self, queryset, name, value):
        """Keep rows where any searchable field contains ``value``.

        Case-insensitive substring match over case name, case number,
        description, legal counsel and law firm.
        """
        matches = (
            Q(case_name__icontains=value)
            | Q(case_number__icontains=value)
            | Q(description__icontains=value)
            | Q(legal_counsel__icontains=value)
            | Q(law_firm__icontains=value)
        )
        return queryset.filter(matches)
|
|
|
|
|
|
class LegalHoldViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for legal holds plus incident/evidence attachment.

    Access requires an authenticated user. The ``retrieve`` action uses the
    detail serializer; every other action uses the compact serializer.
    """

    queryset = LegalHold.objects.all()
    serializer_class = LegalHoldSerializer
    pagination_class = CompliancePagination
    filter_backends = [DjangoFilterBackend]
    filterset_class = LegalHoldFilter
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return holds with creator, incidents and evidence loaded eagerly."""
        return LegalHold.objects.select_related('created_by').prefetch_related(
            'related_incidents', 'related_evidence'
        )

    def get_serializer_class(self):
        """Pick the detail serializer for ``retrieve``, the compact one otherwise."""
        if self.action == 'retrieve':
            return LegalHoldDetailSerializer
        return LegalHoldSerializer

    @action(detail=True, methods=['post'])
    def add_incident(self, request, pk=None):
        """Attach an incident to the legal hold.

        Expects ``incident_id`` in the request body. Responds with HTTP 400
        when it is missing or malformed and HTTP 404 when no incident has
        that id.
        """
        legal_hold = self.get_object()
        incident_id = request.data.get('incident_id')

        if not incident_id:
            return Response(
                {'error': 'Incident ID is required'},
                status=status.HTTP_400_BAD_REQUEST
            )

        from django.core.exceptions import ValidationError
        from incident_intelligence.models import Incident

        # Only the lookup sits in the try body so that errors raised while
        # linking the incident are not mistaken for a bad id.
        try:
            incident = Incident.objects.get(id=incident_id)
        except Incident.DoesNotExist:
            return Response(
                {'error': 'Incident not found'},
                status=status.HTTP_404_NOT_FOUND
            )
        except (TypeError, ValueError, ValidationError):
            # A value that cannot be coerced to the primary-key type (for
            # example a malformed UUID) is a client error, not a crash.
            return Response(
                {'error': 'Invalid incident ID'},
                status=status.HTTP_400_BAD_REQUEST
            )

        legal_hold.related_incidents.add(incident)
        return Response({'message': 'Incident added to legal hold successfully'})

    @action(detail=True, methods=['post'])
    def add_evidence(self, request, pk=None):
        """Attach an evidence record to the legal hold.

        Expects ``evidence_id`` in the request body. Responds with HTTP 400
        when it is missing or malformed and HTTP 404 when no evidence has
        that id.
        """
        legal_hold = self.get_object()
        evidence_id = request.data.get('evidence_id')

        if not evidence_id:
            return Response(
                {'error': 'Evidence ID is required'},
                status=status.HTTP_400_BAD_REQUEST
            )

        from django.core.exceptions import ValidationError

        try:
            evidence = EvidenceCollection.objects.get(id=evidence_id)
        except EvidenceCollection.DoesNotExist:
            return Response(
                {'error': 'Evidence not found'},
                status=status.HTTP_404_NOT_FOUND
            )
        except (TypeError, ValueError, ValidationError):
            # Malformed id for the primary-key type: report as a bad request.
            return Response(
                {'error': 'Invalid evidence ID'},
                status=status.HTTP_400_BAD_REQUEST
            )

        legal_hold.related_evidence.add(evidence)
        return Response({'message': 'Evidence added to legal hold successfully'})
|