Files
Iliyan Angelov 6b247e5b9f Updates
2025-09-19 11:58:53 +03:00

680 lines
24 KiB
Python

"""
Automation & Orchestration models for Enterprise Incident Management API
Implements runbooks, integrations, ChatOps, auto-remediation, and maintenance scheduling
"""
import uuid
import json
from datetime import datetime, timedelta
from typing import Dict, Any, Optional, List
from django.db import models
from django.contrib.auth import get_user_model
from django.core.validators import MinValueValidator, MaxValueValidator
from django.utils import timezone
from django.core.exceptions import ValidationError
# Resolve the project's configured user model once so the ForeignKey and
# ManyToManyField declarations in this module can reference it directly.
User = get_user_model()
class Runbook(models.Model):
    """
    A named, versioned list of response steps used for incident automation.

    ``TRIGGER_TYPES`` is also used as the ``choices`` for ``trigger_type`` on
    ``RunbookExecution`` and ``WorkflowExecution`` in this module.
    """

    TRIGGER_TYPES = [
        ('MANUAL', 'Manual Trigger'),
        ('AUTOMATIC', 'Automatic Trigger'),
        ('SCHEDULED', 'Scheduled Trigger'),
        ('WEBHOOK', 'Webhook Trigger'),
        ('CHATOPS', 'ChatOps Trigger'),
    ]

    STATUS_CHOICES = [
        ('DRAFT', 'Draft'),
        ('ACTIVE', 'Active'),
        ('INACTIVE', 'Inactive'),
        ('DEPRECATED', 'Deprecated'),
    ]

    # Identity
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    name = models.CharField(max_length=200, unique=True)
    description = models.TextField()
    version = models.CharField(max_length=20, default='1.0')

    # How and when the runbook fires
    trigger_type = models.CharField(
        max_length=20,
        choices=TRIGGER_TYPES,
        default='MANUAL',
    )
    # NOTE(review): the JSON fields in this model default to an empty
    # container but do not set blank=True; confirm whether empty values are
    # expected to pass model/form validation.
    trigger_conditions = models.JSONField(
        default=dict,
        help_text="Conditions that trigger this runbook (incident severity, category, etc.)",
    )

    # What the runbook does
    steps = models.JSONField(
        default=list,
        help_text="List of steps to execute in order",
    )
    estimated_duration = models.DurationField(help_text="Estimated time to complete")

    # Classification
    category = models.CharField(max_length=100, blank=True, null=True)
    tags = models.JSONField(default=list, help_text="Tags for categorization and search")

    # Lifecycle and ownership
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='DRAFT')
    is_public = models.BooleanField(
        default=True,
        help_text="Whether this runbook is available to all users",
    )
    created_by = models.ForeignKey(
        User,
        on_delete=models.SET_NULL,
        null=True,
        related_name='created_runbooks',
    )
    last_modified_by = models.ForeignKey(
        User,
        on_delete=models.SET_NULL,
        null=True,
        related_name='modified_runbooks',
    )

    # Aggregate execution statistics
    execution_count = models.PositiveIntegerField(default=0)
    success_rate = models.FloatField(
        validators=[MinValueValidator(0.0), MaxValueValidator(1.0)],
        default=0.0,
        help_text="Success rate of runbook executions (0.0-1.0)",
    )

    # Timestamps
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    last_executed_at = models.DateTimeField(null=True, blank=True)

    class Meta:
        ordering = ['name']
        indexes = [
            models.Index(fields=['status', 'trigger_type']),
            models.Index(fields=['category']),
            models.Index(fields=['created_at']),
        ]

    def __str__(self):
        """Return the runbook name followed by its version."""
        return f"{self.name} v{self.version}"

    def can_be_triggered_by(self, user: User) -> bool:
        """
        Tell whether ``user`` may trigger this runbook.

        A non-public runbook is restricted to the user stored in
        ``created_by``; beyond that, only runbooks whose status is
        ``'ACTIVE'`` can be triggered.
        """
        restricted_to_other_user = (not self.is_public) and self.created_by != user
        if restricted_to_other_user:
            return False
        return self.status == 'ACTIVE'
class RunbookExecution(models.Model):
    """
    Record of a single run of a ``Runbook``.

    Stores who or what triggered the run, optional links to an incident and
    an SLA instance, step progress, and the resulting log/output.
    """

    STATUS_CHOICES = [
        ('PENDING', 'Pending'),
        ('RUNNING', 'Running'),
        ('COMPLETED', 'Completed'),
        ('FAILED', 'Failed'),
        ('CANCELLED', 'Cancelled'),
        ('TIMEOUT', 'Timeout'),
    ]

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    runbook = models.ForeignKey(
        Runbook,
        on_delete=models.CASCADE,
        related_name='executions',
    )

    # Trigger context
    triggered_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True)
    trigger_type = models.CharField(max_length=20, choices=Runbook.TRIGGER_TYPES)
    trigger_data = models.JSONField(
        default=dict,
        help_text="Data that triggered the execution",
    )

    # Optional link to the incident this run belongs to
    incident = models.ForeignKey(
        'incident_intelligence.Incident',
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='runbook_executions',
    )

    # Optional link to the SLA instance
    sla_instance = models.ForeignKey(
        'sla_oncall.SLAInstance',
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='runbook_executions',
        help_text="SLA instance that triggered this runbook execution",
    )

    # Progress
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='PENDING')
    current_step = models.PositiveIntegerField(default=0)
    total_steps = models.PositiveIntegerField()

    # Outcome
    execution_log = models.JSONField(default=list, help_text="Detailed execution log")
    error_message = models.TextField(blank=True, null=True)
    output_data = models.JSONField(default=dict, help_text="Output data from execution")

    # Timing; started_at is stamped automatically when the row is created
    started_at = models.DateTimeField(auto_now_add=True)
    completed_at = models.DateTimeField(null=True, blank=True)
    duration = models.DurationField(null=True, blank=True)

    class Meta:
        ordering = ['-started_at']
        indexes = [
            models.Index(fields=['runbook', 'status']),
            models.Index(fields=['triggered_by', 'started_at']),
            models.Index(fields=['incident', 'status']),
        ]

    def __str__(self):
        """Return the related runbook's name together with the current status."""
        return f"Execution of {self.runbook.name} - {self.status}"

    @property
    def is_running(self):
        """True while the status is ``'RUNNING'``."""
        return self.status == 'RUNNING'

    @property
    def is_completed(self):
        """
        True once the execution has reached a final status.

        Covers every finished outcome (completed, failed, cancelled,
        timeout), not only successful completion.
        """
        finished_states = ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMEOUT')
        return self.status in finished_states
class Integration(models.Model):
    """
    Connection to an external system (ITSM, CI/CD, chat, or generic endpoint).

    Holds the integration's configuration, its administrative status, the
    result of the most recent health check, and simple usage counters.
    """

    INTEGRATION_TYPES = [
        ('JIRA', 'Jira'),
        ('GITHUB', 'GitHub'),
        ('JENKINS', 'Jenkins'),
        ('SERVICENOW', 'ServiceNow'),
        ('ANSIBLE', 'Ansible'),
        ('TERRAFORM', 'Terraform'),
        ('SLACK', 'Slack'),
        ('TEAMS', 'Microsoft Teams'),
        ('WEBHOOK', 'Generic Webhook'),
        ('API', 'Generic API'),
    ]

    STATUS_CHOICES = [
        ('ACTIVE', 'Active'),
        ('INACTIVE', 'Inactive'),
        ('ERROR', 'Error'),
        ('CONFIGURING', 'Configuring'),
    ]

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    name = models.CharField(max_length=200, unique=True)
    integration_type = models.CharField(max_length=20, choices=INTEGRATION_TYPES)
    description = models.TextField(blank=True, null=True)

    # Connection settings
    # NOTE(review): the help texts mention API keys; these are ordinary JSON
    # fields here — confirm how secrets are protected.
    configuration = models.JSONField(
        default=dict,
        help_text="Integration-specific configuration (API keys, URLs, etc.)",
    )
    authentication_config = models.JSONField(
        default=dict,
        help_text="Authentication configuration (OAuth, API keys, etc.)",
    )

    # Administrative status and last observed health
    status = models.CharField(
        max_length=20,
        choices=STATUS_CHOICES,
        default='CONFIGURING',
    )
    last_health_check = models.DateTimeField(null=True, blank=True)
    health_status = models.CharField(
        max_length=20,
        choices=[
            ('HEALTHY', 'Healthy'),
            ('WARNING', 'Warning'),
            ('ERROR', 'Error'),
            ('UNKNOWN', 'Unknown'),
        ],
        default='UNKNOWN',
    )
    error_message = models.TextField(blank=True, null=True)

    # Usage counters
    request_count = models.PositiveIntegerField(default=0)
    last_used_at = models.DateTimeField(null=True, blank=True)

    # Audit metadata
    created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        ordering = ['name']
        indexes = [
            models.Index(fields=['integration_type', 'status']),
            models.Index(fields=['status', 'health_status']),
        ]

    def __str__(self):
        """Return the integration name with its type code in parentheses."""
        return f"{self.name} ({self.integration_type})"

    def is_healthy(self) -> bool:
        """Return True only when status is 'ACTIVE' and health_status is 'HEALTHY'."""
        if self.status != 'ACTIVE':
            return False
        return self.health_status == 'HEALTHY'
class ChatOpsIntegration(models.Model):
    """
    Configuration of one chat-platform connection used to trigger workflows.

    Stores the platform credentials, the command prefix and command catalogue,
    and the user/channel allow-lists for that connection.
    """

    PLATFORM_CHOICES = [
        ('SLACK', 'Slack'),
        ('TEAMS', 'Microsoft Teams'),
        ('DISCORD', 'Discord'),
        ('MATTERMOST', 'Mattermost'),
    ]

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    name = models.CharField(max_length=200, unique=True)
    platform = models.CharField(max_length=20, choices=PLATFORM_CHOICES)

    # Platform connection details
    webhook_url = models.URLField(help_text="Webhook URL for the chat platform")
    # NOTE(review): the token is held in a plain CharField here — confirm
    # whether it is encrypted or otherwise protected elsewhere.
    bot_token = models.CharField(max_length=500, help_text="Bot authentication token")
    channel_id = models.CharField(max_length=100, help_text="Default channel ID")

    # Command handling
    command_prefix = models.CharField(
        max_length=10,
        default='!',
        help_text="Command prefix (e.g., !, /)",
    )
    available_commands = models.JSONField(
        default=list,
        help_text="List of available commands and their descriptions",
    )

    # Access control lists
    allowed_users = models.JSONField(
        default=list,
        help_text="List of user IDs allowed to use commands",
    )
    allowed_channels = models.JSONField(
        default=list,
        help_text="List of channel IDs where commands are allowed",
    )

    # Activity state
    is_active = models.BooleanField(default=True)
    last_activity = models.DateTimeField(null=True, blank=True)

    # Audit metadata
    created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        ordering = ['name']
        indexes = [
            models.Index(fields=['platform', 'is_active']),
        ]

    def __str__(self):
        """Return the integration name with its platform code in parentheses."""
        return f"{self.name} ({self.platform})"
class ChatOpsCommand(models.Model):
    """
    Log entry for one command received through a ``ChatOpsIntegration``.

    Captures the command text and arguments, the chat user and channel it came
    from, optional links to a runbook and an incident, and the outcome.
    """

    STATUS_CHOICES = [
        ('PENDING', 'Pending'),
        ('EXECUTING', 'Executing'),
        ('COMPLETED', 'Completed'),
        ('FAILED', 'Failed'),
        ('CANCELLED', 'Cancelled'),
    ]

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    chatops_integration = models.ForeignKey(
        ChatOpsIntegration,
        on_delete=models.CASCADE,
        related_name='commands',
    )

    # What was issued, by whom, and where
    command = models.CharField(max_length=100, help_text="The command that was executed")
    arguments = models.JSONField(default=list, help_text="Command arguments")
    user_id = models.CharField(max_length=100, help_text="User ID from chat platform")
    channel_id = models.CharField(
        max_length=100,
        help_text="Channel ID where command was executed",
    )

    # Optional links; both are nulled out if the target row is deleted
    triggered_runbook = models.ForeignKey(
        Runbook,
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='chatops_triggers',
    )
    related_incident = models.ForeignKey(
        'incident_intelligence.Incident',
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='chatops_commands',
    )

    # Outcome
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='PENDING')
    response_message = models.TextField(blank=True, null=True)
    execution_log = models.JSONField(default=list, help_text="Detailed execution log")
    error_message = models.TextField(blank=True, null=True)

    # Timing; executed_at is stamped automatically when the row is created
    executed_at = models.DateTimeField(auto_now_add=True)
    completed_at = models.DateTimeField(null=True, blank=True)

    class Meta:
        ordering = ['-executed_at']
        indexes = [
            models.Index(fields=['chatops_integration', 'status']),
            models.Index(fields=['user_id', 'executed_at']),
            models.Index(fields=['channel_id', 'executed_at']),
        ]

    def __str__(self):
        """Return the command, the chat user ID, and the current status."""
        return f"{self.command} by {self.user_id} - {self.status}"
class AutoRemediation(models.Model):
    """
    Definition of an automatic remediation hook for incidents.

    Describes the kind of action, the conditions that trigger it, its
    configuration and timeout, approval requirements, and run counters.
    """

    REMEDIATION_TYPES = [
        ('SERVICE_RESTART', 'Service Restart'),
        ('DEPLOYMENT_ROLLBACK', 'Deployment Rollback'),
        ('SCALE_UP', 'Scale Up Resources'),
        ('SCALE_DOWN', 'Scale Down Resources'),
        ('CACHE_CLEAR', 'Clear Cache'),
        ('CONFIG_UPDATE', 'Configuration Update'),
        ('CUSTOM_SCRIPT', 'Custom Script'),
        ('WEBHOOK', 'Webhook Call'),
    ]

    TRIGGER_CONDITIONS = [
        ('SEVERITY', 'Incident Severity'),
        ('CATEGORY', 'Incident Category'),
        ('SERVICE', 'Affected Service'),
        ('DURATION', 'Incident Duration'),
        ('PATTERN', 'Pattern Match'),
    ]

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    name = models.CharField(max_length=200, unique=True)
    description = models.TextField()
    remediation_type = models.CharField(max_length=30, choices=REMEDIATION_TYPES)

    # When the remediation fires
    trigger_conditions = models.JSONField(
        default=dict,
        help_text="Conditions that trigger this remediation",
    )
    trigger_condition_type = models.CharField(max_length=20, choices=TRIGGER_CONDITIONS)

    # What the remediation does
    remediation_config = models.JSONField(
        default=dict,
        help_text="Configuration for the remediation action",
    )
    timeout_seconds = models.PositiveIntegerField(
        default=300,
        help_text="Timeout for remediation action",
    )

    # Safety limits and approval
    requires_approval = models.BooleanField(
        default=False,
        help_text="Whether manual approval is required",
    )
    approval_users = models.ManyToManyField(
        User,
        blank=True,
        related_name='approvable_remediations',
        help_text="Users who can approve this remediation",
    )
    max_executions_per_incident = models.PositiveIntegerField(
        default=1,
        help_text="Max times this can run per incident",
    )

    # Activation and audit metadata
    is_active = models.BooleanField(default=True)
    created_by = models.ForeignKey(
        User,
        on_delete=models.SET_NULL,
        null=True,
        related_name='created_auto_remediations',
    )
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    # Run counters
    execution_count = models.PositiveIntegerField(default=0)
    success_count = models.PositiveIntegerField(default=0)
    last_executed_at = models.DateTimeField(null=True, blank=True)

    class Meta:
        ordering = ['name']
        indexes = [
            models.Index(fields=['remediation_type', 'is_active']),
            models.Index(fields=['trigger_condition_type']),
        ]

    def __str__(self):
        """Return the remediation name with its type code in parentheses."""
        return f"{self.name} ({self.remediation_type})"

    @property
    def success_rate(self):
        """
        Fraction of executions that succeeded.

        Computed as ``success_count / execution_count``; yields ``0.0`` when
        nothing has been executed yet, which avoids dividing by zero.
        """
        total_runs = self.execution_count
        return 0.0 if total_runs == 0 else self.success_count / total_runs
class AutoRemediationExecution(models.Model):
    """
    Record of one run of an ``AutoRemediation`` against an incident.

    Tracks the approval decision, the execution log and output, and the
    timestamps of each stage.
    """

    STATUS_CHOICES = [
        ('PENDING', 'Pending'),
        ('APPROVED', 'Approved'),
        ('EXECUTING', 'Executing'),
        ('COMPLETED', 'Completed'),
        ('FAILED', 'Failed'),
        ('CANCELLED', 'Cancelled'),
        ('TIMEOUT', 'Timeout'),
        ('REJECTED', 'Rejected'),
    ]

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    auto_remediation = models.ForeignKey(
        AutoRemediation,
        on_delete=models.CASCADE,
        related_name='executions',
    )

    # The incident is mandatory; deleting it deletes this record too
    incident = models.ForeignKey(
        'incident_intelligence.Incident',
        on_delete=models.CASCADE,
        related_name='auto_remediations',
    )

    # Optional link to an SLA instance
    sla_instance = models.ForeignKey(
        'sla_oncall.SLAInstance',
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='auto_remediations',
        help_text="SLA instance related to this auto-remediation",
    )

    # Current state and trigger payload
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='PENDING')
    trigger_data = models.JSONField(
        default=dict,
        help_text="Data that triggered the remediation",
    )

    # Approval workflow
    approved_by = models.ForeignKey(
        User,
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='approved_remediations',
    )
    approved_at = models.DateTimeField(null=True, blank=True)
    approval_notes = models.TextField(blank=True, null=True)

    # Outcome
    execution_log = models.JSONField(default=list, help_text="Detailed execution log")
    output_data = models.JSONField(default=dict, help_text="Output data from remediation")
    error_message = models.TextField(blank=True, null=True)

    # Timing; triggered_at is stamped automatically when the row is created
    triggered_at = models.DateTimeField(auto_now_add=True)
    started_at = models.DateTimeField(null=True, blank=True)
    completed_at = models.DateTimeField(null=True, blank=True)
    duration = models.DurationField(null=True, blank=True)

    class Meta:
        ordering = ['-triggered_at']
        indexes = [
            models.Index(fields=['auto_remediation', 'status']),
            models.Index(fields=['incident', 'status']),
            models.Index(fields=['triggered_at']),
        ]

    def __str__(self):
        """Return the remediation name, the incident title, and the status."""
        remediation_name = self.auto_remediation.name
        incident_title = self.incident.title
        return f"Remediation {remediation_name} for {incident_title} - {self.status}"
class MaintenanceWindow(models.Model):
    """
    A scheduled maintenance period during which alerting can be suppressed.

    Stores the window's start/end, the services and components it covers,
    which kinds of alerting to suppress, and counters of what was suppressed.
    """

    STATUS_CHOICES = [
        ('SCHEDULED', 'Scheduled'),
        ('ACTIVE', 'Active'),
        ('COMPLETED', 'Completed'),
        ('CANCELLED', 'Cancelled'),
    ]

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    name = models.CharField(max_length=200)
    description = models.TextField()

    # Schedule
    start_time = models.DateTimeField(help_text="When maintenance window starts")
    end_time = models.DateTimeField(help_text="When maintenance window ends")
    # This field name shadows the imported ``timezone`` module only inside the
    # class body. The methods below still resolve ``timezone`` to the module,
    # because method bodies look names up in module scope, not class scope.
    # Nothing in this class interprets the stored string.
    timezone = models.CharField(max_length=50, default='UTC')

    # Scope
    affected_services = models.JSONField(
        default=list,
        help_text="List of services affected by this maintenance",
    )
    affected_components = models.JSONField(
        default=list,
        help_text="List of components affected by this maintenance",
    )

    # Alert suppression switches
    suppress_incident_creation = models.BooleanField(default=True)
    suppress_notifications = models.BooleanField(default=True)
    suppress_escalations = models.BooleanField(default=True)

    # Status and audit metadata
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='SCHEDULED')
    created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    # Suppression counters
    incidents_suppressed = models.PositiveIntegerField(default=0)
    notifications_suppressed = models.PositiveIntegerField(default=0)

    class Meta:
        ordering = ['start_time']
        indexes = [
            models.Index(fields=['start_time', 'end_time']),
            models.Index(fields=['status']),
        ]

    def __str__(self):
        """Return the window name followed by its start and end times."""
        return f"{self.name} ({self.start_time} - {self.end_time})"

    def is_active(self) -> bool:
        """
        Check if maintenance window is currently active.

        Requires both that the current time lies within
        ``[start_time, end_time]`` and that ``status`` is ``'ACTIVE'``.
        """
        now = timezone.now()
        return self.start_time <= now <= self.end_time and self.status == 'ACTIVE'

    def is_scheduled(self) -> bool:
        """
        Check if maintenance window is scheduled for the future.

        Requires both that ``start_time`` is later than the current time and
        that ``status`` is ``'SCHEDULED'``.
        """
        now = timezone.now()
        return self.start_time > now and self.status == 'SCHEDULED'

    def clean(self):
        """
        Validate that the window starts before it ends.

        Raises:
            ValidationError: if ``start_time`` is not strictly earlier than
                ``end_time``.
        """
        super().clean()
        # full_clean() runs clean() even when per-field validation has
        # already failed, so either bound can still be None here. Skip the
        # comparison then: the field-level error is reported instead of a
        # TypeError from comparing None with a datetime.
        if self.start_time is None or self.end_time is None:
            return
        if self.start_time >= self.end_time:
            raise ValidationError("Start time must be before end time")
class WorkflowTemplate(models.Model):
    """
    A reusable workflow definition for a common automation scenario.

    Holds the step list, the declared input parameters and output schema,
    and a usage counter.
    """

    TEMPLATE_TYPES = [
        ('INCIDENT_RESPONSE', 'Incident Response'),
        ('DEPLOYMENT', 'Deployment'),
        ('MAINTENANCE', 'Maintenance'),
        ('SCALING', 'Scaling'),
        ('MONITORING', 'Monitoring'),
        ('CUSTOM', 'Custom'),
    ]

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    name = models.CharField(max_length=200, unique=True)
    description = models.TextField()
    template_type = models.CharField(max_length=30, choices=TEMPLATE_TYPES)

    # Template definition
    workflow_steps = models.JSONField(
        default=list,
        help_text="List of workflow steps with conditions and actions",
    )
    input_parameters = models.JSONField(
        default=list,
        help_text="Required input parameters for the workflow",
    )
    output_schema = models.JSONField(
        default=dict,
        help_text="Expected output schema",
    )

    # Usage and audit metadata
    usage_count = models.PositiveIntegerField(default=0)
    is_public = models.BooleanField(default=True)
    created_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        ordering = ['name']
        indexes = [
            models.Index(fields=['template_type', 'is_public']),
        ]

    def __str__(self):
        """Return the template name with its type code in parentheses."""
        return f"{self.name} ({self.template_type})"
class WorkflowExecution(models.Model):
    """
    Record of one run of a ``WorkflowTemplate``.

    Stores the trigger context, optional links to an incident and a
    maintenance window, step progress, and the input/output payloads.
    """

    STATUS_CHOICES = [
        ('PENDING', 'Pending'),
        ('RUNNING', 'Running'),
        ('COMPLETED', 'Completed'),
        ('FAILED', 'Failed'),
        ('CANCELLED', 'Cancelled'),
        ('PAUSED', 'Paused'),
    ]

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    workflow_template = models.ForeignKey(
        WorkflowTemplate,
        on_delete=models.CASCADE,
        related_name='executions',
    )

    # Trigger context; trigger_type shares its choices with Runbook
    name = models.CharField(max_length=200, help_text="Name for this execution instance")
    triggered_by = models.ForeignKey(User, on_delete=models.SET_NULL, null=True)
    trigger_type = models.CharField(max_length=20, choices=Runbook.TRIGGER_TYPES)

    # Optional links; both are nulled out if the target row is deleted
    related_incident = models.ForeignKey(
        'incident_intelligence.Incident',
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='workflow_executions',
    )
    related_maintenance = models.ForeignKey(
        MaintenanceWindow,
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
        related_name='workflow_executions',
    )

    # Progress
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='PENDING')
    current_step = models.PositiveIntegerField(default=0)
    total_steps = models.PositiveIntegerField()

    # Payloads and outcome
    input_data = models.JSONField(default=dict, help_text="Input data for the workflow")
    output_data = models.JSONField(default=dict, help_text="Output data from the workflow")
    execution_log = models.JSONField(default=list, help_text="Detailed execution log")
    error_message = models.TextField(blank=True, null=True)

    # Timing; started_at is stamped automatically when the row is created
    started_at = models.DateTimeField(auto_now_add=True)
    completed_at = models.DateTimeField(null=True, blank=True)
    duration = models.DurationField(null=True, blank=True)

    class Meta:
        ordering = ['-started_at']
        indexes = [
            models.Index(fields=['workflow_template', 'status']),
            models.Index(fields=['triggered_by', 'started_at']),
            models.Index(fields=['related_incident', 'status']),
        ]

    def __str__(self):
        """Return the execution name together with the current status."""
        return f"Workflow {self.name} - {self.status}"