This commit is contained in:
Iliyan Angelov
2025-11-28 02:40:05 +02:00
parent 627959f52b
commit 312f85530c
246 changed files with 23535 additions and 3428 deletions

View File

@@ -0,0 +1,148 @@
"""
File validation utilities for secure file uploads.
Validates file types using magic bytes (file signatures) to prevent spoofing.
"""
from PIL import Image
import io
from typing import Tuple, Optional
from fastapi import UploadFile, HTTPException, status
# Magic bytes (leading file signatures) for common image formats, mapped to
# the MIME type they indicate. This table is the single source of truth for
# validate_image_file_signature() below.
IMAGE_MAGIC_BYTES = {
    b'\xFF\xD8\xFF': 'image/jpeg',
    b'\x89\x50\x4E\x47\x0D\x0A\x1A\x0A': 'image/png',
    b'GIF87a': 'image/gif',
    b'GIF89a': 'image/gif',
    b'RIFF': 'image/webp',  # WebP files start with RIFF, need deeper check
    b'\x00\x00\x01\x00': 'image/x-icon',
    b'\x00\x00\x02\x00': 'image/x-icon',
}
ALLOWED_IMAGE_TYPES = {'image/jpeg', 'image/png', 'image/gif', 'image/webp'}


def validate_image_file_signature(file_content: bytes, filename: str) -> Tuple[bool, str]:
    """
    Validate file type using magic bytes (file signature).

    This prevents MIME type spoofing attacks: the decision is based on the
    bytes of the file itself, not on what the client claims.

    Args:
        file_content: The file content as bytes
        filename: The filename. Currently unused; kept for interface
            compatibility with existing callers.

    Returns:
        Tuple of (is_valid, detail). When is_valid is True, detail is the
        detected MIME type (e.g. 'image/png'); when False, detail is a
        human-readable error message.
    """
    if not file_content:
        return False, "File is empty"

    detected_type = None
    for signature, mime_type in IMAGE_MAGIC_BYTES.items():
        if not file_content.startswith(signature):
            continue
        # RIFF is a generic container (WAV and AVI use it too); it is only
        # WebP when bytes 8-11 spell "WEBP". A short file yields a short
        # slice, which simply fails the comparison.
        if signature == b'RIFF' and file_content[8:12] != b'WEBP':
            continue
        detected_type = mime_type
        break

    # If magic bytes don't match known image types, try PIL verification
    if not detected_type:
        try:
            img = Image.open(io.BytesIO(file_content))
            img.verify()
            img_format = img.format.lower() if img.format else None
        except Exception:
            # Deliberately broad: any failure to open or verify the data
            # means the upload is rejected.
            return False, "File is not a valid image or is corrupted"

        detected_type = {
            'jpeg': 'image/jpeg',
            'png': 'image/png',
            'gif': 'image/gif',
            'webp': 'image/webp',
        }.get(img_format)
        if detected_type is None:
            return False, f"Unsupported image format: {img_format}"

    # ICO is accepted here even though it is not listed in ALLOWED_IMAGE_TYPES.
    if detected_type not in ALLOWED_IMAGE_TYPES and detected_type != 'image/x-icon':
        # sorted() keeps the message stable; set iteration order is not.
        allowed = ', '.join(sorted(ALLOWED_IMAGE_TYPES))
        return False, f"File type {detected_type} is not allowed. Allowed types: {allowed}"

    return True, detected_type
async def validate_uploaded_image(file: UploadFile, max_size: int) -> bytes:
    """
    Validate an uploaded image file completely.

    Checks, in order: the declared MIME type, the payload size, the file
    signature (magic bytes), PIL integrity verification, and the pixel
    dimensions.

    Args:
        file: FastAPI UploadFile object
        max_size: Maximum file size in bytes

    Returns:
        File content as bytes

    Raises:
        HTTPException: 400 if the file is not a valid/allowed image or its
            dimensions exceed 10000x10000; 413 if it exceeds max_size.
    """
    # Check MIME type first (quick check). This value is client-supplied, so
    # it is only a cheap first gate; the signature check below is what counts.
    if not file.content_type or not file.content_type.startswith('image/'):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f'File must be an image. Received MIME type: {file.content_type}'
        )

    # Read file content. NOTE(review): the whole upload is read into memory
    # before the size check; consider enforcing a limit upstream as well.
    content = await file.read()

    # Validate file size
    if len(content) > max_size:
        raise HTTPException(
            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
            detail=f'File size ({len(content)} bytes) exceeds maximum allowed size ({max_size} bytes / {max_size // 1024 // 1024}MB)'
        )

    # Validate file signature (magic bytes)
    is_valid, result = validate_image_file_signature(content, file.filename or '')
    if not is_valid:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f'Invalid file type: {result}. File signature validation failed. Please upload a valid image file.'
        )

    # Additional PIL validation to ensure image is not corrupted
    try:
        img = Image.open(io.BytesIO(content))
        # Verify image integrity
        img.verify()
        # Re-open for further processing (verify() closes the image)
        img = Image.open(io.BytesIO(content))
        width, height = img.width, img.height
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f'Invalid or corrupted image file: {str(e)}'
        ) from e

    # Check image dimensions to prevent decompression bombs. This lives
    # outside the try block so its HTTPException is not caught by the broad
    # handler above and re-reported as a "corrupted image" error.
    if width > 10000 or height > 10000:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail='Image dimensions too large. Maximum dimensions: 10000x10000 pixels'
        )

    return content

View File

@@ -0,0 +1,99 @@
"""
HTML sanitization utilities for backend content storage.
Prevents XSS attacks by sanitizing HTML before storing in database.
"""
import bleach
from typing import Optional
# Tags that are kept when sanitizing rich content; everything else is
# handled according to the cleaner's strip setting.
ALLOWED_TAGS = [
    # Text-level and generic containers
    'p', 'br', 'strong', 'em', 'u', 'b', 'i', 'span', 'div',
    # Headings h1..h6
    *(f'h{level}' for level in range(1, 7)),
    # Lists
    'ul', 'ol', 'li',
    # Links, quotes and code
    'a', 'blockquote', 'pre', 'code',
    # Tables
    'table', 'thead', 'tbody', 'tr', 'th', 'td',
    # Media and sectioning
    'img', 'hr', 'section', 'article',
]

# Per-tag attribute allowlist. Tags missing from this mapping keep no
# attributes at all. Every tag gets its own list object.
# NOTE(review): 'style' is allowed on div/span/p; confirm whether a CSS
# sanitizer should be configured so inline CSS is filtered as well.
ALLOWED_ATTRIBUTES = {
    'a': ['href', 'title', 'target', 'rel'],
    'img': ['src', 'alt', 'title', 'width', 'height', 'class'],
    **{tag: ['class', 'id', 'style'] for tag in ('div', 'span', 'p')},
    **{f'h{level}': ['class', 'id'] for level in range(1, 7)},
    **{tag: ['class', 'id'] for tag in ('table', 'tr')},
    **{tag: ['class', 'id', 'colspan', 'rowspan'] for tag in ('th', 'td')},
}

# URL schemes permitted in link-like attributes
ALLOWED_SCHEMES = ['http', 'https', 'mailto', 'tel']
def sanitize_html(html_content: Optional[str]) -> str:
    """
    Sanitize HTML content to prevent XSS attacks.

    The content is cleaned against the module's tag, attribute and URL-scheme
    allowlists; afterwards every <a> tag whose href is an http(s) URL is
    guaranteed to carry both "noopener" and "noreferrer" in its rel attribute.

    Args:
        html_content: HTML string to sanitize (can be None)

    Returns:
        Sanitized HTML string safe for storage ('' for None or empty input)
    """
    if not html_content:
        return ''

    # Clean HTML content
    cleaned = bleach.clean(
        html_content,
        tags=ALLOWED_TAGS,
        attributes=ALLOWED_ATTRIBUTES,
        protocols=ALLOWED_SCHEMES,
        strip=True,  # Strip disallowed tags instead of escaping
        strip_comments=True,  # Remove HTML comments
    )

    # Cheap pre-check: nothing to post-process without an anchor-like tag.
    if '<a' not in cleaned:
        return cleaned

    import re

    # Opening <a ...> tag only. The lookahead keeps <article> and similar
    # tags from matching, and quoted attribute values are consumed as a unit
    # so a '>' inside a value does not end the tag early.
    anchor_tag = re.compile(r'''<a(?=[\s>])(?:[^>"']|"[^"]*"|'[^']*')*>''')
    # A single name="value" / name='value' attribute inside a tag.
    attribute = re.compile(r'''\s([^\s=>"'/]+)\s*=\s*("[^"]*"|'[^']*')''')
    required_rel = ('noopener', 'noreferrer')

    def add_rel(match):
        tag = match.group(0)

        # Index attributes by name; the first occurrence of a name wins.
        found = {}
        for attr in attribute.finditer(tag):
            found.setdefault(attr.group(1).lower(), attr)

        href = found.get('href')
        if href is None:
            return tag
        # Decide on the href value itself, not on text elsewhere in the tag.
        target = href.group(2)[1:-1].strip().lower()
        if not target.startswith(('http://', 'https://')):
            return tag

        rel = found.get('rel')
        if rel is None:
            # Insert rel attribute before closing >
            return tag[:-1] + ' rel="noopener noreferrer">'

        quote = rel.group(2)[0]
        tokens = rel.group(2)[1:-1].split()
        present = {token.lower() for token in tokens}
        missing = [token for token in required_rel if token not in present]
        if not missing:
            return tag

        # Prepend only the missing tokens, keeping whatever rel already had.
        new_value = quote + ' '.join(missing + tokens) + quote
        return tag[:rel.start(2)] + new_value + tag[rel.end(2):]

    return anchor_tag.sub(add_rel, cleaned)
def sanitize_text_for_html(text: Optional[str]) -> str:
    """
    Reduce a plain-text value to something safe to place inside HTML.

    The text is cleaned with an empty tag allowlist and strip=True, so any
    markup in the input is removed rather than kept as escaped text.

    Args:
        text: Plain text string to clean (can be None)

    Returns:
        Cleaned string, or '' when text is None or empty
    """
    return bleach.clean(text, tags=[], strip=True) if text else ''

View File

@@ -0,0 +1,59 @@
"""
Password validation utilities for enforcing password strength requirements.
"""
import re
from typing import Tuple, List
# Password strength requirements
MIN_PASSWORD_LENGTH = 8
REQUIRE_UPPERCASE = True
REQUIRE_LOWERCASE = True
REQUIRE_NUMBER = True
REQUIRE_SPECIAL = True

# Well-known weak passwords, stored lower-cased. Built once at import time
# (instead of on every call) and kept as a frozenset for O(1) membership.
_COMMON_PASSWORDS = frozenset({
    'password', '12345678', 'qwerty', 'abc123', 'password123',
    'admin', 'letmein', 'welcome', 'monkey', '1234567890',
})


def validate_password_strength(password: str) -> Tuple[bool, List[str]]:
    """
    Validate password meets strength requirements.

    All rules are evaluated so the caller can report every problem at once,
    except for an empty password, which short-circuits with a single error.

    Args:
        password: The password to validate

    Returns:
        Tuple of (is_valid, list_of_errors); the list is empty when valid
    """
    if not password:
        return False, ['Password is required']

    errors = []

    # Check minimum length
    if len(password) < MIN_PASSWORD_LENGTH:
        errors.append(f'Password must be at least {MIN_PASSWORD_LENGTH} characters long')

    # Check for uppercase letter
    if REQUIRE_UPPERCASE and not re.search(r'[A-Z]', password):
        errors.append('Password must contain at least one uppercase letter')

    # Check for lowercase letter
    if REQUIRE_LOWERCASE and not re.search(r'[a-z]', password):
        errors.append('Password must contain at least one lowercase letter')

    # Check for number
    if REQUIRE_NUMBER and not re.search(r'\d', password):
        errors.append('Password must contain at least one number')

    # Check for special character
    if REQUIRE_SPECIAL and not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
        errors.append('Password must contain at least one special character (!@#$%^&*(),.?":{}|<>)')

    # Check for common weak passwords (case-insensitive exact match)
    if password.lower() in _COMMON_PASSWORDS:
        errors.append('Password is too common. Please choose a stronger password')

    return not errors, errors

View File

@@ -0,0 +1,21 @@
"""
Utility functions for request handling
"""
from typing import Optional
from fastapi import Request
def get_request_id(request: Optional[Request] = None) -> Optional[str]:
    """
    Look up the request_id stored on a request's state.

    Args:
        request: FastAPI Request object (may be None)

    Returns:
        The value of request.state.request_id, or None when there is no
        request, the request has no state, or no request_id was set on it
    """
    if request and hasattr(request, 'state'):
        return getattr(request.state, 'request_id', None)
    return None

View File

@@ -2,6 +2,7 @@
Utility functions for standardizing API responses
"""
from typing import Any, Dict, Optional
from fastapi import HTTPException, Request
def success_response(
data: Any = None,
@@ -31,6 +32,7 @@ def success_response(
def error_response(
message: str,
errors: Optional[list] = None,
request_id: Optional[str] = None,
**kwargs
) -> Dict[str, Any]:
"""
@@ -45,7 +47,40 @@ def error_response(
if errors:
response['errors'] = errors
if request_id:
response['request_id'] = request_id
response.update(kwargs)
return response
def raise_http_exception(
    status_code: int,
    message: str,
    errors: Optional[list] = None,
    request: Optional[Request] = None,
    **kwargs
) -> None:
    """
    Build a standardized error payload and raise it as an HTTPException.

    This function never returns normally; it always raises.

    Args:
        status_code: HTTP status code
        message: Error message
        errors: Optional list of error details
        request: Optional Request object; its state.request_id, when
            present, is included in the payload
        **kwargs: Additional fields to include in response

    Raises:
        HTTPException: always, with the error_response() dict as its detail
    """
    # Pull the correlation id off the request when one is available.
    request_id = None
    if request and hasattr(request, 'state'):
        request_id = getattr(request.state, 'request_id', None)

    payload = error_response(
        message=message,
        errors=errors,
        request_id=request_id,
        **kwargs
    )
    raise HTTPException(status_code=status_code, detail=payload)