This commit is contained in:
Iliyan Angelov
2025-11-30 23:29:01 +02:00
parent 39fcfff811
commit 0fa2adeb19
1058 changed files with 4630 additions and 296 deletions

View File

@@ -0,0 +1,5 @@
from . import security_routes
from . import compliance_routes
__all__ = ['security_routes', 'compliance_routes']

View File

@@ -0,0 +1,198 @@
"""
Routes for compliance reporting and management.
"""
from fastapi import APIRouter, Depends, HTTPException, Query, Response
from sqlalchemy.orm import Session
from typing import Optional
from datetime import datetime, timedelta
import csv
import io
from ...shared.config.database import get_db
from ...shared.config.logging_config import get_logger
from ...security.middleware.auth import authorize_roles
from ...auth.models.user import User
from ...payments.models.financial_audit_trail import FinancialAuditTrail
from ...security.models.gdpr_compliance import DataSubjectRequest, DataSubjectRequestStatus
from ...analytics.models.audit_log import AuditLog
from ...shared.utils.response_helpers import success_response
logger = get_logger(__name__)
router = APIRouter(prefix='/compliance', tags=['compliance'])
@router.get('/report')
async def get_compliance_report(
start_date: Optional[str] = Query(None),
end_date: Optional[str] = Query(None),
format: str = Query("json", regex="^(json|csv)$"),
current_user: User = Depends(authorize_roles('admin', 'accountant')),
db: Session = Depends(get_db)
):
"""Generate comprehensive compliance report."""
try:
# Parse dates
if start_date:
try:
# Try parsing as full datetime first
start = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
except ValueError:
# If that fails, parse as date only
start = datetime.strptime(start_date, '%Y-%m-%d')
else:
start = datetime.utcnow() - timedelta(days=30)
if end_date:
try:
# Try parsing as full datetime first
end = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
except ValueError:
# If that fails, parse as date only, set to end of day
end = datetime.strptime(end_date, '%Y-%m-%d').replace(hour=23, minute=59, second=59)
else:
end = datetime.utcnow()
# Financial audit trail statistics
try:
financial_audits = db.query(FinancialAuditTrail).filter(
FinancialAuditTrail.created_at >= start,
FinancialAuditTrail.created_at <= end
).count()
except Exception as e:
logger.warning(f'Error querying financial audit trail: {str(e)}')
financial_audits = 0
# GDPR requests statistics
try:
gdpr_requests = db.query(DataSubjectRequest).filter(
DataSubjectRequest.created_at >= start,
DataSubjectRequest.created_at <= end
).count()
gdpr_completed = db.query(DataSubjectRequest).filter(
DataSubjectRequest.created_at >= start,
DataSubjectRequest.created_at <= end,
DataSubjectRequest.status == DataSubjectRequestStatus.completed
).count()
except Exception as e:
logger.warning(f'Error querying GDPR requests: {str(e)}')
gdpr_requests = 0
gdpr_completed = 0
# Security audit logs
try:
security_audits = db.query(AuditLog).filter(
AuditLog.created_at >= start,
AuditLog.created_at <= end
).count()
except Exception as e:
logger.warning(f'Error querying security audit logs: {str(e)}')
security_audits = 0
# Build report
report = {
'period': {
'start_date': start.isoformat(),
'end_date': end.isoformat()
},
'financial_audit': {
'total_actions': financial_audits,
'description': 'Total financial transactions logged in audit trail'
},
'gdpr_compliance': {
'total_requests': gdpr_requests,
'completed_requests': gdpr_completed,
'completion_rate': round((gdpr_completed / gdpr_requests * 100) if gdpr_requests > 0 else 0, 2),
'description': 'GDPR data subject requests processed'
},
'security_audit': {
'total_events': security_audits,
'description': 'Total security audit events logged'
},
'compliance_status': 'compliant' if (gdpr_requests == 0 or gdpr_requests == gdpr_completed) else 'pending_review',
'generated_at': datetime.utcnow().isoformat(),
'generated_by': current_user.email if current_user.email else f'User {current_user.id}'
}
if format.lower() == 'csv':
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(['Compliance Report'])
writer.writerow(['Period', f"{start.date()} to {end.date()}"])
writer.writerow(['Generated At', datetime.utcnow().isoformat()])
writer.writerow(['Generated By', current_user.email if current_user.email else f'User {current_user.id}'])
writer.writerow([])
writer.writerow(['Financial Audit'])
writer.writerow(['Total Actions', financial_audits])
writer.writerow([])
writer.writerow(['GDPR Compliance'])
writer.writerow(['Total Requests', gdpr_requests])
writer.writerow(['Completed Requests', gdpr_completed])
writer.writerow(['Completion Rate', f"{report['gdpr_compliance']['completion_rate']}%"])
writer.writerow([])
writer.writerow(['Security Audit'])
writer.writerow(['Total Events', security_audits])
writer.writerow([])
writer.writerow(['Compliance Status', report['compliance_status']])
return Response(
content=output.getvalue(),
media_type='text/csv',
headers={
'Content-Disposition': f'attachment; filename="compliance_report_{datetime.utcnow().strftime("%Y%m%d")}.csv"'
}
)
return success_response(
data=report,
message='Compliance report generated successfully'
)
except Exception as e:
logger.error(f'Error generating compliance report: {str(e)}', exc_info=True)
raise HTTPException(status_code=500, detail='An error occurred while generating compliance report')
@router.get('/gdpr-summary')
async def get_gdpr_summary(
current_user: User = Depends(authorize_roles('admin')),
db: Session = Depends(get_db)
):
"""Get GDPR compliance summary."""
try:
# Get statistics
total_requests = db.query(DataSubjectRequest).count()
pending_requests = db.query(DataSubjectRequest).filter(
DataSubjectRequest.status == DataSubjectRequestStatus.pending
).count()
completed_requests = db.query(DataSubjectRequest).filter(
DataSubjectRequest.status == DataSubjectRequestStatus.completed
).count()
# Requests by type
from ...security.models.gdpr_compliance import DataSubjectRequestType
requests_by_type = {}
for req_type in DataSubjectRequestType:
count = db.query(DataSubjectRequest).filter(
DataSubjectRequest.request_type == req_type
).count()
requests_by_type[req_type.value] = count
return success_response(
data={
'total_requests': total_requests,
'pending_requests': pending_requests,
'completed_requests': completed_requests,
'completion_rate': round((completed_requests / total_requests * 100) if total_requests > 0 else 0, 2),
'requests_by_type': requests_by_type,
'compliance_status': 'compliant' if pending_requests == 0 else 'pending_review'
},
message='GDPR compliance summary retrieved successfully'
)
except Exception as e:
logger.error(f'Error retrieving GDPR summary: {str(e)}', exc_info=True)
raise HTTPException(status_code=500, detail='An error occurred while retrieving GDPR summary')

View File

@@ -696,10 +696,11 @@ async def delete_user_data(
@router.get("/gdpr/export/{user_id}")
async def export_user_data(
user_id: int,
format: str = Query("json", regex="^(json|csv)$"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Export user data (GDPR portability request)"""
"""Export user data (GDPR portability request) - supports JSON and CSV formats"""
if current_user.id != user_id:
# Check if user is admin
from ...auth.models.role import Role
@@ -708,8 +709,19 @@ async def export_user_data(
raise HTTPException(status_code=403, detail="Access denied")
try:
data = gdpr_service.export_user_data(db=db, user_id=user_id)
return {"status": "success", "data": data}
result = gdpr_service.export_user_data(db=db, user_id=user_id, format=format)
if format.lower() == 'csv':
from fastapi.responses import Response
return Response(
content=result['data'],
media_type='text/csv',
headers={
'Content-Disposition': f'attachment; filename="user_data_{user_id}_{datetime.utcnow().strftime("%Y%m%d")}.csv"'
}
)
return {"status": "success", "data": result['data']}
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))

View File

@@ -156,9 +156,74 @@ class GDPRService:
return False
@staticmethod
def export_user_data(db: Session, user_id: int) -> Dict[str, Any]:
"""Export user data in portable format (for portability request)"""
return GDPRService.get_user_data(db, user_id)
def export_user_data(db: Session, user_id: int, format: str = 'json') -> Dict[str, Any]:
"""Export user data in portable format (for portability request)
Args:
db: Database session
user_id: User ID to export
format: Export format ('json' or 'csv')
Returns:
Dict containing exported data or CSV string
"""
data = GDPRService.get_user_data(db, user_id)
if format.lower() == 'csv':
import csv
import io
output = io.StringIO()
writer = csv.writer(output)
# Write user data
writer.writerow(['Data Type', 'Field', 'Value'])
user_data = data.get('user', {})
for key, value in user_data.items():
writer.writerow(['User', key, value])
# Write bookings
writer.writerow([])
writer.writerow(['Bookings'])
writer.writerow(['ID', 'Booking Number', 'Check In', 'Check Out', 'Total Price', 'Status'])
for booking in data.get('bookings', []):
writer.writerow([
booking.get('id'),
booking.get('booking_number'),
booking.get('check_in_date'),
booking.get('check_out_date'),
booking.get('total_price'),
booking.get('status')
])
# Write payments
writer.writerow([])
writer.writerow(['Payments'])
writer.writerow(['ID', 'Amount', 'Payment Method', 'Payment Status', 'Payment Date'])
for payment in data.get('payments', []):
writer.writerow([
payment.get('id'),
payment.get('amount'),
payment.get('payment_method'),
payment.get('payment_status'),
payment.get('payment_date')
])
# Write reviews
writer.writerow([])
writer.writerow(['Reviews'])
writer.writerow(['ID', 'Rating', 'Comment', 'Created At'])
for review in data.get('reviews', []):
writer.writerow([
review.get('id'),
review.get('rating'),
review.get('comment'),
review.get('created_at')
])
return {'format': 'csv', 'data': output.getvalue()}
return {'format': 'json', 'data': data}
@staticmethod
def record_consent(