Files
ETB/ETB-API/incident_intelligence/management/commands/setup_security_integration.py
Iliyan Angelov 6b247e5b9f Updates
2025-09-19 11:58:53 +03:00

191 lines
8.8 KiB
Python

"""
Management command to set up security integration for incident intelligence
"""
from django.core.management.base import BaseCommand
from django.db import transaction
from security.models import DataClassification, Role, Permission
from django.contrib.contenttypes.models import ContentType
from incident_intelligence.models import Incident
class Command(BaseCommand):
    """Set up the security integration for the incident intelligence module.

    Every step is opt-in through a command-line flag:

    * ``--create-permissions``: create the incident permission records.
    * ``--create-roles``: create the incident roles; newly created roles get
      their permissions and data classification access assigned.
    * ``--assign-classifications``: give incidents that have no data
      classification one that is derived from their severity.
    """

    help = 'Set up security integration for incident intelligence module'

    def add_arguments(self, parser):
        """Register the three opt-in flags that select the setup steps."""
        parser.add_argument(
            '--create-permissions',
            action='store_true',
            help='Create incident intelligence permissions',
        )
        parser.add_argument(
            '--create-roles',
            action='store_true',
            help='Create incident intelligence roles',
        )
        parser.add_argument(
            '--assign-classifications',
            action='store_true',
            help='Assign data classifications to existing incidents',
        )

    def handle(self, *args, **options):
        """Run the selected setup steps in a fixed order.

        Permissions are created before roles so that a single invocation
        with both flags produces roles that already have their permissions.
        When no flag is given nothing is changed and a warning is printed
        instead of a misleading success message.
        """
        steps = (
            ('create_permissions', self.create_permissions),
            ('create_roles', self.create_roles),
            ('assign_classifications', self.assign_classifications),
        )
        selected_steps = [step for flag, step in steps if options.get(flag)]

        if not selected_steps:
            self.stdout.write(
                self.style.WARNING(
                    'No setup step selected. Use --create-permissions, '
                    '--create-roles and/or --assign-classifications.'
                )
            )
            return

        self.stdout.write(
            self.style.SUCCESS('Setting up security integration for Incident Intelligence...')
        )
        for step in selected_steps:
            step()
        self.stdout.write(
            self.style.SUCCESS('Security integration setup completed!')
        )

    def create_permissions(self):
        """Create incident intelligence permissions.

        Each permission is looked up by codename and the Incident content
        type; missing ones are created, existing ones are left untouched.
        """
        self.stdout.write('Creating incident intelligence permissions...')

        # Get content type for Incident model
        incident_content_type = ContentType.objects.get_for_model(Incident)

        permissions_data = [
            ('view_incident', 'Can view incident'),
            ('add_incident', 'Can add incident'),
            ('change_incident', 'Can change incident'),
            ('delete_incident', 'Can delete incident'),
            ('analyze_incident', 'Can analyze incident'),
            ('view_incidentcorrelation', 'Can view incident correlation'),
            ('view_duplicationdetection', 'Can view duplication detection'),
            ('view_incidentpattern', 'Can view incident pattern'),
            ('approve_merge', 'Can approve incident merge'),
            ('reject_merge', 'Can reject incident merge'),
            ('resolve_pattern', 'Can resolve incident pattern'),
        ]

        # All-or-nothing: a failure part-way through rolls back every
        # permission created by this call.
        with transaction.atomic():
            for codename, name in permissions_data:
                permission, created = Permission.objects.get_or_create(
                    codename=codename,
                    content_type=incident_content_type,
                    defaults={'name': name},
                )
                if created:
                    self.stdout.write(f' Created permission: {permission.name}')
                else:
                    self.stdout.write(f' Permission already exists: {permission.name}')

        self.stdout.write(
            self.style.SUCCESS('Created incident intelligence permissions')
        )

    def create_roles(self):
        """Create incident intelligence roles.

        Roles are looked up by name. Only roles created by this call get
        permissions and data classification access assigned; roles that
        already exist are reported and left unchanged.

        A role receives data classification access only when every
        classification it needs exists; otherwise it is created without any,
        and a warning names the missing classifications.
        """
        self.stdout.write('Creating incident intelligence roles...')

        # Get permissions
        incident_content_type = ContentType.objects.get_for_model(Incident)
        permissions = Permission.objects.filter(content_type=incident_content_type)
        all_codenames = list(permissions.values_list('codename', flat=True))
        if not all_codenames:
            # Existing roles are skipped on later runs, so roles created now
            # would stay without permissions unless fixed manually.
            self.stdout.write(
                self.style.WARNING(
                    'No incident permissions found. New roles will be created '
                    'without permissions; run with --create-permissions first.'
                )
            )

        # Get data classifications
        public_classification = DataClassification.objects.filter(name='PUBLIC').first()
        internal_classification = DataClassification.objects.filter(name='INTERNAL').first()
        confidential_classification = DataClassification.objects.filter(name='CONFIDENTIAL').first()

        missing_classifications = [
            label
            for label, classification in (
                ('PUBLIC', public_classification),
                ('INTERNAL', internal_classification),
                ('CONFIDENTIAL', confidential_classification),
            )
            if not classification
        ]
        if missing_classifications:
            self.stdout.write(
                self.style.WARNING(
                    'Missing data classifications: '
                    + ', '.join(missing_classifications)
                    + '. Affected new roles will be created without data '
                    'classification access.'
                )
            )

        # Access lists are all-or-nothing: a partial list is never assigned.
        basic_access = (
            [public_classification, internal_classification]
            if public_classification and internal_classification
            else []
        )
        full_access = (
            [public_classification, internal_classification, confidential_classification]
            if not missing_classifications
            else []
        )

        # Each role's permission set extends the previous, less privileged one.
        viewer_permissions = [
            'view_incident',
            'view_incidentcorrelation',
            'view_duplicationdetection',
            'view_incidentpattern',
        ]
        analyst_permissions = viewer_permissions + [
            'add_incident',
            'change_incident',
            'analyze_incident',
        ]
        manager_permissions = analyst_permissions + [
            'delete_incident',
            'approve_merge',
            'reject_merge',
            'resolve_pattern',
        ]

        roles_data = [
            {
                'name': 'Incident Viewer',
                'description': 'Can view incidents and basic analysis results',
                'permissions': viewer_permissions,
                'classifications': basic_access,
            },
            {
                'name': 'Incident Analyst',
                'description': 'Can view and analyze incidents, trigger AI analysis',
                'permissions': analyst_permissions,
                'classifications': full_access,
            },
            {
                'name': 'Incident Manager',
                'description': 'Can manage incidents, approve merges, resolve patterns',
                'permissions': manager_permissions,
                'classifications': full_access,
            },
            {
                'name': 'Incident Administrator',
                'description': 'Full access to all incident intelligence features',
                # Every permission currently registered for the Incident
                # content type, not only the ones listed above.
                'permissions': all_codenames,
                'classifications': full_access,
            },
        ]

        with transaction.atomic():
            for role_data in roles_data:
                role, created = Role.objects.get_or_create(
                    name=role_data['name'],
                    defaults={
                        'description': role_data['description'],
                        'is_active': True,
                    },
                )
                if not created:
                    self.stdout.write(f' Role already exists: {role.name}')
                    continue

                # Assign permissions
                role_permissions = Permission.objects.filter(
                    codename__in=role_data['permissions'],
                    content_type=incident_content_type,
                )
                role.permissions.set(role_permissions)

                # Assign data classifications
                if role_data['classifications']:
                    role.data_classification_access.set(role_data['classifications'])

                self.stdout.write(f' Created role: {role.name}')

        self.stdout.write(
            self.style.SUCCESS('Created incident intelligence roles')
        )

    def assign_classifications(self):
        """Assign data classifications to existing incidents.

        Only incidents whose ``data_classification`` is null are touched.
        The classification is chosen from the incident severity:

        * ``CRITICAL`` / ``EMERGENCY``: CONFIDENTIAL (falling back to INTERNAL,
          then PUBLIC); the incident is also flagged as sensitive and as
          requiring security clearance.
        * ``HIGH``: INTERNAL (falling back to PUBLIC); flagged as sensitive.
        * anything else: PUBLIC.

        The PUBLIC classification is the final fallback for every branch, so
        nothing is changed when it does not exist.
        """
        self.stdout.write('Assigning data classifications to existing incidents...')

        # Get data classifications
        public_classification = DataClassification.objects.filter(name='PUBLIC').first()
        internal_classification = DataClassification.objects.filter(name='INTERNAL').first()
        confidential_classification = DataClassification.objects.filter(name='CONFIDENTIAL').first()

        if not public_classification:
            self.stdout.write(
                self.style.WARNING('No data classifications found. Please create them first.')
            )
            return

        assigned_count = 0
        with transaction.atomic():
            # Assign classifications based on incident severity
            incidents = Incident.objects.filter(data_classification__isnull=True)
            for incident in incidents:
                if incident.severity in ('CRITICAL', 'EMERGENCY'):
                    incident.data_classification = (
                        confidential_classification
                        or internal_classification
                        or public_classification
                    )
                    incident.security_clearance_required = True
                    incident.is_sensitive = True
                elif incident.severity == 'HIGH':
                    incident.data_classification = (
                        internal_classification or public_classification
                    )
                    incident.is_sensitive = True
                else:
                    incident.data_classification = public_classification
                incident.save()
                # Count explicitly: re-querying the filter after the saves
                # would no longer match the incidents just updated.
                assigned_count += 1

        self.stdout.write(
            self.style.SUCCESS(f'Assigned classifications to {assigned_count} incidents')
        )