"""
Management command to set up security integration for incident intelligence
"""

from argparse import ArgumentParser

from django.contrib.contenttypes.models import ContentType
from django.core.management.base import BaseCommand
from django.db import transaction

from incident_intelligence.models import Incident
from security.models import DataClassification, Role, Permission


class Command(BaseCommand):
    """Set up the security integration for the incident intelligence module.

    Every step is opt-in through its own flag:

    * ``--create-permissions`` creates the permission rows attached to the
      ``Incident`` content type.
    * ``--create-roles`` creates the predefined roles and links them to
      permissions and data classifications.
    * ``--assign-classifications`` classifies incidents that have no data
      classification yet, based on their severity.
    """

    help = 'Set up security integration for incident intelligence module'

    def add_arguments(self, parser: ArgumentParser) -> None:
        """Register the three boolean flags that select which steps run."""
        parser.add_argument(
            '--create-permissions',
            action='store_true',
            help='Create incident intelligence permissions',
        )
        parser.add_argument(
            '--create-roles',
            action='store_true',
            help='Create incident intelligence roles',
        )
        parser.add_argument(
            '--assign-classifications',
            action='store_true',
            help='Assign data classifications to existing incidents',
        )

    def handle(self, *args, **options) -> None:
        """Run the selected steps in a fixed order.

        Permissions are created before roles so that a single invocation with
        both flags produces roles that already have their permissions.
        If no flag was given, a warning is printed and nothing is done.
        """
        steps = (
            ('create_permissions', self.create_permissions),
            ('create_roles', self.create_roles),
            ('assign_classifications', self.assign_classifications),
        )
        selected = [step for flag, step in steps if options.get(flag)]

        if not selected:
            # Without this guard the command would report success even
            # though it did not touch anything.
            self.stdout.write(
                self.style.WARNING(
                    'No action selected. Use --create-permissions, '
                    '--create-roles and/or --assign-classifications.'
                )
            )
            return

        self.stdout.write(
            self.style.SUCCESS('Setting up security integration for Incident Intelligence...')
        )

        for step in selected:
            step()

        self.stdout.write(
            self.style.SUCCESS('Security integration setup completed!')
        )

    def _load_classifications(self) -> dict:
        """Return the PUBLIC / INTERNAL / CONFIDENTIAL classifications by name.

        Each lookup uses ``filter(name=...).first()``, so a classification
        that does not exist maps to ``None`` instead of raising.
        """
        return {
            name: DataClassification.objects.filter(name=name).first()
            for name in ('PUBLIC', 'INTERNAL', 'CONFIDENTIAL')
        }

    def create_permissions(self) -> None:
        """Create incident intelligence permissions.

        All permissions are attached to the ``Incident`` content type.
        Existing permissions (same codename and content type) are left
        untouched; only missing ones are created.
        """
        self.stdout.write('Creating incident intelligence permissions...')

        # Get content type for Incident model
        incident_content_type = ContentType.objects.get_for_model(Incident)

        permissions_data = [
            ('view_incident', 'Can view incident'),
            ('add_incident', 'Can add incident'),
            ('change_incident', 'Can change incident'),
            ('delete_incident', 'Can delete incident'),
            ('analyze_incident', 'Can analyze incident'),
            ('view_incidentcorrelation', 'Can view incident correlation'),
            ('view_duplicationdetection', 'Can view duplication detection'),
            ('view_incidentpattern', 'Can view incident pattern'),
            ('approve_merge', 'Can approve incident merge'),
            ('reject_merge', 'Can reject incident merge'),
            ('resolve_pattern', 'Can resolve incident pattern'),
        ]

        with transaction.atomic():
            for codename, name in permissions_data:
                permission, created = Permission.objects.get_or_create(
                    codename=codename,
                    content_type=incident_content_type,
                    defaults={'name': name},
                )
                if created:
                    self.stdout.write(f' Created permission: {permission.name}')
                else:
                    self.stdout.write(f' Permission already exists: {permission.name}')

        self.stdout.write(
            self.style.SUCCESS('Created incident intelligence permissions')
        )

    def create_roles(self) -> None:
        """Create incident intelligence roles.

        A role that already exists (matched by name) is left untouched.
        A newly created role receives the permissions listed for it that
        exist on the ``Incident`` content type, and access to its data
        classifications only when all of them exist. Anything that could
        not be assigned is reported as a warning, because the role will not
        be revisited on a later run once it exists.
        """
        self.stdout.write('Creating incident intelligence roles...')

        # Get permissions
        incident_content_type = ContentType.objects.get_for_model(Incident)
        all_codenames = list(
            Permission.objects
            .filter(content_type=incident_content_type)
            .values_list('codename', flat=True)
        )

        # Get data classifications
        classifications = self._load_classifications()

        # Each tier extends the previous one.
        viewer_codenames = [
            'view_incident',
            'view_incidentcorrelation',
            'view_duplicationdetection',
            'view_incidentpattern',
        ]
        analyst_codenames = viewer_codenames + [
            'add_incident',
            'change_incident',
            'analyze_incident',
        ]
        manager_codenames = analyst_codenames + [
            'delete_incident',
            'approve_merge',
            'reject_merge',
            'resolve_pattern',
        ]

        basic_access = ('PUBLIC', 'INTERNAL')
        full_access = ('PUBLIC', 'INTERNAL', 'CONFIDENTIAL')

        roles_data = [
            {
                'name': 'Incident Viewer',
                'description': 'Can view incidents and basic analysis results',
                'permissions': viewer_codenames,
                'classifications': basic_access,
            },
            {
                'name': 'Incident Analyst',
                'description': 'Can view and analyze incidents, trigger AI analysis',
                'permissions': analyst_codenames,
                'classifications': full_access,
            },
            {
                'name': 'Incident Manager',
                'description': 'Can manage incidents, approve merges, resolve patterns',
                'permissions': manager_codenames,
                'classifications': full_access,
            },
            {
                'name': 'Incident Administrator',
                'description': 'Full access to all incident intelligence features',
                # Whatever permissions currently exist for the content type.
                'permissions': all_codenames,
                'classifications': full_access,
            },
        ]

        with transaction.atomic():
            for role_data in roles_data:
                role, created = Role.objects.get_or_create(
                    name=role_data['name'],
                    defaults={
                        'description': role_data['description'],
                        'is_active': True,
                    },
                )

                if not created:
                    self.stdout.write(f' Role already exists: {role.name}')
                    continue

                # Assign permissions
                role_permissions = list(
                    Permission.objects.filter(
                        codename__in=role_data['permissions'],
                        content_type=incident_content_type,
                    )
                )
                role.permissions.set(role_permissions)

                found = {permission.codename for permission in role_permissions}
                missing = sorted(set(role_data['permissions']) - found)
                if not role_permissions:
                    self.stdout.write(
                        self.style.WARNING(
                            f' Role {role.name} was created without permissions; '
                            'run --create-permissions and assign them manually.'
                        )
                    )
                elif missing:
                    self.stdout.write(
                        self.style.WARNING(
                            f' Role {role.name} is missing permissions: '
                            f'{", ".join(missing)}'
                        )
                    )

                # Assign data classifications (all-or-nothing per role)
                wanted = [classifications[name] for name in role_data['classifications']]
                if all(wanted):
                    role.data_classification_access.set(wanted)
                else:
                    absent = [
                        name for name in role_data['classifications']
                        if not classifications[name]
                    ]
                    self.stdout.write(
                        self.style.WARNING(
                            f' Role {role.name} was created without data classification '
                            f'access; missing classifications: {", ".join(absent)}'
                        )
                    )

                self.stdout.write(f' Created role: {role.name}')

        self.stdout.write(
            self.style.SUCCESS('Created incident intelligence roles')
        )

    def assign_classifications(self) -> None:
        """Assign data classifications to existing incidents.

        Only incidents whose ``data_classification`` is null are touched:

        * ``CRITICAL`` / ``EMERGENCY``: CONFIDENTIAL (falling back to INTERNAL,
          then PUBLIC), flagged as sensitive and as requiring clearance.
        * ``HIGH``: INTERNAL (falling back to PUBLIC), flagged as sensitive.
        * anything else: PUBLIC.

        Does nothing except print a warning when the PUBLIC classification
        does not exist.
        """
        self.stdout.write('Assigning data classifications to existing incidents...')

        # Get data classifications
        classifications = self._load_classifications()
        public_classification = classifications['PUBLIC']
        internal_classification = classifications['INTERNAL']
        confidential_classification = classifications['CONFIDENTIAL']

        if not public_classification:
            self.stdout.write(
                self.style.WARNING('No data classifications found. Please create them first.')
            )
            return

        # Counted explicitly: saved rows no longer match the null filter, so
        # the total must not depend on re-querying it afterwards.
        updated = 0

        with transaction.atomic():
            # Assign classifications based on incident severity
            incidents = Incident.objects.filter(data_classification__isnull=True)

            for incident in incidents:
                if incident.severity in ('CRITICAL', 'EMERGENCY'):
                    incident.data_classification = (
                        confidential_classification
                        or internal_classification
                        or public_classification
                    )
                    incident.security_clearance_required = True
                    incident.is_sensitive = True
                elif incident.severity == 'HIGH':
                    incident.data_classification = (
                        internal_classification or public_classification
                    )
                    incident.is_sensitive = True
                else:
                    incident.data_classification = public_classification

                incident.save()
                updated += 1

        self.stdout.write(
            self.style.SUCCESS(f'Assigned classifications to {updated} incidents')
        )