""" Management command to set up Compliance & Governance module with sample data """ from django.core.management.base import BaseCommand from django.contrib.auth import get_user_model from django.utils import timezone from datetime import date, timedelta import json from compliance_governance.models import ( RegulatoryFramework, ComplianceRequirement, RegulatoryWorkflow, RetentionPolicy, ) User = get_user_model() class Command(BaseCommand): help = 'Set up Compliance & Governance module with sample data' def add_arguments(self, parser): parser.add_argument( '--reset', action='store_true', help='Reset existing data before creating new data', ) def handle(self, *args, **options): if options['reset']: self.stdout.write('Resetting existing compliance data...') self.reset_data() self.stdout.write('Setting up Compliance & Governance module...') # Create regulatory frameworks self.create_regulatory_frameworks() # Create compliance requirements self.create_compliance_requirements() # Create regulatory workflows self.create_regulatory_workflows() # Create retention policies self.create_retention_policies() self.stdout.write( self.style.SUCCESS('Successfully set up Compliance & Governance module!') ) def reset_data(self): """Reset existing compliance data""" RetentionPolicy.objects.all().delete() RegulatoryWorkflow.objects.all().delete() ComplianceRequirement.objects.all().delete() RegulatoryFramework.objects.all().delete() def create_regulatory_frameworks(self): """Create sample regulatory frameworks""" self.stdout.write('Creating regulatory frameworks...') frameworks_data = [ { 'name': 'GDPR Compliance Framework', 'framework_type': 'GDPR', 'version': '1.0', 'description': 'General Data Protection Regulation compliance framework for EU data protection requirements.', 'applicable_regions': ['EU', 'EEA', 'UK'], 'industry_sectors': ['Technology', 'Healthcare', 'Finance', 'Retail'], 'compliance_requirements': [ 'Data Protection Impact Assessment', 'Privacy by Design', 'Data Subject Rights', 'Data Breach Notification', 'Consent Management', 'Data Processing Records' ], 'is_active': True, 'effective_date': date(2018, 5, 25), 'review_date': date(2024, 5, 25), }, { 'name': 'HIPAA Compliance Framework', 'framework_type': 'HIPAA', 'version': '1.0', 'description': 'Health Insurance Portability and Accountability Act compliance framework for healthcare data protection.', 'applicable_regions': ['US'], 'industry_sectors': ['Healthcare', 'Health Insurance', 'Healthcare Technology'], 'compliance_requirements': [ 'Administrative Safeguards', 'Physical Safeguards', 'Technical Safeguards', 'Business Associate Agreements', 'Risk Assessment', 'Incident Response' ], 'is_active': True, 'effective_date': date(1996, 8, 21), 'review_date': date(2024, 8, 21), }, { 'name': 'SOX Compliance Framework', 'framework_type': 'SOX', 'version': '1.0', 'description': 'Sarbanes-Oxley Act compliance framework for financial reporting and internal controls.', 'applicable_regions': ['US'], 'industry_sectors': ['Finance', 'Public Companies', 'Accounting'], 'compliance_requirements': [ 'Internal Controls Assessment', 'Financial Reporting Controls', 'IT General Controls', 'Management Assessment', 'External Audit', 'Documentation Requirements' ], 'is_active': True, 'effective_date': date(2002, 7, 30), 'review_date': date(2024, 7, 30), }, { 'name': 'ISO 27001 Information Security Management', 'framework_type': 'ISO27001', 'version': '1.0', 'description': 'ISO/IEC 27001 Information Security Management System standard.', 'applicable_regions': ['Global'], 'industry_sectors': ['Technology', 'Finance', 'Healthcare', 'Government', 'Manufacturing'], 'compliance_requirements': [ 'Information Security Policy', 'Risk Assessment and Treatment', 'Access Control', 'Cryptography', 'Physical Security', 'Operations Security', 'Communications Security', 'System Acquisition and Development', 'Supplier Relationships', 'Information Security Incident Management', 'Business Continuity', 'Compliance' ], 'is_active': True, 'effective_date': date(2013, 10, 1), 'review_date': date(2024, 10, 1), }, ] for framework_data in frameworks_data: framework, created = RegulatoryFramework.objects.get_or_create( name=framework_data['name'], defaults=framework_data ) if created: self.stdout.write(f' Created framework: {framework.name}') else: self.stdout.write(f' Framework already exists: {framework.name}') def create_compliance_requirements(self): """Create sample compliance requirements""" self.stdout.write('Creating compliance requirements...') # Get frameworks gdpr_framework = RegulatoryFramework.objects.get(name='GDPR Compliance Framework') hipaa_framework = RegulatoryFramework.objects.get(name='HIPAA Compliance Framework') sox_framework = RegulatoryFramework.objects.get(name='SOX Compliance Framework') iso_framework = RegulatoryFramework.objects.get(name='ISO 27001 Information Security Management') requirements_data = [ # GDPR Requirements { 'framework': gdpr_framework, 'requirement_id': 'GDPR-001', 'title': 'Data Protection Impact Assessment (DPIA)', 'description': 'Conduct Data Protection Impact Assessments for high-risk processing activities.', 'requirement_type': 'PROCEDURAL', 'priority': 'HIGH', 'implementation_guidance': 'Implement DPIA process for all new data processing activities that may result in high risk to individuals.', 'evidence_requirements': ['DPIA Documentation', 'Risk Assessment Records', 'Mitigation Plans'], 'testing_procedures': 'Review DPIA documentation and verify implementation of identified controls.', 'responsible_team': 'Privacy Team', 'next_assessment_date': date.today() + timedelta(days=90), }, { 'framework': gdpr_framework, 'requirement_id': 'GDPR-002', 'title': 'Data Subject Rights Management', 'description': 'Implement processes to handle data subject rights requests (access, rectification, erasure, etc.).', 'requirement_type': 'PROCEDURAL', 'priority': 'CRITICAL', 'implementation_guidance': 'Establish clear procedures for handling data subject requests within 30 days.', 'evidence_requirements': ['Request Handling Procedures', 'Response Templates', 'Processing Records'], 'testing_procedures': 'Test data subject request handling process and verify response times.', 'responsible_team': 'Legal and Privacy Team', 'next_assessment_date': date.today() + timedelta(days=60), }, # HIPAA Requirements { 'framework': hipaa_framework, 'requirement_id': 'HIPAA-001', 'title': 'Administrative Safeguards', 'description': 'Implement administrative safeguards including security officer designation and workforce training.', 'requirement_type': 'ADMINISTRATIVE', 'priority': 'CRITICAL', 'implementation_guidance': 'Designate security officer, implement workforce training, and establish access management procedures.', 'evidence_requirements': ['Security Officer Documentation', 'Training Records', 'Access Management Procedures'], 'testing_procedures': 'Review training records and verify access management implementation.', 'responsible_team': 'Security Team', 'next_assessment_date': date.today() + timedelta(days=120), }, { 'framework': hipaa_framework, 'requirement_id': 'HIPAA-002', 'title': 'Technical Safeguards', 'description': 'Implement technical safeguards including access control, audit controls, and encryption.', 'requirement_type': 'TECHNICAL', 'priority': 'CRITICAL', 'implementation_guidance': 'Implement access controls, audit logging, and encryption for PHI.', 'evidence_requirements': ['Access Control Documentation', 'Audit Logs', 'Encryption Implementation'], 'testing_procedures': 'Test access controls and verify audit logging functionality.', 'responsible_team': 'IT Security Team', 'next_assessment_date': date.today() + timedelta(days=90), }, # SOX Requirements { 'framework': sox_framework, 'requirement_id': 'SOX-001', 'title': 'Internal Controls Assessment', 'description': 'Assess and document internal controls over financial reporting.', 'requirement_type': 'ADMINISTRATIVE', 'priority': 'CRITICAL', 'implementation_guidance': 'Document and test internal controls related to financial reporting processes.', 'evidence_requirements': ['Control Documentation', 'Testing Results', 'Remediation Plans'], 'testing_procedures': 'Perform walkthroughs and test controls for effectiveness.', 'responsible_team': 'Internal Audit', 'next_assessment_date': date.today() + timedelta(days=180), }, # ISO 27001 Requirements { 'framework': iso_framework, 'requirement_id': 'ISO-001', 'title': 'Information Security Policy', 'description': 'Establish and maintain information security policies and procedures.', 'requirement_type': 'DOCUMENTATION', 'priority': 'HIGH', 'implementation_guidance': 'Develop comprehensive information security policies covering all aspects of the ISMS.', 'evidence_requirements': ['Security Policy Document', 'Policy Review Records', 'Approval Documentation'], 'testing_procedures': 'Review policy documentation and verify implementation across organization.', 'responsible_team': 'Information Security Team', 'next_assessment_date': date.today() + timedelta(days=365), }, { 'framework': iso_framework, 'requirement_id': 'ISO-002', 'title': 'Risk Assessment and Treatment', 'description': 'Conduct regular risk assessments and implement appropriate risk treatment measures.', 'requirement_type': 'PROCEDURAL', 'priority': 'CRITICAL', 'implementation_guidance': 'Implement systematic risk assessment process and risk treatment plans.', 'evidence_requirements': ['Risk Assessment Reports', 'Risk Treatment Plans', 'Risk Register'], 'testing_procedures': 'Review risk assessment methodology and verify implementation of treatment measures.', 'responsible_team': 'Risk Management Team', 'next_assessment_date': date.today() + timedelta(days=180), }, ] for req_data in requirements_data: requirement, created = ComplianceRequirement.objects.get_or_create( framework=req_data['framework'], requirement_id=req_data['requirement_id'], defaults=req_data ) if created: self.stdout.write(f' Created requirement: {requirement.requirement_id} - {requirement.title}') else: self.stdout.write(f' Requirement already exists: {requirement.requirement_id}') def create_regulatory_workflows(self): """Create sample regulatory workflows""" self.stdout.write('Creating regulatory workflows...') # Get frameworks gdpr_framework = RegulatoryFramework.objects.get(name='GDPR Compliance Framework') hipaa_framework = RegulatoryFramework.objects.get(name='HIPAA Compliance Framework') workflows_data = [ { 'name': 'GDPR Data Breach Response Workflow', 'workflow_type': 'DATA_BREACH', 'description': 'Workflow for handling GDPR data breach notifications and response.', 'applicable_frameworks': [gdpr_framework], 'workflow_definition': { 'steps': [ {'id': 'detect', 'name': 'Breach Detection', 'assignee': 'Security Team'}, {'id': 'assess', 'name': 'Risk Assessment', 'assignee': 'Privacy Team'}, {'id': 'notify_dpa', 'name': 'DPA Notification', 'assignee': 'Legal Team'}, {'id': 'notify_subjects', 'name': 'Data Subject Notification', 'assignee': 'Privacy Team'}, {'id': 'remediate', 'name': 'Remediation', 'assignee': 'IT Team'}, {'id': 'document', 'name': 'Documentation', 'assignee': 'Compliance Team'}, ], 'transitions': [ {'from': 'detect', 'to': 'assess', 'condition': 'breach_confirmed'}, {'from': 'assess', 'to': 'notify_dpa', 'condition': 'high_risk'}, {'from': 'assess', 'to': 'remediate', 'condition': 'low_risk'}, {'from': 'notify_dpa', 'to': 'notify_subjects', 'condition': 'dpa_notified'}, {'from': 'notify_subjects', 'to': 'remediate', 'condition': 'subjects_notified'}, {'from': 'remediate', 'to': 'document', 'condition': 'remediation_complete'}, ], 'end_steps': ['document'] }, 'triggers': ['data_breach_detected', 'gdpr_incident_created'], 'conditions': {'framework': 'GDPR', 'severity': ['HIGH', 'CRITICAL']}, 'status': 'ACTIVE', 'version': '1.0', 'notification_rules': [ {'event': 'workflow_started', 'recipients': ['privacy_team', 'legal_team']}, {'event': 'step_completed', 'recipients': ['assigned_user']}, {'event': 'workflow_completed', 'recipients': ['compliance_team']}, ], 'escalation_rules': [ {'step': 'notify_dpa', 'timeout': 24, 'escalate_to': 'legal_director'}, {'step': 'notify_subjects', 'timeout': 48, 'escalate_to': 'privacy_officer'}, ], }, { 'name': 'HIPAA Incident Response Workflow', 'workflow_type': 'INCIDENT_RESPONSE', 'description': 'Workflow for handling HIPAA security incidents and breaches.', 'applicable_frameworks': [hipaa_framework], 'workflow_definition': { 'steps': [ {'id': 'detect', 'name': 'Incident Detection', 'assignee': 'Security Team'}, {'id': 'contain', 'name': 'Containment', 'assignee': 'IT Team'}, {'id': 'investigate', 'name': 'Investigation', 'assignee': 'Security Team'}, {'id': 'assess', 'name': 'Risk Assessment', 'assignee': 'Privacy Team'}, {'id': 'notify', 'name': 'Notification', 'assignee': 'Legal Team'}, {'id': 'recover', 'name': 'Recovery', 'assignee': 'IT Team'}, {'id': 'lessons', 'name': 'Lessons Learned', 'assignee': 'Security Team'}, ], 'transitions': [ {'from': 'detect', 'to': 'contain', 'condition': 'incident_confirmed'}, {'from': 'contain', 'to': 'investigate', 'condition': 'contained'}, {'from': 'investigate', 'to': 'assess', 'condition': 'investigation_complete'}, {'from': 'assess', 'to': 'notify', 'condition': 'breach_confirmed'}, {'from': 'assess', 'to': 'recover', 'condition': 'no_breach'}, {'from': 'notify', 'to': 'recover', 'condition': 'notifications_sent'}, {'from': 'recover', 'to': 'lessons', 'condition': 'recovery_complete'}, ], 'end_steps': ['lessons'] }, 'triggers': ['hipaa_incident_created', 'phi_breach_detected'], 'conditions': {'framework': 'HIPAA'}, 'status': 'ACTIVE', 'version': '1.0', 'notification_rules': [ {'event': 'workflow_started', 'recipients': ['security_team', 'privacy_officer']}, {'event': 'breach_confirmed', 'recipients': ['legal_team', 'executive_team']}, ], 'escalation_rules': [ {'step': 'notify', 'timeout': 12, 'escalate_to': 'legal_director'}, ], }, ] for workflow_data in workflows_data: applicable_frameworks = workflow_data.pop('applicable_frameworks') workflow, created = RegulatoryWorkflow.objects.get_or_create( name=workflow_data['name'], defaults=workflow_data ) if created: workflow.applicable_frameworks.set(applicable_frameworks) self.stdout.write(f' Created workflow: {workflow.name}') else: self.stdout.write(f' Workflow already exists: {workflow.name}') def create_retention_policies(self): """Create sample retention policies""" self.stdout.write('Creating retention policies...') # Get frameworks gdpr_framework = RegulatoryFramework.objects.get(name='GDPR Compliance Framework') hipaa_framework = RegulatoryFramework.objects.get(name='HIPAA Compliance Framework') sox_framework = RegulatoryFramework.objects.get(name='SOX Compliance Framework') policies_data = [ { 'name': 'GDPR Personal Data Retention Policy', 'description': 'Retention policy for personal data under GDPR requirements.', 'policy_type': 'INCIDENT_DATA', 'applicable_frameworks': [gdpr_framework], 'retention_period': 7, 'retention_unit': 'YEARS', 'auto_archive': True, 'auto_delete': False, 'data_classification_levels': ['CONFIDENTIAL', 'RESTRICTED'], 'incident_categories': ['Data Breach', 'Privacy Incident', 'GDPR Violation'], 'legal_hold_override': True, 'is_active': True, 'effective_date': date.today(), }, { 'name': 'HIPAA PHI Retention Policy', 'description': 'Retention policy for Protected Health Information under HIPAA.', 'policy_type': 'INCIDENT_DATA', 'applicable_frameworks': [hipaa_framework], 'retention_period': 6, 'retention_unit': 'YEARS', 'auto_archive': True, 'auto_delete': False, 'data_classification_levels': ['RESTRICTED', 'TOP_SECRET'], 'incident_categories': ['HIPAA Breach', 'PHI Incident', 'Security Incident'], 'legal_hold_override': True, 'is_active': True, 'effective_date': date.today(), }, { 'name': 'SOX Financial Records Retention Policy', 'description': 'Retention policy for financial records and audit trails under SOX.', 'policy_type': 'AUDIT_LOGS', 'applicable_frameworks': [sox_framework], 'retention_period': 7, 'retention_unit': 'YEARS', 'auto_archive': True, 'auto_delete': False, 'data_classification_levels': ['CONFIDENTIAL', 'RESTRICTED'], 'incident_categories': ['Financial Incident', 'Audit Finding', 'Control Failure'], 'legal_hold_override': True, 'is_active': True, 'effective_date': date.today(), }, { 'name': 'General System Logs Retention Policy', 'description': 'Retention policy for general system logs and audit trails.', 'policy_type': 'SYSTEM_LOGS', 'applicable_frameworks': [], 'retention_period': 1, 'retention_unit': 'YEARS', 'auto_archive': True, 'auto_delete': True, 'data_classification_levels': ['PUBLIC', 'INTERNAL'], 'incident_categories': ['System Incident', 'Performance Issue', 'General Security'], 'legal_hold_override': True, 'is_active': True, 'effective_date': date.today(), }, ] for policy_data in policies_data: applicable_frameworks = policy_data.pop('applicable_frameworks') policy, created = RetentionPolicy.objects.get_or_create( name=policy_data['name'], defaults=policy_data ) if created: policy.applicable_frameworks.set(applicable_frameworks) self.stdout.write(f' Created retention policy: {policy.name}') else: self.stdout.write(f' Retention policy already exists: {policy.name}')