"""
Enterprise Backup System for ETB-API

Comprehensive backup and recovery with encryption and compression
"""
|
|
import gzip
import hashlib
import json
import logging
import os
import shutil
import subprocess
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional

import boto3
import psutil
from botocore.exceptions import ClientError

from django.conf import settings
from django.core.management import call_command
from django.core.management.base import BaseCommand
from django.utils import timezone
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class BackupService:
    """Enterprise backup service with multiple storage options.

    Orchestrates per-component backups (database, media, static files,
    configuration, logs) into ``BACKUP_LOCATION``, writes a JSON manifest
    with SHA-256 checksums, bundles everything into a single ``tar.gz``
    and optionally uploads the bundle to S3.  Artifacts older than
    ``BACKUP_RETENTION_DAYS`` are pruned after each successful run.

    NOTE(review): ``BACKUP_ENCRYPTION_KEY`` is read but never applied --
    archives are compressed, not encrypted.  Confirm whether encryption
    is still a requirement.
    """

    def __init__(self):
        self.backup_location = getattr(settings, 'BACKUP_LOCATION', '/backups/etb-api/')
        self.retention_days = getattr(settings, 'BACKUP_RETENTION_DAYS', 30)
        # Currently unused -- see class-level review note.
        self.encryption_key = os.getenv('BACKUP_ENCRYPTION_KEY')
        self.aws_s3_bucket = os.getenv('AWS_S3_BACKUP_BUCKET')
        self.aws_region = os.getenv('AWS_REGION', 'us-east-1')

        # Ensure backup directory exists before any component writes to it.
        Path(self.backup_location).mkdir(parents=True, exist_ok=True)

    # ------------------------------------------------------------------
    # Shared result / subprocess helpers
    # ------------------------------------------------------------------

    def _file_success(self, path: str, **extra: Any) -> Dict[str, Any]:
        """Success payload for an artifact at *path* (size in bytes and MB)."""
        size = os.path.getsize(path)
        payload = {
            'status': 'success',
            'path': path,
            'size_bytes': size,
            'size_mb': round(size / (1024 * 1024), 2),
            'timestamp': timezone.now().isoformat(),
        }
        payload.update(extra)
        return payload

    def _failure(self, error: Exception) -> Dict[str, Any]:
        """Failure payload shared by every component."""
        return {
            'status': 'failed',
            'error': str(error),
            'timestamp': timezone.now().isoformat(),
        }

    def _skipped(self, message: str) -> Dict[str, Any]:
        """Payload for a component that had nothing to do."""
        return {
            'status': 'skipped',
            'message': message,
            'timestamp': timezone.now().isoformat(),
        }

    def _restored(self, message: str) -> Dict[str, Any]:
        """Success payload for a restore operation."""
        return {
            'status': 'success',
            'message': message,
            'timestamp': timezone.now().isoformat(),
        }

    @staticmethod
    def _run(cmd: List[str], label: str, env: Optional[Dict[str, str]] = None) -> None:
        """Run *cmd*, raising ``Exception('<label> failed: <stderr>')`` on error."""
        result = subprocess.run(cmd, env=env, capture_output=True, text=True)
        if result.returncode != 0:
            raise Exception(f"{label} failed: {result.stderr}")

    def _tar_directory(self, archive_path: str, directory: str, label: str) -> None:
        """Archive *directory* into *archive_path*, stored relative to its parent."""
        cmd = ['tar', '-czf', archive_path, '-C',
               os.path.dirname(directory), os.path.basename(directory)]
        self._run(cmd, label)

    # ------------------------------------------------------------------
    # Backup
    # ------------------------------------------------------------------

    def create_full_backup(self, include_media: bool = True, include_static: bool = True) -> Dict[str, Any]:
        """Create a full system backup.

        Args:
            include_media: also archive ``MEDIA_ROOT``.
            include_static: also archive ``STATIC_ROOT``.

        Returns:
            Dict describing the run: per-component results under
            ``components``, overall ``status`` (``completed``/``failed``),
            timing, and any errors collected.
        """
        backup_id = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_info = {
            'backup_id': backup_id,
            'start_time': timezone.now(),
            'status': 'in_progress',
            'components': {},
            'total_size': 0,
            'errors': [],
        }

        try:
            logger.info(f"Starting full backup: {backup_id}")

            # 1. Database
            backup_info['components']['database'] = self._backup_database(backup_id)

            # 2. Media files (optional)
            if include_media:
                backup_info['components']['media'] = self._backup_media_files(backup_id)

            # 3. Static files (optional)
            if include_static:
                backup_info['components']['static'] = self._backup_static_files(backup_id)

            # 4. Configuration
            backup_info['components']['configuration'] = self._backup_configuration(backup_id)

            # 5. Logs
            backup_info['components']['logs'] = self._backup_logs(backup_id)

            # 6. Manifest (metadata + checksums of the artifacts above)
            backup_info['components']['manifest'] = self._create_backup_manifest(backup_id, backup_info)

            # 7. Bundle everything into one archive
            backup_info['components']['compressed'] = self._compress_backup(backup_id)

            # 8. Upload to cloud storage (if configured)
            if self.aws_s3_bucket:
                backup_info['components']['cloud'] = self._upload_to_cloud(backup_id)

            # 9. Prune backups past the retention window
            self._cleanup_old_backups()

            backup_info['status'] = 'completed'
            backup_info['end_time'] = timezone.now()
            backup_info['duration'] = (backup_info['end_time'] - backup_info['start_time']).total_seconds()

            logger.info(f"Backup completed successfully: {backup_id}")
            return backup_info

        except Exception as e:
            backup_info['status'] = 'failed'
            backup_info['end_time'] = timezone.now()
            backup_info['error'] = str(e)
            backup_info['errors'].append(str(e))
            logger.error(f"Backup failed: {backup_id} - {str(e)}")
            return backup_info

    def _backup_database(self, backup_id: str) -> Dict[str, Any]:
        """Dump the default database: pg_dump for PostgreSQL, file copy otherwise.

        NOTE(review): the non-PostgreSQL branch assumes a file-based database
        (SQLite) whose NAME is a path -- confirm no other engine is configured.
        """
        try:
            db_backup_path = os.path.join(self.backup_location, f"{backup_id}_database.sql")
            db_config = settings.DATABASES['default']

            if db_config['ENGINE'] == 'django.db.backends.postgresql':
                cmd = [
                    'pg_dump',
                    '-h', db_config['HOST'],
                    '-p', str(db_config['PORT']),
                    '-U', db_config['USER'],
                    '-d', db_config['NAME'],
                    '-f', db_backup_path,
                    '--verbose',
                    '--no-password',
                ]
                # pg_dump reads the password from the environment, never argv,
                # so it does not leak into the process list.
                env = os.environ.copy()
                env['PGPASSWORD'] = db_config['PASSWORD']
                self._run(cmd, 'pg_dump', env=env)
            else:
                shutil.copy2(db_config['NAME'], db_backup_path)

            return self._file_success(db_backup_path)

        except Exception as e:
            logger.error(f"Database backup failed: {str(e)}")
            return self._failure(e)

    def _backup_media_files(self, backup_id: str) -> Dict[str, Any]:
        """Archive ``MEDIA_ROOT``; skipped when unset or missing on disk."""
        try:
            media_root = getattr(settings, 'MEDIA_ROOT', None)
            if not media_root or not os.path.exists(media_root):
                return self._skipped('No media files to backup')

            media_backup_path = os.path.join(self.backup_location, f"{backup_id}_media.tar.gz")
            self._tar_directory(media_backup_path, media_root, 'Media backup')
            return self._file_success(media_backup_path)

        except Exception as e:
            logger.error(f"Media backup failed: {str(e)}")
            return self._failure(e)

    def _backup_static_files(self, backup_id: str) -> Dict[str, Any]:
        """Archive ``STATIC_ROOT``; skipped when unset or missing on disk."""
        try:
            static_root = getattr(settings, 'STATIC_ROOT', None)
            if not static_root or not os.path.exists(static_root):
                return self._skipped('No static files to backup')

            static_backup_path = os.path.join(self.backup_location, f"{backup_id}_static.tar.gz")
            self._tar_directory(static_backup_path, static_root, 'Static backup')
            return self._file_success(static_backup_path)

        except Exception as e:
            logger.error(f"Static backup failed: {str(e)}")
            return self._failure(e)

    def _backup_configuration(self, backup_id: str) -> Dict[str, Any]:
        """Archive key configuration files (settings, .env, requirements, manage.py)."""
        try:
            config_backup_path = os.path.join(self.backup_location, f"{backup_id}_config.tar.gz")

            config_files = [
                'core/settings.py',
                '.env',
                'requirements.txt',
                'manage.py',
            ]

            # Stage the files in a temp dir so the archive has clean,
            # project-relative paths.
            temp_dir = os.path.join(self.backup_location, f"{backup_id}_config_temp")
            os.makedirs(temp_dir, exist_ok=True)

            try:
                for config_file in config_files:
                    source_path = os.path.join(settings.BASE_DIR, config_file)
                    if os.path.exists(source_path):
                        dest_path = os.path.join(temp_dir, config_file)
                        os.makedirs(os.path.dirname(dest_path), exist_ok=True)
                        shutil.copy2(source_path, dest_path)

                self._run(['tar', '-czf', config_backup_path, '-C', temp_dir, '.'],
                          'Configuration backup')
            finally:
                # Always remove the staging dir, even if tar fails.
                shutil.rmtree(temp_dir, ignore_errors=True)

            return self._file_success(config_backup_path)

        except Exception as e:
            logger.error(f"Configuration backup failed: {str(e)}")
            return self._failure(e)

    def _backup_logs(self, backup_id: str) -> Dict[str, Any]:
        """Archive whichever well-known log directories exist on this host."""
        try:
            log_dirs = ['/var/log/etb-api/', '/var/log/nginx/', '/var/log/celery/']
            existing_log_dirs = [d for d in log_dirs if os.path.exists(d)]

            if not existing_log_dirs:
                return self._skipped('No log directories found')

            logs_backup_path = os.path.join(self.backup_location, f"{backup_id}_logs.tar.gz")
            self._run(['tar', '-czf', logs_backup_path] + existing_log_dirs, 'Logs backup')

            return self._file_success(logs_backup_path, directories=existing_log_dirs)

        except Exception as e:
            logger.error(f"Logs backup failed: {str(e)}")
            return self._failure(e)

    def _create_backup_manifest(self, backup_id: str, backup_info: Dict[str, Any]) -> Dict[str, Any]:
        """Write a JSON manifest: metadata, system info and SHA-256 checksums."""
        try:
            manifest_path = os.path.join(self.backup_location, f"{backup_id}_manifest.json")

            manifest = {
                'backup_id': backup_id,
                'created_at': timezone.now().isoformat(),
                'version': getattr(settings, 'VERSION', '1.0.0'),
                'environment': 'production' if not settings.DEBUG else 'development',
                'components': backup_info['components'],
                'system_info': {
                    'hostname': os.uname().nodename if hasattr(os, 'uname') else 'unknown',
                    'python_version': sys.version,
                    # NOTE(review): key predates this service -- it records the
                    # application VERSION setting, not Django's release.
                    'django_version': getattr(settings, 'VERSION', '1.0.0'),
                    'disk_usage': psutil.disk_usage('/').percent,
                    'memory_usage': psutil.virtual_memory().percent,
                },
                'checksums': {},
            }

            # Checksum each successful artifact; read in chunks so large
            # archives are not loaded into memory at once.
            for component, info in backup_info['components'].items():
                if info.get('status') == 'success' and 'path' in info:
                    file_path = info['path']
                    if os.path.exists(file_path):
                        digest = hashlib.sha256()
                        with open(file_path, 'rb') as f:
                            for chunk in iter(lambda: f.read(1024 * 1024), b''):
                                digest.update(chunk)
                        manifest['checksums'][component] = digest.hexdigest()

            with open(manifest_path, 'w') as f:
                json.dump(manifest, f, indent=2, default=str)

            return {
                'status': 'success',
                'path': manifest_path,
                'size_bytes': os.path.getsize(manifest_path),
                'timestamp': timezone.now().isoformat(),
            }

        except Exception as e:
            logger.error(f"Manifest creation failed: {str(e)}")
            return self._failure(e)

    def _compress_backup(self, backup_id: str) -> Dict[str, Any]:
        """Bundle every per-component artifact for *backup_id* into one tar.gz."""
        try:
            compressed_path = os.path.join(self.backup_location, f"{backup_id}_full_backup.tar.gz")

            # Everything for this backup_id that is not already a tar.gz.
            member_names = [
                name for name in os.listdir(self.backup_location)
                if name.startswith(backup_id) and not name.endswith('.tar.gz')
            ]

            if not member_names:
                raise Exception("No backup files found to compress")

            # -C keeps archive members relative, instead of embedding
            # absolute /backups/... paths in the archive.
            cmd = ['tar', '-czf', compressed_path, '-C', self.backup_location] + member_names
            self._run(cmd, 'Compression')

            return self._file_success(compressed_path, files_included=len(member_names))

        except Exception as e:
            logger.error(f"Compression failed: {str(e)}")
            return self._failure(e)

    def _upload_to_cloud(self, backup_id: str) -> Dict[str, Any]:
        """Upload the bundled archive to the configured AWS S3 bucket."""
        try:
            if not self.aws_s3_bucket:
                return self._skipped('S3 bucket not configured')

            s3_client = boto3.client('s3', region_name=self.aws_region)

            compressed_file = os.path.join(self.backup_location, f"{backup_id}_full_backup.tar.gz")
            if not os.path.exists(compressed_file):
                raise Exception("Compressed backup file not found")

            s3_key = f"backups/{backup_id}_full_backup.tar.gz"
            s3_client.upload_file(compressed_file, self.aws_s3_bucket, s3_key)

            file_size = os.path.getsize(compressed_file)
            return {
                'status': 'success',
                's3_bucket': self.aws_s3_bucket,
                's3_key': s3_key,
                'size_bytes': file_size,
                'size_mb': round(file_size / (1024 * 1024), 2),
                'timestamp': timezone.now().isoformat(),
            }

        except Exception as e:
            logger.error(f"Cloud upload failed: {str(e)}")
            return self._failure(e)

    def _cleanup_old_backups(self) -> None:
        """Remove backup files older than the retention window (best effort).

        Bug fix: the previous version compared the timezone-aware
        ``timezone.now()`` cutoff with naive ``datetime.fromtimestamp``
        values, which raises TypeError under USE_TZ and was silently
        swallowed -- so pruning never actually ran.  Comparing POSIX
        timestamps sidesteps naive/aware entirely.
        """
        try:
            cutoff_ts = (timezone.now() - timedelta(days=self.retention_days)).timestamp()

            for name in os.listdir(self.backup_location):
                file_path = os.path.join(self.backup_location, name)
                if os.path.isfile(file_path) and os.path.getmtime(file_path) < cutoff_ts:
                    os.remove(file_path)
                    logger.info(f"Removed old backup: {name}")

        except Exception as e:
            logger.error(f"Backup cleanup failed: {str(e)}")

    # ------------------------------------------------------------------
    # Restore
    # ------------------------------------------------------------------

    def restore_backup(self, backup_id: str, components: Optional[List[str]] = None) -> Dict[str, Any]:
        """Restore from a backup on local disk.

        Args:
            backup_id: id of the backup to restore.
            components: subset of ('database', 'media', 'static', 'config',
                'logs') to restore; all discovered components when None.

        Returns:
            Dict with overall status, per-component results and timing.
        """
        # Built before the try so the except handler can always report on it.
        restore_info = {
            'backup_id': backup_id,
            'start_time': timezone.now(),
            'status': 'in_progress',
            'components': {},
            'errors': [],
        }

        try:
            # Map component name -> artifact path by filename keyword.
            backup_files: Dict[str, str] = {}
            keywords = ['database', 'media', 'static', 'config', 'logs']
            for name in os.listdir(self.backup_location):
                if not name.startswith(backup_id):
                    continue
                for key in keywords:
                    if key in name:
                        backup_files[key] = os.path.join(self.backup_location, name)
                        break

            components_to_restore = components or list(backup_files.keys())

            for component in components_to_restore:
                if component in backup_files:
                    restore_info['components'][component] = self._restore_component(
                        component, backup_files[component]
                    )
                else:
                    restore_info['components'][component] = {
                        'status': 'skipped',
                        'message': f'Backup file for {component} not found',
                    }

            restore_info['status'] = 'completed'
            restore_info['end_time'] = timezone.now()
            restore_info['duration'] = (restore_info['end_time'] - restore_info['start_time']).total_seconds()

            logger.info(f"Restore completed: {backup_id}")
            return restore_info

        except Exception as e:
            restore_info['status'] = 'failed'
            restore_info['end_time'] = timezone.now()
            restore_info['error'] = str(e)
            restore_info['errors'].append(str(e))
            logger.error(f"Restore failed: {backup_id} - {str(e)}")
            return restore_info

    def _restore_component(self, component: str, backup_file: str) -> Dict[str, Any]:
        """Dispatch a single component restore to its handler."""
        handlers = {
            'database': self._restore_database,
            'media': self._restore_media_files,
            'static': self._restore_static_files,
            'config': self._restore_configuration,
            'logs': self._restore_logs,
        }
        try:
            handler = handlers.get(component)
            if handler is None:
                return {
                    'status': 'skipped',
                    'message': f'Unknown component: {component}',
                }
            return handler(backup_file)
        except Exception as e:
            return self._failure(e)

    def _restore_database(self, backup_file: str) -> Dict[str, Any]:
        """Restore the default database: psql for PostgreSQL, file copy otherwise."""
        try:
            db_config = settings.DATABASES['default']

            if db_config['ENGINE'] == 'django.db.backends.postgresql':
                cmd = [
                    'psql',
                    '-h', db_config['HOST'],
                    '-p', str(db_config['PORT']),
                    '-U', db_config['USER'],
                    '-d', db_config['NAME'],
                    '-f', backup_file,
                    '--verbose',
                ]
                # Password via environment, same as the dump side.
                env = os.environ.copy()
                env['PGPASSWORD'] = db_config['PASSWORD']
                self._run(cmd, 'Database restore', env=env)
            else:
                # File-based database: overwrite the live file with the dump.
                shutil.copy2(backup_file, db_config['NAME'])

            return self._restored('Database restored successfully')

        except Exception as e:
            return self._failure(e)

    def _restore_media_files(self, backup_file: str) -> Dict[str, Any]:
        """Extract the media archive back under MEDIA_ROOT's parent directory."""
        try:
            media_root = getattr(settings, 'MEDIA_ROOT', None)
            if not media_root:
                raise Exception("MEDIA_ROOT not configured")

            self._run(['tar', '-xzf', backup_file, '-C', os.path.dirname(media_root)],
                      'Media restore')

            return self._restored('Media files restored successfully')

        except Exception as e:
            return self._failure(e)

    def _restore_static_files(self, backup_file: str) -> Dict[str, Any]:
        """Extract the static archive back under STATIC_ROOT's parent directory."""
        try:
            static_root = getattr(settings, 'STATIC_ROOT', None)
            if not static_root:
                raise Exception("STATIC_ROOT not configured")

            self._run(['tar', '-xzf', backup_file, '-C', os.path.dirname(static_root)],
                      'Static restore')

            return self._restored('Static files restored successfully')

        except Exception as e:
            return self._failure(e)

    def _restore_configuration(self, backup_file: str) -> Dict[str, Any]:
        """Extract configuration files and copy them back under BASE_DIR."""
        try:
            temp_dir = os.path.join(self.backup_location, 'config_restore_temp')
            os.makedirs(temp_dir, exist_ok=True)

            try:
                self._run(['tar', '-xzf', backup_file, '-C', temp_dir],
                          'Configuration restore')

                # Mirror the extracted tree back onto the project layout.
                for root, dirs, files in os.walk(temp_dir):
                    for file in files:
                        source_path = os.path.join(root, file)
                        relative_path = os.path.relpath(source_path, temp_dir)
                        dest_path = os.path.join(settings.BASE_DIR, relative_path)

                        os.makedirs(os.path.dirname(dest_path), exist_ok=True)
                        shutil.copy2(source_path, dest_path)
            finally:
                shutil.rmtree(temp_dir, ignore_errors=True)

            return self._restored('Configuration restored successfully')

        except Exception as e:
            return self._failure(e)

    def _restore_logs(self, backup_file: str) -> Dict[str, Any]:
        """Extract the logs archive at / (members were archived with full paths)."""
        try:
            self._run(['tar', '-xzf', backup_file, '-C', '/'], 'Logs restore')
            return self._restored('Log files restored successfully')

        except Exception as e:
            return self._failure(e)

    # ------------------------------------------------------------------
    # Inspection
    # ------------------------------------------------------------------

    def list_backups(self) -> List[Dict[str, Any]]:
        """Return all readable backup manifests, newest first."""
        try:
            backups = []

            for name in os.listdir(self.backup_location):
                if not name.endswith('_manifest.json'):
                    continue
                manifest_path = os.path.join(self.backup_location, name)
                try:
                    with open(manifest_path, 'r') as f:
                        backups.append(json.load(f))
                except Exception as e:
                    # A corrupt manifest should not hide the other backups.
                    logger.warning(f"Could not read manifest {name}: {str(e)}")

            backups.sort(key=lambda x: x.get('created_at', ''), reverse=True)
            return backups

        except Exception as e:
            logger.error(f"Failed to list backups: {str(e)}")
            return []

    def get_backup_status(self, backup_id: str) -> Dict[str, Any]:
        """Load a backup's manifest and verify its artifacts still exist.

        Returns the manifest with ``status`` set to ``complete`` or
        ``incomplete`` (plus ``missing_files``); a ``not_found`` or
        ``error`` payload otherwise.
        """
        try:
            manifest_path = os.path.join(self.backup_location, f"{backup_id}_manifest.json")

            if not os.path.exists(manifest_path):
                return {
                    'status': 'not_found',
                    'message': f'Backup {backup_id} not found',
                }

            with open(manifest_path, 'r') as f:
                manifest = json.load(f)

            # Verify every successful component's file is still on disk.
            missing_files = [
                component
                for component, info in manifest.get('components', {}).items()
                if info.get('status') == 'success'
                and 'path' in info
                and not os.path.exists(info['path'])
            ]

            if missing_files:
                manifest['status'] = 'incomplete'
                manifest['missing_files'] = missing_files
            else:
                manifest['status'] = 'complete'

            return manifest

        except Exception as e:
            logger.error(f"Failed to get backup status: {str(e)}")
            return {
                'status': 'error',
                'message': str(e),
            }
|
|
|
|
|
|
class BackupCommand(BaseCommand):
    """Django management command for backup operations.

    Actions:
        create  -- run a full backup; media/static are opt-in via flags
        restore -- restore a backup by id, optionally limited to components
        list    -- list the manifests of available backups
        status  -- report the integrity of a single backup
    """

    help = 'Perform backup and restore operations'

    def add_arguments(self, parser):
        parser.add_argument(
            'action',
            choices=['create', 'restore', 'list', 'status'],
            help='Action to perform'
        )
        parser.add_argument(
            '--backup-id',
            type=str,
            help='Backup ID for restore/status operations'
        )
        parser.add_argument(
            '--components',
            nargs='+',
            choices=['database', 'media', 'static', 'config', 'logs'],
            help='Components to backup/restore'
        )
        parser.add_argument(
            '--include-media',
            action='store_true',
            help='Include media files in backup'
        )
        parser.add_argument(
            '--include-static',
            action='store_true',
            help='Include static files in backup'
        )

    def handle(self, *args, **options):
        backup_service = BackupService()
        action = options['action']

        if action == 'create':
            self._handle_create(backup_service, options)
        elif action == 'restore':
            self._handle_restore(backup_service, options)
        elif action == 'list':
            self._handle_list(backup_service)
        elif action == 'status':
            self._handle_status(backup_service, options)

    def _handle_create(self, backup_service, options):
        """Run a full backup and report the outcome.

        argparse always populates store_true destinations (with False by
        default), so the previous ``options.get(..., True)`` fallback was
        dead code: media/static are included only when explicitly flagged.
        """
        result = backup_service.create_full_backup(
            include_media=options['include_media'],
            include_static=options['include_static'],
        )

        if result['status'] == 'completed':
            self.stdout.write(
                self.style.SUCCESS(f"Backup created successfully: {result['backup_id']}")
            )
        else:
            self.stdout.write(
                self.style.ERROR(f"Backup failed: {result.get('error', 'Unknown error')}")
            )

    def _handle_restore(self, backup_service, options):
        """Restore a backup by id; requires --backup-id."""
        if not options['backup_id']:
            self.stdout.write(
                self.style.ERROR("Backup ID is required for restore operation")
            )
            return

        result = backup_service.restore_backup(
            options['backup_id'],
            options.get('components')
        )

        if result['status'] == 'completed':
            self.stdout.write(
                self.style.SUCCESS(f"Restore completed successfully: {result['backup_id']}")
            )
        else:
            self.stdout.write(
                self.style.ERROR(f"Restore failed: {result.get('error', 'Unknown error')}")
            )

    def _handle_list(self, backup_service):
        """Print one line per available backup manifest."""
        backups = backup_service.list_backups()

        if not backups:
            self.stdout.write("No backups found")
            return

        self.stdout.write(f"Found {len(backups)} backups:")
        for backup in backups:
            self.stdout.write(f"  - {backup['backup_id']} ({backup['created_at']})")

    def _handle_status(self, backup_service, options):
        """Report a backup's integrity; requires --backup-id."""
        if not options['backup_id']:
            self.stdout.write(
                self.style.ERROR("Backup ID is required for status operation")
            )
            return

        status = backup_service.get_backup_status(options['backup_id'])

        if status['status'] == 'not_found':
            self.stdout.write(
                self.style.ERROR(f"Backup {options['backup_id']} not found")
            )
        else:
            self.stdout.write(f"Backup Status: {status['status']}")
            if 'missing_files' in status:
                self.stdout.write(f"Missing files: {', '.join(status['missing_files'])}")