"""Banner API routes: public listing and lookup, plus admin-only create, update, delete and image upload."""
from fastapi import APIRouter, Depends, HTTPException, status, Query, Request, UploadFile, File
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import and_, or_
|
|
from typing import Optional
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
import os
|
|
import aiofiles
|
|
import uuid
|
|
|
|
from ..config.database import get_db
|
|
from ..middleware.auth import get_current_user, authorize_roles
|
|
from ..models.user import User
|
|
from ..models.banner import Banner
|
|
|
|
# Router for the banner endpoints; every route declared on it is served
# under the "/banners" prefix and tagged "banners".
router = APIRouter(prefix="/banners", tags=["banners"])
|
|
|
|
|
|
def normalize_image_url(image_url: str, base_url: str) -> str:
    """Return *image_url* as an absolute URL rooted at *base_url*.

    Args:
        image_url: Stored image location. May be empty/``None``, an absolute
            ``http://`` / ``https://`` URL, a root-relative path
            (``/uploads/x.png``) or a bare relative path (``uploads/x.png``).
        base_url: Scheme and host to prepend, e.g. ``http://localhost:3000``.
            A trailing slash is tolerated.

    Returns:
        A falsy input unchanged, an already-absolute URL unchanged, otherwise
        ``base_url`` joined to the path with exactly one slash at the seam.
    """
    if not image_url:
        return image_url
    if image_url.startswith(("http://", "https://")):
        return image_url

    # Drop a trailing slash from the base so that a value such as
    # "http://host/" does not produce "http://host//uploads/x.png".
    root = base_url.rstrip("/")
    if image_url.startswith("/"):
        return f"{root}{image_url}"
    return f"{root}/{image_url}"
|
|
|
|
|
|
def get_base_url(request: Request) -> str:
    """Resolve the base URL used to turn stored image paths into absolute links.

    The ``SERVER_URL`` environment variable is used when it is set to a
    non-empty value. Otherwise the URL is built from the request's ``host``
    header (falling back to ``localhost:3000``) with an ``http://`` scheme.
    """
    configured = os.getenv("SERVER_URL")
    if configured:
        return configured

    # NOTE(review): the scheme is fixed to "http" here; deployments served
    # over HTTPS presumably rely on SERVER_URL being set - confirm.
    host = request.headers.get('host', 'localhost:3000')
    return f"http://{host}"
|
|
|
|
|
|
@router.get("/")
async def get_banners(
    request: Request,
    position: Optional[str] = Query(None),
    db: Session = Depends(get_db)
):
    """Return every banner that is active and inside its display window.

    Args:
        request: Incoming request; used only to derive the base URL for
            absolute image links.
        position: Optional query parameter. When given (and non-empty), only
            banners whose ``position`` equals it are returned.
        db: SQLAlchemy session supplied by ``get_db``.

    Returns:
        ``{"status": "success", "data": {"banners": [...]}}`` with banners
        ordered by ``display_order`` ascending, then ``created_at`` descending.

    Raises:
        HTTPException: 500 carrying the error text if anything fails.
    """
    try:
        current_time = datetime.utcnow()

        # A banner qualifies when it is flagged active and each bound of its
        # date window is either unset (NULL) or satisfied by the current UTC
        # time. The `== True` / `== None` comparisons are deliberate: they
        # build SQL expressions rather than Python booleans.
        criteria = [Banner.is_active == True]  # noqa: E712
        if position:
            criteria.append(Banner.position == position)
        criteria.append(
            or_(Banner.start_date == None, Banner.start_date <= current_time)  # noqa: E711
        )
        criteria.append(
            or_(Banner.end_date == None, Banner.end_date >= current_time)  # noqa: E711
        )

        rows = (
            db.query(Banner)
            .filter(*criteria)
            .order_by(Banner.display_order.asc(), Banner.created_at.desc())
            .all()
        )

        root = get_base_url(request)
        payload = [
            {
                "id": row.id,
                "title": row.title,
                "description": row.description,
                "image_url": normalize_image_url(row.image_url, root),
                "link_url": row.link_url,
                "position": row.position,
                "display_order": row.display_order,
                "is_active": row.is_active,
                "start_date": row.start_date.isoformat() if row.start_date else None,
                "end_date": row.end_date.isoformat() if row.end_date else None,
                "created_at": row.created_at.isoformat() if row.created_at else None,
            }
            for row in rows
        ]

        return {"status": "success", "data": {"banners": payload}}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.get("/{id}")
async def get_banner_by_id(
    id: int,
    request: Request,
    db: Session = Depends(get_db)
):
    """Fetch a single banner by primary key.

    The lookup filters on ``id`` only, so a banner is returned regardless of
    its ``is_active`` flag or date window.

    Args:
        id: Banner identifier taken from the URL path.
        request: Incoming request; used only to derive the base URL for the
            absolute image link.
        db: SQLAlchemy session supplied by ``get_db``.

    Returns:
        ``{"status": "success", "data": {"banner": {...}}}``.

    Raises:
        HTTPException: 404 when no banner has this id; 500 carrying the error
            text for any other failure.
    """

    def iso_or_none(moment):
        # Dates are emitted as ISO-8601 strings; unset values stay None.
        return moment.isoformat() if moment else None

    try:
        banner = db.query(Banner).filter(Banner.id == id).first()
        if not banner:
            raise HTTPException(status_code=404, detail="Banner not found")

        root = get_base_url(request)
        serialized = dict(
            id=banner.id,
            title=banner.title,
            description=banner.description,
            image_url=normalize_image_url(banner.image_url, root),
            link_url=banner.link_url,
            position=banner.position,
            display_order=banner.display_order,
            is_active=banner.is_active,
            start_date=iso_or_none(banner.start_date),
            end_date=iso_or_none(banner.end_date),
            created_at=iso_or_none(banner.created_at),
        )

        return {"status": "success", "data": {"banner": serialized}}
    except Exception as e:
        # Deliberate HTTP errors (the 404 above) pass through untouched;
        # everything else is reported as a 500.
        if isinstance(e, HTTPException):
            raise
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
def _parse_iso_datetime(value, field_name: str) -> Optional[datetime]:
    """Parse an optional ISO-8601 timestamp taken from a request payload.

    Args:
        value: Raw payload value. Falsy values (missing, ``None``, ``""``)
            mean "no date".
        field_name: Payload key, used only in the error message.

    Returns:
        The parsed ``datetime``, or ``None`` for a falsy value.

    Raises:
        HTTPException: 400 when the value is not a parseable ISO-8601 string.
    """
    if not value:
        return None
    try:
        # A trailing "Z" is rewritten as an explicit UTC offset before
        # parsing, since older fromisoformat() versions reject "Z".
        return datetime.fromisoformat(value.replace('Z', '+00:00'))
    except (AttributeError, TypeError, ValueError) as exc:
        # Bad client input is a 400, not an internal server error.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid {field_name}: expected an ISO 8601 datetime string",
        ) from exc


@router.post("/", dependencies=[Depends(authorize_roles("admin"))])
async def create_banner(
    banner_data: dict,
    current_user: User = Depends(authorize_roles("admin")),
    db: Session = Depends(get_db)
):
    """Create new banner (Admin only).

    Args:
        banner_data: JSON body. Keys read: ``title``, ``description``,
            ``image_url``, ``link`` (or ``link_url``), ``position`` (default
            ``"home"``), ``display_order`` (default ``0``), ``start_date`` and
            ``end_date`` (ISO-8601 strings). New banners are always created
            with ``is_active=True``.
        current_user: Result of the ``authorize_roles("admin")`` dependency;
            not used in the body.
        db: SQLAlchemy session supplied by ``get_db``.

    Returns:
        ``{"status": "success", "message": ..., "data": {"banner": banner}}``;
        the ORM instance is returned as-is and left to the framework's
        response encoding.

    Raises:
        HTTPException: 400 for an unparseable date; 500 carrying the error
            text when persisting fails (the session is rolled back first).
    """
    # Validate the dates before touching the session so malformed input is
    # reported as a client error and nothing needs to be rolled back.
    start_date = _parse_iso_datetime(banner_data.get("start_date"), "start_date")
    end_date = _parse_iso_datetime(banner_data.get("end_date"), "end_date")

    try:
        banner = Banner(
            title=banner_data.get("title"),
            description=banner_data.get("description"),
            image_url=banner_data.get("image_url"),
            # "link" keeps priority for existing clients; "link_url" (the name
            # used by the model and the GET responses) is accepted as well.
            link_url=banner_data.get("link", banner_data.get("link_url")),
            position=banner_data.get("position", "home"),
            display_order=banner_data.get("display_order", 0),
            is_active=True,
            start_date=start_date,
            end_date=end_date,
        )

        db.add(banner)
        db.commit()
        db.refresh(banner)

        return {
            "status": "success",
            "message": "Banner created successfully",
            "data": {"banner": banner}
        }
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.put("/{id}", dependencies=[Depends(authorize_roles("admin"))])
async def update_banner(
    id: int,
    banner_data: dict,
    current_user: User = Depends(authorize_roles("admin")),
    db: Session = Depends(get_db)
):
    """Update banner (Admin only).

    Only keys present in ``banner_data`` are applied, so this is a partial
    update. A present-but-falsy ``start_date`` / ``end_date`` clears the date.

    Args:
        id: Banner identifier taken from the URL path.
        banner_data: JSON body. Keys read: ``title``, ``description``,
            ``image_url``, ``link`` (or ``link_url``), ``position``,
            ``display_order``, ``is_active``, ``start_date``, ``end_date``.
        current_user: Result of the ``authorize_roles("admin")`` dependency;
            not used in the body.
        db: SQLAlchemy session supplied by ``get_db``.

    Returns:
        ``{"status": "success", "message": ..., "data": {"banner": banner}}``.

    Raises:
        HTTPException: 404 when no banner has this id; 400 for an unparseable
            date; 500 carrying the error text for any other failure (the
            session is rolled back first).
    """
    try:
        banner = db.query(Banner).filter(Banner.id == id).first()
        if not banner:
            raise HTTPException(status_code=404, detail="Banner not found")

        # Collect and validate every change first; the ORM object is only
        # mutated once all values are known to be acceptable.
        changes = {
            field: banner_data[field]
            for field in (
                "title",
                "description",
                "image_url",
                "position",
                "display_order",
                "is_active",
            )
            if field in banner_data
        }
        # "link" keeps priority for existing clients; "link_url" (the name
        # used by the model and the GET responses) is accepted as well.
        if "link" in banner_data:
            changes["link_url"] = banner_data["link"]
        elif "link_url" in banner_data:
            changes["link_url"] = banner_data["link_url"]
        for date_field in ("start_date", "end_date"):
            if date_field in banner_data:
                changes[date_field] = _parse_iso_datetime(
                    banner_data[date_field], date_field
                )

        for attribute, value in changes.items():
            setattr(banner, attribute, value)

        db.commit()
        db.refresh(banner)

        return {
            "status": "success",
            "message": "Banner updated successfully",
            "data": {"banner": banner}
        }
    except HTTPException:
        raise
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
|
|
@router.delete("/{id}", dependencies=[Depends(authorize_roles("admin"))])
async def delete_banner(
    id: int,
    current_user: User = Depends(authorize_roles("admin")),
    db: Session = Depends(get_db)
):
    """Delete banner (Admin only).

    The database row is removed and committed first. Only afterwards is the
    locally uploaded image (an ``image_url`` starting with
    ``/uploads/banners/``) removed from disk, so a failed commit can no longer
    leave a surviving banner that points at an already-deleted file.

    Args:
        id: Banner identifier taken from the URL path.
        current_user: Result of the ``authorize_roles("admin")`` dependency;
            not used in the body.
        db: SQLAlchemy session supplied by ``get_db``.

    Returns:
        ``{"status": "success", "message": "Banner deleted successfully"}``.

    Raises:
        HTTPException: 404 when no banner has this id; 500 carrying the error
            text when the database delete fails (the session is rolled back).
    """
    import logging  # local import: used only on the cleanup-failure path

    try:
        banner = db.query(Banner).filter(Banner.id == id).first()
        if not banner:
            raise HTTPException(status_code=404, detail="Banner not found")

        # Read the path now so file cleanup does not need to touch the ORM
        # object after its row has been deleted.
        image_url = banner.image_url

        db.delete(banner)
        db.commit()
    except HTTPException:
        raise
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=str(e)) from e

    # Best-effort cleanup of a local upload. Only the final path component is
    # used, so the target always sits directly inside uploads/banners.
    if image_url and image_url.startswith('/uploads/banners/'):
        file_path = Path(__file__).parent.parent.parent / "uploads" / "banners" / Path(image_url).name
        try:
            if file_path.is_file():
                file_path.unlink()
        except OSError as exc:
            # The banner is already gone; an orphaned file is not worth
            # failing the request for, but it should be visible in the logs.
            logging.getLogger(__name__).warning(
                "Could not remove banner image %s: %s", file_path, exc
            )

    return {
        "status": "success",
        "message": "Banner deleted successfully"
    }
|
|
|
|
|
|
# Extensions accepted for uploaded banner images. The extension comes from
# the client-supplied filename, so it is checked against this list rather
# than trusted.
# NOTE(review): ".svg" files can embed scripts; drop it here if SVG banners
# are not needed or the uploads path is served without a restrictive CSP.
_ALLOWED_IMAGE_EXTENSIONS = frozenset(
    {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".avif", ".svg"}
)

# Uploads are copied to disk in pieces of this many bytes.
_UPLOAD_CHUNK_SIZE = 1024 * 1024


@router.post("/upload", dependencies=[Depends(authorize_roles("admin"))])
async def upload_banner_image(
    request: Request,
    image: UploadFile = File(...),
    current_user: User = Depends(authorize_roles("admin")),
):
    """Upload banner image (Admin only).

    The file is stored under ``uploads/banners`` (three directory levels above
    this module) with a generated ``banner-<uuid4><ext>`` name.

    Args:
        request: Incoming request; used only to derive the base URL for the
            absolute link in the response.
        image: Uploaded file from the ``image`` form field.
        current_user: Result of the ``authorize_roles("admin")`` dependency;
            not used in the body.

    Returns:
        ``{"status": "success", "message": ..., "data": {"image_url": ...,
        "full_url": ...}}`` where ``image_url`` is the root-relative path and
        ``full_url`` its absolute form.

    Raises:
        HTTPException: 400 when the declared content type is not ``image/*``
            or the filename extension is not an accepted image extension;
            500 carrying the error text for any other failure.
    """
    try:
        # Validate file type
        if not image.content_type or not image.content_type.startswith('image/'):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="File must be an image"
            )

        # Both the content type and the filename are client-supplied. The
        # filename may be missing, and its extension decides how the stored
        # file is later interpreted, so it must be a known image extension.
        ext = Path(image.filename or "").suffix.lower()
        if ext not in _ALLOWED_IMAGE_EXTENSIONS:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Unsupported image file extension"
            )

        # Create uploads directory
        upload_dir = Path(__file__).parent.parent.parent / "uploads" / "banners"
        upload_dir.mkdir(parents=True, exist_ok=True)

        # Generate filename
        filename = f"banner-{uuid.uuid4()}{ext}"
        file_path = upload_dir / filename

        # Save file in chunks so a large upload is never held in memory whole.
        try:
            async with aiofiles.open(file_path, 'wb') as f:
                while True:
                    chunk = await image.read(_UPLOAD_CHUNK_SIZE)
                    if not chunk:
                        break
                    await f.write(chunk)
        except Exception:
            # Do not leave a truncated file behind when the copy fails.
            if file_path.exists():
                file_path.unlink()
            raise

        # Return the image URL
        image_url = f"/uploads/banners/{filename}"
        base_url = get_base_url(request)
        full_url = normalize_image_url(image_url, base_url)

        return {
            "status": "success",
            "message": "Image uploaded successfully",
            "data": {
                "image_url": image_url,
                "full_url": full_url
            }
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
|