Files
Iliyan Angelov 6b247e5b9f Updates
2025-09-19 11:58:53 +03:00

733 lines
27 KiB
Python

"""
Views for Compliance & Governance API endpoints
"""
from rest_framework import viewsets, status, permissions
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.pagination import PageNumberPagination
from django_filters.rest_framework import DjangoFilterBackend
from django_filters import rest_framework as filters
from django.db.models import Q, Count, Avg
from django.utils import timezone
from datetime import timedelta
from ..models import (
RegulatoryFramework,
ComplianceRequirement,
RegulatoryWorkflow,
WorkflowInstance,
EvidenceCollection,
RetentionPolicy,
ExportRequest,
ComplianceReport,
LegalHold,
)
from ..serializers.compliance import (
RegulatoryFrameworkSerializer,
ComplianceRequirementSerializer,
ComplianceRequirementDetailSerializer,
RegulatoryWorkflowSerializer,
WorkflowInstanceSerializer,
WorkflowInstanceDetailSerializer,
EvidenceCollectionSerializer,
EvidenceCollectionDetailSerializer,
RetentionPolicySerializer,
ExportRequestSerializer,
ExportRequestDetailSerializer,
ComplianceReportSerializer,
ComplianceReportDetailSerializer,
LegalHoldSerializer,
LegalHoldDetailSerializer,
)
class CompliancePagination(PageNumberPagination):
    """Page-number pagination shared by the compliance viewsets.

    Pages hold 20 items unless the client asks for another size through
    the ``page_size`` query parameter; requested sizes are capped at 100.
    """

    # Items per page when the client does not request a size.
    page_size = 20
    # Query-string parameter a client may use to request another size.
    page_size_query_param = 'page_size'
    # Upper bound applied to any client-requested page size.
    max_page_size = 100
class RegulatoryFrameworkFilter(filters.FilterSet):
    """Query-string filters available when listing regulatory frameworks."""

    framework_type = filters.ChoiceFilter(
        choices=RegulatoryFramework.FRAMEWORK_TYPES,
    )
    is_active = filters.BooleanFilter()
    # Inclusive lower/upper bounds on ``effective_date``.
    effective_date_after = filters.DateFilter(
        field_name='effective_date',
        lookup_expr='gte',
    )
    effective_date_before = filters.DateFilter(
        field_name='effective_date',
        lookup_expr='lte',
    )
    search = filters.CharFilter(method='filter_search')

    class Meta:
        model = RegulatoryFramework
        fields = [
            'framework_type',
            'is_active',
            'effective_date_after',
            'effective_date_before',
            'search',
        ]

    def filter_search(self, queryset, name, value):
        """Keep rows where any free-text column contains ``value``.

        The match is case-insensitive and covers the name, description,
        applicable regions and industry sectors columns.
        """
        text_match = (
            Q(name__icontains=value)
            | Q(description__icontains=value)
            | Q(applicable_regions__icontains=value)
            | Q(industry_sectors__icontains=value)
        )
        return queryset.filter(text_match)
class RegulatoryFrameworkViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for regulatory frameworks plus requirement roll-ups.

    Besides the standard model actions, two detail routes are exposed:
    ``requirements`` (list a framework's requirements) and
    ``compliance_summary`` (aggregate counts and a compliance percentage).
    """

    queryset = RegulatoryFramework.objects.all()
    serializer_class = RegulatoryFrameworkSerializer
    pagination_class = CompliancePagination
    filter_backends = [DjangoFilterBackend]
    filterset_class = RegulatoryFrameworkFilter
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return frameworks with ``created_by`` joined and requirements prefetched."""
        return (
            RegulatoryFramework.objects
            .select_related('created_by')
            .prefetch_related('requirements')
        )

    @action(detail=True, methods=['get'])
    def requirements(self, request, pk=None):
        """Return every requirement of the framework (unpaginated).

        The optional query parameters ``compliance_status`` and ``priority``
        narrow the result with an exact match; empty values are ignored.
        """
        framework = self.get_object()
        requirements = framework.requirements.all()

        # Apply each optional exact-match filter only when a value was sent.
        for param in ('compliance_status', 'priority'):
            wanted = request.query_params.get(param)
            if wanted:
                requirements = requirements.filter(**{param: wanted})

        serializer = ComplianceRequirementSerializer(requirements, many=True)
        return Response(serializer.data)

    @action(detail=True, methods=['get'])
    def compliance_summary(self, request, pk=None):
        """Return requirement counts and the compliance percentage.

        The response contains the total, one count per compliance status,
        the number of implemented requirements, the number whose next
        assessment date is in the past, and ``compliance_percentage``:
        compliant requirements as a share of all assessed ones (compliant,
        partially compliant and non-compliant), or 0 when none are assessed.
        """
        framework = self.get_object()
        today = timezone.now().date()

        def tally(**conditions):
            # Conditional COUNT so all figures come from a single query.
            return Count('pk', filter=Q(**conditions))

        # One aggregate query instead of one COUNT query per figure.
        # Keyword order fixes the key order of the response payload.
        summary = framework.requirements.all().aggregate(
            total_requirements=Count('pk'),
            compliant=tally(compliance_status='COMPLIANT'),
            partially_compliant=tally(compliance_status='PARTIALLY_COMPLIANT'),
            non_compliant=tally(compliance_status='NON_COMPLIANT'),
            not_assessed=tally(compliance_status='NOT_ASSESSED'),
            implemented=tally(is_implemented=True),
            overdue_assessments=tally(next_assessment_date__lt=today),
        )

        # Percentage is relative to assessed requirements only.
        assessed_count = (
            summary['compliant']
            + summary['partially_compliant']
            + summary['non_compliant']
        )
        if assessed_count > 0:
            summary['compliance_percentage'] = (
                summary['compliant'] / assessed_count
            ) * 100
        else:
            summary['compliance_percentage'] = 0

        return Response(summary)
class ComplianceRequirementFilter(filters.FilterSet):
    """Query-string filters available when listing compliance requirements."""

    framework = filters.UUIDFilter()
    compliance_status = filters.ChoiceFilter(
        choices=ComplianceRequirement._meta.get_field('compliance_status').choices,
    )
    requirement_type = filters.ChoiceFilter(
        choices=ComplianceRequirement.REQUIREMENT_TYPES,
    )
    priority = filters.ChoiceFilter(
        choices=ComplianceRequirement.PRIORITY_LEVELS,
    )
    is_implemented = filters.BooleanFilter()
    assigned_to = filters.UUIDFilter()
    overdue = filters.BooleanFilter(method='filter_overdue')
    search = filters.CharFilter(method='filter_search')

    class Meta:
        model = ComplianceRequirement
        fields = [
            'framework',
            'compliance_status',
            'requirement_type',
            'priority',
            'is_implemented',
            'assigned_to',
            'overdue',
            'search',
        ]

    def filter_overdue(self, queryset, name, value):
        """Restrict to requirements whose next assessment date has passed.

        Only a truthy ``value`` filters; a falsy one returns the queryset
        unchanged (it does not select the non-overdue rows).
        """
        if not value:
            return queryset
        today = timezone.now().date()
        return queryset.filter(next_assessment_date__lt=today)

    def filter_search(self, queryset, name, value):
        """Keep rows where any free-text column contains ``value``.

        The match is case-insensitive and covers the title, description,
        requirement ID and responsible team columns.
        """
        text_match = (
            Q(title__icontains=value)
            | Q(description__icontains=value)
            | Q(requirement_id__icontains=value)
            | Q(responsible_team__icontains=value)
        )
        return queryset.filter(text_match)
class ComplianceRequirementViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for compliance requirements.

    Adds two detail routes: ``update_status`` (record a new compliance
    status) and ``schedule_assessment`` (set the next assessment date).
    """

    queryset = ComplianceRequirement.objects.all()
    serializer_class = ComplianceRequirementSerializer
    pagination_class = CompliancePagination
    filter_backends = [DjangoFilterBackend]
    filterset_class = ComplianceRequirementFilter
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return requirements with their related rows loaded up front."""
        return ComplianceRequirement.objects.select_related(
            'framework', 'assigned_to'
        ).prefetch_related('evidence_collection', 'workflow_instances')

    def get_serializer_class(self):
        """Use the detail serializer for ``retrieve``, the list one otherwise."""
        if self.action == 'retrieve':
            return ComplianceRequirementDetailSerializer
        return ComplianceRequirementSerializer

    @action(detail=True, methods=['post'])
    def update_status(self, request, pk=None):
        """Set ``compliance_status`` and stamp today's date as last assessment.

        Expects ``compliance_status`` in the request body. Responds with
        400 when the value is not one of the field's declared choices,
        otherwise with the serialized requirement.
        """
        requirement = self.get_object()
        new_status = request.data.get('compliance_status')

        status_field = ComplianceRequirement._meta.get_field('compliance_status')
        try:
            is_known_status = new_status in dict(status_field.choices)
        except TypeError:
            # Unhashable payloads (e.g. a JSON list or object) cannot be a
            # valid choice; reject them instead of failing with a 500.
            is_known_status = False

        if not is_known_status:
            return Response(
                {'error': 'Invalid compliance status'},
                status=status.HTTP_400_BAD_REQUEST
            )

        requirement.compliance_status = new_status
        requirement.last_assessment_date = timezone.now().date()
        requirement.save()

        serializer = self.get_serializer(requirement)
        return Response(serializer.data)

    @action(detail=True, methods=['post'])
    def schedule_assessment(self, request, pk=None):
        """Set ``next_assessment_date`` from the request body.

        Expects ``assessment_date`` as a ``YYYY-MM-DD`` string. Responds
        with 400 when the value is missing or is not a valid calendar
        date, otherwise with the serialized requirement.
        """
        # Local import: the module's import block does not provide it.
        from django.utils.dateparse import parse_date

        requirement = self.get_object()
        assessment_date = request.data.get('assessment_date')
        if not assessment_date:
            return Response(
                {'error': 'Assessment date is required'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Validate before saving: an unparseable value assigned to the
        # model field would otherwise only fail inside ``save()``.
        try:
            parsed_date = parse_date(assessment_date)
        except (TypeError, ValueError):
            # ValueError: well-formed but impossible date (e.g. month 13);
            # TypeError: the payload value is not a string.
            parsed_date = None
        if parsed_date is None:
            return Response(
                {'error': 'Assessment date must be a valid date in YYYY-MM-DD format'},
                status=status.HTTP_400_BAD_REQUEST
            )

        requirement.next_assessment_date = parsed_date
        requirement.save()

        serializer = self.get_serializer(requirement)
        return Response(serializer.data)
class RegulatoryWorkflowFilter(filters.FilterSet):
    """Query-string filters available when listing regulatory workflows."""

    workflow_type = filters.ChoiceFilter(
        choices=RegulatoryWorkflow.WORKFLOW_TYPES,
    )
    status = filters.ChoiceFilter(choices=RegulatoryWorkflow.STATUS_CHOICES)
    is_template = filters.BooleanFilter()
    applicable_frameworks = filters.UUIDFilter(
        field_name='applicable_frameworks',
    )
    search = filters.CharFilter(method='filter_search')

    class Meta:
        model = RegulatoryWorkflow
        fields = [
            'workflow_type',
            'status',
            'is_template',
            'applicable_frameworks',
            'search',
        ]

    def filter_search(self, queryset, name, value):
        """Keep rows whose name or description contains ``value`` (case-insensitive)."""
        text_match = Q(name__icontains=value) | Q(description__icontains=value)
        return queryset.filter(text_match)
class RegulatoryWorkflowViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for regulatory workflows, plus instance creation."""

    queryset = RegulatoryWorkflow.objects.all()
    serializer_class = RegulatoryWorkflowSerializer
    pagination_class = CompliancePagination
    filter_backends = [DjangoFilterBackend]
    filterset_class = RegulatoryWorkflowFilter
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return workflows with creator joined and frameworks/instances prefetched."""
        base = RegulatoryWorkflow.objects.select_related('created_by')
        return base.prefetch_related('applicable_frameworks', 'instances')

    @action(detail=True, methods=['post'])
    def create_instance(self, request, pk=None):
        """Create a workflow instance from this workflow.

        The request body supplies the instance fields; ``workflow`` and
        ``created_by`` are overwritten with this workflow and the calling
        user. Responds with 400 when the workflow is not ``ACTIVE`` or the
        payload fails validation, and with 201 plus the serialized
        instance on success.
        """
        workflow = self.get_object()
        if workflow.status != 'ACTIVE':
            return Response(
                {'error': 'Cannot create instance from inactive workflow'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Work on a copy so the two server-controlled keys can be set.
        payload = request.data.copy()
        payload['workflow'] = workflow.id
        payload['created_by'] = request.user.id

        serializer = WorkflowInstanceSerializer(data=payload)
        if not serializer.is_valid():
            return Response(
                serializer.errors,
                status=status.HTTP_400_BAD_REQUEST
            )

        created = serializer.save()
        return Response(
            WorkflowInstanceSerializer(created).data,
            status=status.HTTP_201_CREATED
        )
class WorkflowInstanceFilter(filters.FilterSet):
    """Query-string filters available when listing workflow instances."""

    workflow = filters.UUIDFilter()
    status = filters.ChoiceFilter(choices=WorkflowInstance.STATUS_CHOICES)
    assigned_to = filters.UUIDFilter()
    related_incident = filters.UUIDFilter()
    overdue = filters.BooleanFilter(method='filter_overdue')
    search = filters.CharFilter(method='filter_search')

    class Meta:
        model = WorkflowInstance
        fields = [
            'workflow',
            'status',
            'assigned_to',
            'related_incident',
            'overdue',
            'search',
        ]

    def filter_overdue(self, queryset, name, value):
        """Restrict to unfinished instances whose due date has passed.

        "Unfinished" means status ``PENDING`` or ``IN_PROGRESS``. A falsy
        ``value`` returns the queryset unchanged.
        """
        if not value:
            return queryset
        open_statuses = ['PENDING', 'IN_PROGRESS']
        return queryset.filter(
            due_date__lt=timezone.now(),
            status__in=open_statuses
        )

    def filter_search(self, queryset, name, value):
        """Keep rows where any free-text column contains ``value``.

        The match is case-insensitive and covers the title, description
        and current step columns.
        """
        text_match = (
            Q(title__icontains=value)
            | Q(description__icontains=value)
            | Q(current_step__icontains=value)
        )
        return queryset.filter(text_match)
class WorkflowInstanceViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for workflow instances.

    Adds two detail routes: ``advance_step`` (move the instance to its
    next step) and ``assign_stakeholder`` (attach a user as stakeholder).
    """

    queryset = WorkflowInstance.objects.all()
    serializer_class = WorkflowInstanceSerializer
    pagination_class = CompliancePagination
    filter_backends = [DjangoFilterBackend]
    filterset_class = WorkflowInstanceFilter
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return instances with their related rows loaded up front."""
        return WorkflowInstance.objects.select_related(
            'workflow', 'assigned_to', 'created_by', 'related_incident', 'related_requirement'
        ).prefetch_related('stakeholders', 'evidence_collection')

    def get_serializer_class(self):
        """Use the detail serializer for ``retrieve``, the list one otherwise."""
        if self.action == 'retrieve':
            return WorkflowInstanceDetailSerializer
        return WorkflowInstanceSerializer

    @action(detail=True, methods=['post'])
    def advance_step(self, request, pk=None):
        """Advance the workflow instance to ``next_step``.

        Request body:
            next_step: required name of the step to move to.
            step_data: optional JSON object merged into ``execution_data``.

        The step being left is appended to ``completed_steps`` with a
        timestamp and the acting user's ID. When ``next_step`` is listed
        under ``end_steps`` in the workflow definition, the instance is
        marked ``COMPLETED``. Responds with 400 when ``next_step`` is
        missing or ``step_data`` is not a JSON object, otherwise with the
        serialized instance.
        """
        instance = self.get_object()
        next_step = request.data.get('next_step')
        # An explicit null is treated like an absent key.
        step_data = request.data.get('step_data')
        if step_data is None:
            step_data = {}

        if not next_step:
            return Response(
                {'error': 'Next step is required'},
                status=status.HTTP_400_BAD_REQUEST
            )
        if not isinstance(step_data, dict):
            # ``dict.update`` would raise on a list, string or number.
            return Response(
                {'error': 'Step data must be an object'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Add current step to completed steps
        if instance.current_step:
            user_id = request.user.id
            if not isinstance(user_id, (int, str)):
                # Non-primitive keys (such as UUIDs) are stored as strings
                # so the entry stays JSON-serializable.
                user_id = str(user_id)
            completed_steps = list(instance.completed_steps or [])
            completed_steps.append({
                'step': instance.current_step,
                'completed_at': timezone.now().isoformat(),
                'completed_by': user_id
            })
            instance.completed_steps = completed_steps

        # Update current step and execution data (tolerating a null field)
        instance.current_step = next_step
        execution_data = dict(instance.execution_data or {})
        execution_data.update(step_data)
        instance.execution_data = execution_data
        instance.updated_at = timezone.now()

        # Check if workflow is complete
        workflow_definition = instance.workflow.workflow_definition or {}
        end_steps = workflow_definition.get('end_steps') or []
        if next_step in end_steps:
            instance.status = 'COMPLETED'
            instance.completed_at = timezone.now()

        instance.save()
        serializer = self.get_serializer(instance)
        return Response(serializer.data)

    @action(detail=True, methods=['post'])
    def assign_stakeholder(self, request, pk=None):
        """Attach the user given by ``stakeholder_id`` as a stakeholder.

        Responds with 400 when ``stakeholder_id`` is missing and with 404
        when no user has that ID, including IDs that are malformed for
        the user model's primary key.
        """
        # Local imports: the module's import block does not provide them.
        from django.contrib.auth import get_user_model
        from django.core.exceptions import ValidationError

        instance = self.get_object()
        stakeholder_id = request.data.get('stakeholder_id')
        if not stakeholder_id:
            return Response(
                {'error': 'Stakeholder ID is required'},
                status=status.HTTP_400_BAD_REQUEST
            )

        User = get_user_model()
        try:
            stakeholder = User.objects.get(id=stakeholder_id)
        except (User.DoesNotExist, ValidationError, ValueError, TypeError):
            # A malformed ID fails while the lookup value is converted,
            # before any row is searched; it cannot match a user either.
            return Response(
                {'error': 'User not found'},
                status=status.HTTP_404_NOT_FOUND
            )

        instance.stakeholders.add(stakeholder)
        return Response({'message': 'Stakeholder assigned successfully'})
class EvidenceCollectionFilter(filters.FilterSet):
    """Query-string filters available when listing evidence records."""

    incident = filters.UUIDFilter()
    evidence_type = filters.ChoiceFilter(
        choices=EvidenceCollection.EVIDENCE_TYPES,
    )
    status = filters.ChoiceFilter(choices=EvidenceCollection.STATUS_CHOICES)
    collected_by = filters.UUIDFilter()
    compliance_requirement = filters.UUIDFilter()
    search = filters.CharFilter(method='filter_search')

    class Meta:
        model = EvidenceCollection
        fields = [
            'incident',
            'evidence_type',
            'status',
            'collected_by',
            'compliance_requirement',
            'search',
        ]

    def filter_search(self, queryset, name, value):
        """Keep rows where any free-text column contains ``value``.

        The match is case-insensitive and covers the title, description
        and collection notes columns.
        """
        text_match = (
            Q(title__icontains=value)
            | Q(description__icontains=value)
            | Q(collection_notes__icontains=value)
        )
        return queryset.filter(text_match)
class EvidenceCollectionViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for evidence records, plus custody and verification."""

    queryset = EvidenceCollection.objects.all()
    serializer_class = EvidenceCollectionSerializer
    pagination_class = CompliancePagination
    filter_backends = [DjangoFilterBackend]
    filterset_class = EvidenceCollectionFilter
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return evidence with its foreign-key relations joined."""
        joined_relations = (
            'incident',
            'collected_by',
            'verified_by',
            'workflow_instance',
            'compliance_requirement',
        )
        return EvidenceCollection.objects.select_related(*joined_relations)

    def get_serializer_class(self):
        """Use the detail serializer for ``retrieve``, the list one otherwise."""
        wants_detail = self.action == 'retrieve'
        if wants_detail:
            return EvidenceCollectionDetailSerializer
        return EvidenceCollectionSerializer

    @action(detail=True, methods=['post'])
    def add_custody_record(self, request, pk=None):
        """Append a custody record to the evidence.

        Expects ``action`` (required) and ``notes`` (optional, defaults to
        an empty string) in the request body. Responds with 400 when
        ``action`` is missing, otherwise with the serialized evidence.
        """
        evidence = self.get_object()
        # Named so the local does not shadow the ``action`` decorator.
        custody_action = request.data.get('action')
        custody_notes = request.data.get('notes', '')

        if not custody_action:
            return Response(
                {'error': 'Action is required'},
                status=status.HTTP_400_BAD_REQUEST
            )

        evidence.add_custody_record(request.user, custody_action, custody_notes)
        return Response(self.get_serializer(evidence).data)

    @action(detail=True, methods=['post'])
    def verify_evidence(self, request, pk=None):
        """Mark the evidence as verified by the calling user.

        Sets the status to ``VERIFIED``, records the verifier, saves, and
        then adds a ``VERIFIED`` custody record carrying the optional
        ``verification_notes`` from the request body.
        """
        evidence = self.get_object()
        notes = request.data.get('verification_notes', '')

        evidence.status = 'VERIFIED'
        evidence.verified_by = request.user
        evidence.updated_at = timezone.now()
        evidence.save()

        # The custody trail entry is written after the status is persisted.
        evidence.add_custody_record(request.user, 'VERIFIED', notes)

        return Response(self.get_serializer(evidence).data)
class RetentionPolicyFilter(filters.FilterSet):
    """Query-string filters available when listing retention policies."""

    policy_type = filters.ChoiceFilter(choices=RetentionPolicy.POLICY_TYPES)
    is_active = filters.BooleanFilter()
    applicable_frameworks = filters.UUIDFilter(
        field_name='applicable_frameworks',
    )
    search = filters.CharFilter(method='filter_search')

    class Meta:
        model = RetentionPolicy
        fields = [
            'policy_type',
            'is_active',
            'applicable_frameworks',
            'search',
        ]

    def filter_search(self, queryset, name, value):
        """Keep rows whose name or description contains ``value`` (case-insensitive)."""
        text_match = Q(name__icontains=value) | Q(description__icontains=value)
        return queryset.filter(text_match)
class RetentionPolicyViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for retention policies."""

    queryset = RetentionPolicy.objects.all()
    serializer_class = RetentionPolicySerializer
    pagination_class = CompliancePagination
    filter_backends = [DjangoFilterBackend]
    filterset_class = RetentionPolicyFilter
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return policies with creator joined and frameworks prefetched."""
        with_creator = RetentionPolicy.objects.select_related('created_by')
        return with_creator.prefetch_related('applicable_frameworks')
class ExportRequestFilter(filters.FilterSet):
    """Query-string filters available when listing export requests."""

    request_type = filters.ChoiceFilter(choices=ExportRequest.REQUEST_TYPES)
    status = filters.ChoiceFilter(choices=ExportRequest.STATUS_CHOICES)
    # Case-insensitive substring match on the requester's e-mail address.
    requester_email = filters.CharFilter(lookup_expr='icontains')
    overdue = filters.BooleanFilter(method='filter_overdue')
    search = filters.CharFilter(method='filter_search')

    class Meta:
        model = ExportRequest
        fields = [
            'request_type',
            'status',
            'requester_email',
            'overdue',
            'search',
        ]

    def filter_overdue(self, queryset, name, value):
        """Restrict to unfinished requests whose due date has passed.

        "Unfinished" means status ``PENDING``, ``APPROVED`` or
        ``IN_PROGRESS``. A falsy ``value`` returns the queryset unchanged.
        """
        if not value:
            return queryset
        open_statuses = ['PENDING', 'APPROVED', 'IN_PROGRESS']
        return queryset.filter(
            due_date__lt=timezone.now(),
            status__in=open_statuses
        )

    def filter_search(self, queryset, name, value):
        """Keep rows where any free-text column contains ``value``.

        The match is case-insensitive and covers the title, description,
        requester name and requester organization columns.
        """
        text_match = (
            Q(title__icontains=value)
            | Q(description__icontains=value)
            | Q(requester_name__icontains=value)
            | Q(requester_organization__icontains=value)
        )
        return queryset.filter(text_match)
class ExportRequestViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for export requests, plus approval and execution."""

    queryset = ExportRequest.objects.all()
    serializer_class = ExportRequestSerializer
    pagination_class = CompliancePagination
    filter_backends = [DjangoFilterBackend]
    filterset_class = ExportRequestFilter
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return export requests with user relations joined and frameworks prefetched."""
        with_users = ExportRequest.objects.select_related(
            'approved_by', 'executed_by', 'created_by'
        )
        return with_users.prefetch_related('applicable_frameworks')

    def get_serializer_class(self):
        """Use the detail serializer for ``retrieve``, the list one otherwise."""
        wants_detail = self.action == 'retrieve'
        if wants_detail:
            return ExportRequestDetailSerializer
        return ExportRequestSerializer

    @action(detail=True, methods=['post'])
    def approve(self, request, pk=None):
        """Approve a pending export request.

        Records the calling user, the approval time and the optional
        ``approval_notes`` from the request body. Responds with 400 when
        the request is not ``PENDING``, otherwise with the serialized
        export request.
        """
        export_request = self.get_object()
        if export_request.status != 'PENDING':
            return Response(
                {'error': 'Only pending requests can be approved'},
                status=status.HTTP_400_BAD_REQUEST
            )

        export_request.status = 'APPROVED'
        export_request.approved_by = request.user
        export_request.approved_at = timezone.now()
        export_request.approval_notes = request.data.get('approval_notes', '')
        export_request.save()

        return Response(self.get_serializer(export_request).data)

    @action(detail=True, methods=['post'])
    def execute(self, request, pk=None):
        """Start execution of an approved export request.

        Moves the request to ``IN_PROGRESS`` and records the calling user
        as executor. Responds with 400 when the request is not
        ``APPROVED``. No export is generated here yet (see the TODO).
        """
        export_request = self.get_object()
        if export_request.status != 'APPROVED':
            return Response(
                {'error': 'Only approved requests can be executed'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # This would typically trigger an async task to generate the export
        export_request.status = 'IN_PROGRESS'
        export_request.executed_by = request.user
        export_request.save()

        # TODO: Implement actual export generation logic
        return Response(self.get_serializer(export_request).data)
class ComplianceReportFilter(filters.FilterSet):
    """Query-string filters available when listing compliance reports."""

    framework = filters.UUIDFilter()
    report_type = filters.ChoiceFilter(choices=ComplianceReport.REPORT_TYPES)
    status = filters.ChoiceFilter(choices=ComplianceReport.STATUS_CHOICES)
    prepared_by = filters.UUIDFilter()
    search = filters.CharFilter(method='filter_search')

    class Meta:
        model = ComplianceReport
        fields = [
            'framework',
            'report_type',
            'status',
            'prepared_by',
            'search',
        ]

    def filter_search(self, queryset, name, value):
        """Keep rows where any free-text column contains ``value``.

        The match is case-insensitive and covers the title, description
        and executive summary columns.
        """
        text_match = (
            Q(title__icontains=value)
            | Q(description__icontains=value)
            | Q(executive_summary__icontains=value)
        )
        return queryset.filter(text_match)
class ComplianceReportViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for compliance reports."""

    queryset = ComplianceReport.objects.all()
    serializer_class = ComplianceReportSerializer
    pagination_class = CompliancePagination
    filter_backends = [DjangoFilterBackend]
    filterset_class = ComplianceReportFilter
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return reports with their relations joined and requirements prefetched."""
        joined = ComplianceReport.objects.select_related(
            'framework', 'prepared_by', 'reviewed_by', 'approved_by'
        )
        return joined.prefetch_related('applicable_requirements')

    def get_serializer_class(self):
        """Use the detail serializer for ``retrieve``, the list one otherwise."""
        wants_detail = self.action == 'retrieve'
        if wants_detail:
            return ComplianceReportDetailSerializer
        return ComplianceReportSerializer
class LegalHoldFilter(filters.FilterSet):
    """Query-string filters available when listing legal holds."""

    status = filters.ChoiceFilter(choices=LegalHold.STATUS_CHOICES)
    active = filters.BooleanFilter(method='filter_active')
    search = filters.CharFilter(method='filter_search')

    class Meta:
        model = LegalHold
        fields = ['status', 'active', 'search']

    def filter_active(self, queryset, name, value):
        """Restrict to holds with status ``ACTIVE`` that expire after today.

        A falsy ``value`` returns the queryset unchanged.
        """
        if not value:
            return queryset
        today = timezone.now().date()
        # NOTE(review): a ``__gt`` comparison never matches a NULL
        # expiration_date -- confirm holds without an expiry (if the field
        # allows that) are meant to be excluded here.
        return queryset.filter(
            status='ACTIVE',
            expiration_date__gt=today
        )

    def filter_search(self, queryset, name, value):
        """Keep rows where any free-text column contains ``value``.

        The match is case-insensitive and covers the case name, case
        number, description, legal counsel and law firm columns.
        """
        text_match = (
            Q(case_name__icontains=value)
            | Q(case_number__icontains=value)
            | Q(description__icontains=value)
            | Q(legal_counsel__icontains=value)
            | Q(law_firm__icontains=value)
        )
        return queryset.filter(text_match)
class LegalHoldViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for legal holds.

    Adds two detail routes that attach existing records to a hold:
    ``add_incident`` and ``add_evidence``.
    """

    queryset = LegalHold.objects.all()
    serializer_class = LegalHoldSerializer
    pagination_class = CompliancePagination
    filter_backends = [DjangoFilterBackend]
    filterset_class = LegalHoldFilter
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return holds with creator joined and related records prefetched."""
        return LegalHold.objects.select_related('created_by').prefetch_related(
            'related_incidents', 'related_evidence'
        )

    def get_serializer_class(self):
        """Use the detail serializer for ``retrieve``, the list one otherwise."""
        if self.action == 'retrieve':
            return LegalHoldDetailSerializer
        return LegalHoldSerializer

    @action(detail=True, methods=['post'])
    def add_incident(self, request, pk=None):
        """Attach the incident given by ``incident_id`` to the legal hold.

        Responds with 400 when ``incident_id`` is missing and with 404
        when no incident has that ID, including IDs that are malformed
        for the incident model's primary key.
        """
        # Local imports: the module's import block does not provide them.
        from django.core.exceptions import ValidationError
        from incident_intelligence.models import Incident

        legal_hold = self.get_object()
        incident_id = request.data.get('incident_id')
        if not incident_id:
            return Response(
                {'error': 'Incident ID is required'},
                status=status.HTTP_400_BAD_REQUEST
            )

        try:
            incident = Incident.objects.get(id=incident_id)
        except (Incident.DoesNotExist, ValidationError, ValueError, TypeError):
            # A malformed ID fails while the lookup value is converted,
            # before any row is searched; it cannot match a record either.
            return Response(
                {'error': 'Incident not found'},
                status=status.HTTP_404_NOT_FOUND
            )

        legal_hold.related_incidents.add(incident)
        return Response({'message': 'Incident added to legal hold successfully'})

    @action(detail=True, methods=['post'])
    def add_evidence(self, request, pk=None):
        """Attach the evidence given by ``evidence_id`` to the legal hold.

        Responds with 400 when ``evidence_id`` is missing and with 404
        when no evidence record has that ID, including IDs that are
        malformed for the evidence model's primary key.
        """
        # Local import: the module's import block does not provide it.
        from django.core.exceptions import ValidationError

        legal_hold = self.get_object()
        evidence_id = request.data.get('evidence_id')
        if not evidence_id:
            return Response(
                {'error': 'Evidence ID is required'},
                status=status.HTTP_400_BAD_REQUEST
            )

        try:
            evidence = EvidenceCollection.objects.get(id=evidence_id)
        except (EvidenceCollection.DoesNotExist, ValidationError, ValueError, TypeError):
            # Same reasoning as in add_incident: malformed IDs map to 404.
            return Response(
                {'error': 'Evidence not found'},
                status=status.HTTP_404_NOT_FOUND
            )

        legal_hold.related_evidence.add(evidence)
        return Response({'message': 'Evidence added to legal hold successfully'})