Add backup routes module with endpoints for backup management

This commit is contained in:
Iliyan Angelov
2025-12-01 03:58:18 +02:00
parent 1a103a769f
commit 91f51bc6fe
2 changed files with 177 additions and 0 deletions

View File

@@ -0,0 +1,17 @@
"""
System routes module.
"""
from . import system_settings_routes
from . import workflow_routes
from . import task_routes
from . import approval_routes
from . import backup_routes
__all__ = [
'system_settings_routes',
'workflow_routes',
'task_routes',
'approval_routes',
'backup_routes',
]

View File

@@ -0,0 +1,160 @@
"""
Backup routes for database backup management.
"""
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session
from pathlib import Path
from ...shared.config.database import get_db
from ...shared.config.logging_config import get_logger
from ...shared.config.settings import settings
from ...security.middleware.auth import get_current_user, authorize_roles
from ...auth.models.user import User
from ..services.backup_service import backup_service
from ...shared.utils.response_helpers import success_response
logger = get_logger(__name__)
router = APIRouter(prefix='/backups', tags=['backups'])
@router.post('/create')
async def create_backup(
current_user: User = Depends(authorize_roles('admin')),
db: Session = Depends(get_db)
):
"""Create a new database backup."""
try:
backup_metadata = backup_service.create_backup(
db_name=settings.DB_NAME,
db_user=settings.DB_USER,
db_password=settings.DB_PASS,
db_host=settings.DB_HOST,
db_port=settings.DB_PORT
)
logger.info(f'Backup created by user {current_user.id}: {backup_metadata["filename"]}')
return success_response(
message='Backup created successfully',
data=backup_metadata
)
except FileNotFoundError as e:
logger.error(f'Backup creation failed: {str(e)}')
raise HTTPException(
status_code=503,
detail={
'message': str(e),
'requires_installation': True
}
)
except Exception as e:
logger.error(f'Backup creation failed: {str(e)}', exc_info=True)
raise HTTPException(
status_code=500,
detail=f'Failed to create backup: {str(e)}'
)
@router.get('/status')
async def check_backup_status(
current_user: User = Depends(authorize_roles('admin')),
db: Session = Depends(get_db)
):
"""Check if backup service is available."""
try:
is_available = backup_service.check_mysqldump_available()
return success_response(
data={
'available': is_available,
'message': 'Backup service is available' if is_available else 'mysqldump is not installed. Please install MySQL client tools to enable backups.'
}
)
except Exception as e:
logger.error(f'Failed to check backup status: {str(e)}', exc_info=True)
return success_response(
data={
'available': False,
'message': f'Error checking backup status: {str(e)}'
}
)
@router.get('')
async def list_backups(
current_user: User = Depends(authorize_roles('admin')),
db: Session = Depends(get_db)
):
"""List all available backups."""
try:
backups = backup_service.list_backups()
return success_response(
data={
'backups': backups,
'count': len(backups)
}
)
except Exception as e:
logger.error(f'Failed to list backups: {str(e)}', exc_info=True)
raise HTTPException(
status_code=500,
detail=f'Failed to list backups: {str(e)}'
)
@router.get('/{filename}')
async def download_backup(
filename: str,
current_user: User = Depends(authorize_roles('admin')),
db: Session = Depends(get_db)
):
"""Download a backup file."""
try:
backup_path = backup_service.BACKUP_DIR / filename
if not backup_path.exists():
raise HTTPException(
status_code=404,
detail=f'Backup not found: {filename}'
)
# Verify it's a backup file (security check)
if not filename.startswith('backup_') or not filename.endswith('.sql'):
raise HTTPException(
status_code=400,
detail='Invalid backup filename'
)
return FileResponse(
path=str(backup_path),
filename=filename,
media_type='application/sql'
)
except HTTPException:
raise
except Exception as e:
logger.error(f'Failed to download backup: {str(e)}', exc_info=True)
raise HTTPException(
status_code=500,
detail=f'Failed to download backup: {str(e)}'
)
@router.post('/cleanup')
async def cleanup_old_backups(
current_user: User = Depends(authorize_roles('admin')),
db: Session = Depends(get_db)
):
"""Remove backups older than the retention period."""
try:
removed_count = backup_service.cleanup_old_backups()
logger.info(f'Cleanup completed by user {current_user.id}: {removed_count} backups removed')
return success_response(
message=f'Cleanup completed: {removed_count} backup(s) removed',
data={'removed_count': removed_count}
)
except Exception as e:
logger.error(f'Cleanup failed: {str(e)}', exc_info=True)
raise HTTPException(
status_code=500,
detail=f'Failed to cleanup backups: {str(e)}'
)