update
This commit is contained in:
Binary file not shown.
Binary file not shown.
@@ -41,9 +41,13 @@ class PageContent(Base):
|
||||
hero_title = Column(String(500), nullable=True)
|
||||
hero_subtitle = Column(String(1000), nullable=True)
|
||||
hero_image = Column(String(1000), nullable=True)
|
||||
hero_video_url = Column(Text, nullable=True)
|
||||
hero_video_poster = Column(Text, nullable=True)
|
||||
story_content = Column(Text, nullable=True)
|
||||
values = Column(Text, nullable=True)
|
||||
features = Column(Text, nullable=True)
|
||||
features_section_title = Column(Text, nullable=True)
|
||||
features_section_subtitle = Column(Text, nullable=True)
|
||||
about_hero_image = Column(Text, nullable=True)
|
||||
mission = Column(Text, nullable=True)
|
||||
vision = Column(Text, nullable=True)
|
||||
@@ -74,8 +78,18 @@ class PageContent(Base):
|
||||
about_preview_content = Column(Text, nullable=True)
|
||||
about_preview_image = Column(String(1000), nullable=True)
|
||||
stats = Column(Text, nullable=True)
|
||||
stats_section_title = Column(Text, nullable=True)
|
||||
stats_section_subtitle = Column(Text, nullable=True)
|
||||
rooms_section_title = Column(Text, nullable=True)
|
||||
rooms_section_subtitle = Column(Text, nullable=True)
|
||||
rooms_section_button_text = Column(Text, nullable=True)
|
||||
rooms_section_button_link = Column(Text, nullable=True)
|
||||
rooms_section_enabled = Column(Boolean, default=True, nullable=True)
|
||||
luxury_services_section_title = Column(Text, nullable=True)
|
||||
luxury_services_section_subtitle = Column(Text, nullable=True)
|
||||
services_section_button_text = Column(Text, nullable=True)
|
||||
services_section_button_link = Column(Text, nullable=True)
|
||||
services_section_limit = Column(Integer, nullable=True)
|
||||
luxury_services = Column(Text, nullable=True)
|
||||
luxury_experiences_section_title = Column(Text, nullable=True)
|
||||
luxury_experiences_section_subtitle = Column(Text, nullable=True)
|
||||
@@ -91,6 +105,24 @@ class PageContent(Base):
|
||||
partners_section_title = Column(Text, nullable=True)
|
||||
partners_section_subtitle = Column(Text, nullable=True)
|
||||
partners = Column(Text, nullable=True)
|
||||
sections_enabled = Column(Text, nullable=True)
|
||||
newsletter_section_title = Column(Text, nullable=True)
|
||||
newsletter_section_subtitle = Column(Text, nullable=True)
|
||||
newsletter_placeholder = Column(Text, nullable=True)
|
||||
newsletter_button_text = Column(Text, nullable=True)
|
||||
newsletter_enabled = Column(Boolean, default=False, nullable=True)
|
||||
trust_badges_section_title = Column(Text, nullable=True)
|
||||
trust_badges_section_subtitle = Column(Text, nullable=True)
|
||||
trust_badges = Column(Text, nullable=True)
|
||||
trust_badges_enabled = Column(Boolean, default=False, nullable=True)
|
||||
promotions_section_title = Column(Text, nullable=True)
|
||||
promotions_section_subtitle = Column(Text, nullable=True)
|
||||
promotions = Column(Text, nullable=True)
|
||||
promotions_enabled = Column(Boolean, default=False, nullable=True)
|
||||
blog_section_title = Column(Text, nullable=True)
|
||||
blog_section_subtitle = Column(Text, nullable=True)
|
||||
blog_posts_limit = Column(Integer, nullable=True)
|
||||
blog_enabled = Column(Boolean, default=False, nullable=True)
|
||||
is_active = Column(Boolean, default=True, nullable=False)
|
||||
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
|
||||
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -8,7 +8,7 @@ from ...auth.models.user import User
|
||||
from ...auth.models.role import Role
|
||||
from ...system.models.system_settings import SystemSettings
|
||||
from ...shared.utils.mailer import send_email
|
||||
from ...shared.utils.html_sanitizer import sanitize_text_for_html
|
||||
from ...shared.utils.sanitization import sanitize_text_for_html
|
||||
logger = logging.getLogger(__name__)
|
||||
router = APIRouter(prefix='/contact', tags=['contact'])
|
||||
|
||||
|
||||
@@ -41,9 +41,13 @@ def serialize_page_content(content: PageContent) -> dict:
|
||||
'hero_title': content.hero_title,
|
||||
'hero_subtitle': content.hero_subtitle,
|
||||
'hero_image': content.hero_image,
|
||||
'hero_video_url': content.hero_video_url,
|
||||
'hero_video_poster': content.hero_video_poster,
|
||||
'story_content': content.story_content,
|
||||
'values': safe_json_loads(content.values, []),
|
||||
'features': safe_json_loads(content.features, []),
|
||||
'features_section_title': content.features_section_title,
|
||||
'features_section_subtitle': content.features_section_subtitle,
|
||||
'about_hero_image': content.about_hero_image,
|
||||
'mission': content.mission,
|
||||
'vision': content.vision,
|
||||
@@ -74,8 +78,18 @@ def serialize_page_content(content: PageContent) -> dict:
|
||||
'about_preview_content': content.about_preview_content,
|
||||
'about_preview_image': content.about_preview_image,
|
||||
'stats': safe_json_loads(content.stats, []),
|
||||
'stats_section_title': content.stats_section_title,
|
||||
'stats_section_subtitle': content.stats_section_subtitle,
|
||||
'rooms_section_title': content.rooms_section_title,
|
||||
'rooms_section_subtitle': content.rooms_section_subtitle,
|
||||
'rooms_section_button_text': content.rooms_section_button_text,
|
||||
'rooms_section_button_link': content.rooms_section_button_link,
|
||||
'rooms_section_enabled': content.rooms_section_enabled,
|
||||
'luxury_services_section_title': content.luxury_services_section_title,
|
||||
'luxury_services_section_subtitle': content.luxury_services_section_subtitle,
|
||||
'services_section_button_text': content.services_section_button_text,
|
||||
'services_section_button_link': content.services_section_button_link,
|
||||
'services_section_limit': content.services_section_limit,
|
||||
'luxury_services': safe_json_loads(content.luxury_services, []),
|
||||
'luxury_experiences_section_title': content.luxury_experiences_section_title,
|
||||
'luxury_experiences_section_subtitle': content.luxury_experiences_section_subtitle,
|
||||
@@ -91,6 +105,24 @@ def serialize_page_content(content: PageContent) -> dict:
|
||||
'partners_section_title': content.partners_section_title,
|
||||
'partners_section_subtitle': content.partners_section_subtitle,
|
||||
'partners': safe_json_loads(content.partners, []),
|
||||
'sections_enabled': safe_json_loads(content.sections_enabled, {}),
|
||||
'newsletter_section_title': content.newsletter_section_title,
|
||||
'newsletter_section_subtitle': content.newsletter_section_subtitle,
|
||||
'newsletter_placeholder': content.newsletter_placeholder,
|
||||
'newsletter_button_text': content.newsletter_button_text,
|
||||
'newsletter_enabled': content.newsletter_enabled,
|
||||
'trust_badges_section_title': content.trust_badges_section_title,
|
||||
'trust_badges_section_subtitle': content.trust_badges_section_subtitle,
|
||||
'trust_badges': safe_json_loads(content.trust_badges, []),
|
||||
'trust_badges_enabled': content.trust_badges_enabled,
|
||||
'promotions_section_title': content.promotions_section_title,
|
||||
'promotions_section_subtitle': content.promotions_section_subtitle,
|
||||
'promotions': safe_json_loads(content.promotions, []),
|
||||
'promotions_enabled': content.promotions_enabled,
|
||||
'blog_section_title': content.blog_section_title,
|
||||
'blog_section_subtitle': content.blog_section_subtitle,
|
||||
'blog_posts_limit': content.blog_posts_limit,
|
||||
'blog_enabled': content.blog_enabled,
|
||||
'is_active': content.is_active,
|
||||
'created_at': content.created_at.isoformat() if content.created_at else None,
|
||||
'updated_at': content.updated_at.isoformat() if content.updated_at else None
|
||||
|
||||
File diff suppressed because one or more lines are too long
Binary file not shown.
@@ -27,9 +27,13 @@ class PageContentUpdateRequest(BaseModel):
|
||||
hero_title: Optional[str] = Field(None, max_length=500)
|
||||
hero_subtitle: Optional[str] = Field(None, max_length=1000)
|
||||
hero_image: Optional[str] = Field(None, max_length=1000)
|
||||
hero_video_url: Optional[str] = Field(None, max_length=1000)
|
||||
hero_video_poster: Optional[str] = Field(None, max_length=1000)
|
||||
story_content: Optional[str] = None
|
||||
values: Optional[Union[str, List[Dict[str, Any]]]] = None
|
||||
features: Optional[Union[str, List[Dict[str, Any]]]] = None
|
||||
features_section_title: Optional[str] = Field(None, max_length=500)
|
||||
features_section_subtitle: Optional[str] = Field(None, max_length=1000)
|
||||
about_hero_image: Optional[str] = Field(None, max_length=1000)
|
||||
mission: Optional[str] = Field(None, max_length=2000)
|
||||
vision: Optional[str] = Field(None, max_length=2000)
|
||||
@@ -60,8 +64,18 @@ class PageContentUpdateRequest(BaseModel):
|
||||
about_preview_content: Optional[str] = None
|
||||
about_preview_image: Optional[str] = Field(None, max_length=1000)
|
||||
stats: Optional[Union[str, List[Dict[str, Any]]]] = None
|
||||
stats_section_title: Optional[str] = Field(None, max_length=500)
|
||||
stats_section_subtitle: Optional[str] = Field(None, max_length=1000)
|
||||
rooms_section_title: Optional[str] = Field(None, max_length=500)
|
||||
rooms_section_subtitle: Optional[str] = Field(None, max_length=1000)
|
||||
rooms_section_button_text: Optional[str] = Field(None, max_length=200)
|
||||
rooms_section_button_link: Optional[str] = Field(None, max_length=1000)
|
||||
rooms_section_enabled: Optional[bool] = None
|
||||
luxury_services_section_title: Optional[str] = Field(None, max_length=500)
|
||||
luxury_services_section_subtitle: Optional[str] = Field(None, max_length=1000)
|
||||
services_section_button_text: Optional[str] = Field(None, max_length=200)
|
||||
services_section_button_link: Optional[str] = Field(None, max_length=1000)
|
||||
services_section_limit: Optional[int] = None
|
||||
luxury_services: Optional[Union[str, List[Dict[str, Any]]]] = None
|
||||
luxury_experiences_section_title: Optional[str] = Field(None, max_length=500)
|
||||
luxury_experiences_section_subtitle: Optional[str] = Field(None, max_length=1000)
|
||||
@@ -77,6 +91,24 @@ class PageContentUpdateRequest(BaseModel):
|
||||
partners_section_title: Optional[str] = Field(None, max_length=500)
|
||||
partners_section_subtitle: Optional[str] = Field(None, max_length=1000)
|
||||
partners: Optional[Union[str, List[Dict[str, Any]]]] = None
|
||||
sections_enabled: Optional[Union[str, Dict[str, bool]]] = None
|
||||
newsletter_section_title: Optional[str] = Field(None, max_length=500)
|
||||
newsletter_section_subtitle: Optional[str] = Field(None, max_length=1000)
|
||||
newsletter_placeholder: Optional[str] = Field(None, max_length=200)
|
||||
newsletter_button_text: Optional[str] = Field(None, max_length=200)
|
||||
newsletter_enabled: Optional[bool] = None
|
||||
trust_badges_section_title: Optional[str] = Field(None, max_length=500)
|
||||
trust_badges_section_subtitle: Optional[str] = Field(None, max_length=1000)
|
||||
trust_badges: Optional[Union[str, List[Dict[str, Any]]]] = None
|
||||
trust_badges_enabled: Optional[bool] = None
|
||||
promotions_section_title: Optional[str] = Field(None, max_length=500)
|
||||
promotions_section_subtitle: Optional[str] = Field(None, max_length=1000)
|
||||
promotions: Optional[Union[str, List[Dict[str, Any]]]] = None
|
||||
promotions_enabled: Optional[bool] = None
|
||||
blog_section_title: Optional[str] = Field(None, max_length=500)
|
||||
blog_section_subtitle: Optional[str] = Field(None, max_length=1000)
|
||||
blog_posts_limit: Optional[int] = None
|
||||
blog_enabled: Optional[bool] = None
|
||||
copyright_text: Optional[str] = Field(None, max_length=500)
|
||||
is_active: Optional[bool] = True
|
||||
|
||||
@@ -84,7 +116,8 @@ class PageContentUpdateRequest(BaseModel):
|
||||
'features', 'amenities', 'testimonials', 'gallery_images', 'stats',
|
||||
'luxury_features', 'luxury_gallery', 'luxury_testimonials',
|
||||
'luxury_services', 'luxury_experiences', 'awards', 'partners',
|
||||
'team', 'timeline', 'achievements', mode='before')
|
||||
'team', 'timeline', 'achievements', 'sections_enabled', 'trust_badges',
|
||||
'promotions', mode='before')
|
||||
@classmethod
|
||||
def validate_json_fields(cls, v):
|
||||
"""Validate and parse JSON string fields."""
|
||||
|
||||
@@ -44,8 +44,7 @@ else:
|
||||
PageContent.__table__.create(bind=engine, checkfirst=True)
|
||||
except Exception as e:
|
||||
logger.error(f'Failed to ensure required tables exist: {e}')
|
||||
from .auth.routes import auth_routes
|
||||
from .content.routes import privacy_routes
|
||||
|
||||
app = FastAPI(title=settings.APP_NAME, description='Enterprise-grade Hotel Booking API', version=settings.APP_VERSION, docs_url='/api/docs' if not settings.is_production else None, redoc_url='/api/redoc' if not settings.is_production else None, openapi_url='/api/openapi.json' if not settings.is_production else None)
|
||||
app.add_middleware(RequestIDMiddleware)
|
||||
app.add_middleware(CookieConsentMiddleware)
|
||||
@@ -193,7 +192,6 @@ async def metrics(
|
||||
# Custom route for serving uploads with CORS headers
|
||||
# This route takes precedence over the mount below
|
||||
from fastapi.responses import FileResponse
|
||||
import re
|
||||
|
||||
@app.options('/uploads/{file_path:path}')
|
||||
async def serve_upload_file_options(file_path: str, request: Request):
|
||||
|
||||
Binary file not shown.
@@ -52,6 +52,7 @@ class Campaign(Base):
|
||||
# Segmentation
|
||||
segment_id = Column(Integer, ForeignKey('campaign_segments.id'), nullable=True)
|
||||
segment_criteria = Column(JSON, nullable=True) # Store segment criteria as JSON
|
||||
recipient_type = Column(String(50), nullable=True, default='users') # 'users', 'subscribers', 'both'
|
||||
|
||||
# A/B Testing
|
||||
is_ab_test = Column(Boolean, nullable=False, default=False)
|
||||
@@ -283,3 +284,18 @@ class Unsubscribe(Base):
|
||||
user = relationship('User')
|
||||
campaign = relationship('Campaign')
|
||||
|
||||
class NewsletterSubscriber(Base):
|
||||
__tablename__ = 'newsletter_subscribers'
|
||||
|
||||
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
|
||||
email = Column(String(255), nullable=False, unique=True, index=True)
|
||||
user_id = Column(Integer, ForeignKey('users.id'), nullable=True, index=True)
|
||||
name = Column(String(255), nullable=True)
|
||||
|
||||
subscribed_at = Column(DateTime, default=datetime.utcnow, nullable=False, index=True)
|
||||
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
|
||||
is_active = Column(Boolean, nullable=False, default=True, index=True)
|
||||
|
||||
# Relationships
|
||||
user = relationship('User')
|
||||
|
||||
|
||||
Binary file not shown.
@@ -1,5 +1,6 @@
|
||||
from fastapi import APIRouter, Depends, HTTPException, status, Query, Request
|
||||
from sqlalchemy.orm import Session, selectinload
|
||||
from sqlalchemy import func
|
||||
from typing import Optional, List, Union
|
||||
from datetime import datetime
|
||||
from pydantic import BaseModel, EmailStr, field_validator
|
||||
@@ -10,7 +11,7 @@ from ...auth.models.user import User
|
||||
from ..models.email_campaign import (
|
||||
Campaign, CampaignStatus, CampaignType,
|
||||
CampaignSegment, EmailTemplate, CampaignEmail, EmailStatus,
|
||||
DripSequence, DripSequenceStep, Unsubscribe
|
||||
DripSequence, DripSequenceStep, Unsubscribe, NewsletterSubscriber
|
||||
)
|
||||
from ..services.email_campaign_service import email_campaign_service
|
||||
|
||||
@@ -31,6 +32,7 @@ class CampaignCreate(BaseModel):
|
||||
reply_to_email: Optional[str] = None
|
||||
track_opens: bool = True
|
||||
track_clicks: bool = True
|
||||
recipient_type: Optional[str] = 'users' # 'users', 'subscribers', 'both'
|
||||
|
||||
@field_validator('segment_id', 'template_id', mode='before')
|
||||
@classmethod
|
||||
@@ -408,7 +410,8 @@ async def create_campaign(
|
||||
from_email=data.from_email,
|
||||
reply_to_email=data.reply_to_email,
|
||||
track_opens=data.track_opens,
|
||||
track_clicks=data.track_clicks
|
||||
track_clicks=data.track_clicks,
|
||||
recipient_type=data.recipient_type or 'users'
|
||||
)
|
||||
|
||||
return {"status": "success", "campaign_id": campaign.id}
|
||||
@@ -533,6 +536,139 @@ async def track_email_click(
|
||||
from fastapi.responses import RedirectResponse
|
||||
return RedirectResponse(url=url)
|
||||
|
||||
# Newsletter Subscription Routes
|
||||
class NewsletterSubscribeRequest(BaseModel):
|
||||
email: EmailStr
|
||||
name: Optional[str] = None
|
||||
|
||||
@router.post("/newsletter/subscribe")
|
||||
async def subscribe_to_newsletter(
|
||||
request: NewsletterSubscribeRequest,
|
||||
db: Session = Depends(get_db)
|
||||
):
|
||||
"""Subscribe to newsletter"""
|
||||
email = request.email
|
||||
name = request.name
|
||||
|
||||
# Check if already unsubscribed
|
||||
existing_unsubscribe = db.query(Unsubscribe).filter(
|
||||
Unsubscribe.email == email,
|
||||
Unsubscribe.unsubscribe_all == True
|
||||
).first()
|
||||
|
||||
if existing_unsubscribe:
|
||||
# Remove unsubscribe record to re-subscribe
|
||||
db.delete(existing_unsubscribe)
|
||||
db.commit()
|
||||
|
||||
# Check if user exists
|
||||
user = db.query(User).filter(User.email == email).first()
|
||||
|
||||
# Check if already subscribed
|
||||
existing_subscriber = db.query(NewsletterSubscriber).filter(
|
||||
NewsletterSubscriber.email == email
|
||||
).first()
|
||||
|
||||
if existing_subscriber:
|
||||
# Re-activate if was deactivated
|
||||
if not existing_subscriber.is_active:
|
||||
existing_subscriber.is_active = True
|
||||
existing_subscriber.subscribed_at = datetime.utcnow()
|
||||
if name and not existing_subscriber.name:
|
||||
existing_subscriber.name = name
|
||||
if user and not existing_subscriber.user_id:
|
||||
existing_subscriber.user_id = user.id
|
||||
db.commit()
|
||||
else:
|
||||
# Create new subscriber record
|
||||
subscriber = NewsletterSubscriber(
|
||||
email=email,
|
||||
user_id=user.id if user else None,
|
||||
name=name or (user.full_name if user else None),
|
||||
is_active=True
|
||||
)
|
||||
db.add(subscriber)
|
||||
db.commit()
|
||||
|
||||
# Find or create newsletter segment
|
||||
newsletter_segment = db.query(CampaignSegment).filter(
|
||||
CampaignSegment.name == "Newsletter Subscribers"
|
||||
).first()
|
||||
|
||||
if not newsletter_segment:
|
||||
# Create newsletter segment if it doesn't exist
|
||||
newsletter_segment = CampaignSegment(
|
||||
name="Newsletter Subscribers",
|
||||
description="Users who have subscribed to the newsletter",
|
||||
criteria={"subscribed": True}
|
||||
)
|
||||
db.add(newsletter_segment)
|
||||
db.commit()
|
||||
db.refresh(newsletter_segment)
|
||||
|
||||
# Update segment estimated count - count active subscribers
|
||||
subscriber_count = db.query(func.count(NewsletterSubscriber.id)).filter(
|
||||
NewsletterSubscriber.is_active == True
|
||||
).scalar() or 0
|
||||
|
||||
newsletter_segment.estimated_count = subscriber_count
|
||||
newsletter_segment.last_calculated_at = datetime.utcnow()
|
||||
db.commit()
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"message": "Successfully subscribed to newsletter",
|
||||
"email": email
|
||||
}
|
||||
|
||||
@router.get("/newsletter/subscribers")
|
||||
async def get_newsletter_subscribers(
|
||||
current_user: User = Depends(authorize_roles("admin")),
|
||||
db: Session = Depends(get_db),
|
||||
page: int = Query(1, ge=1),
|
||||
limit: int = Query(50, ge=1, le=100)
|
||||
):
|
||||
"""Get list of newsletter subscribers"""
|
||||
try:
|
||||
# Get all active newsletter subscribers
|
||||
subscribers_query = db.query(NewsletterSubscriber).filter(
|
||||
NewsletterSubscriber.is_active == True
|
||||
).order_by(NewsletterSubscriber.subscribed_at.desc())
|
||||
|
||||
# Get total count
|
||||
total = subscribers_query.count()
|
||||
|
||||
# Pagination
|
||||
offset = (page - 1) * limit
|
||||
paginated_subscribers = subscribers_query.offset(offset).limit(limit).all()
|
||||
|
||||
# Format subscribers
|
||||
subscribers_list = []
|
||||
for subscriber in paginated_subscribers:
|
||||
subscribers_list.append({
|
||||
"email": subscriber.email,
|
||||
"user_id": subscriber.user_id,
|
||||
"name": subscriber.name or (subscriber.user.full_name if subscriber.user else None),
|
||||
"type": "user" if subscriber.user_id else "guest",
|
||||
"subscribed_at": subscriber.subscribed_at.isoformat() if subscriber.subscribed_at else None
|
||||
})
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"data": {
|
||||
"subscribers": subscribers_list,
|
||||
"total": total,
|
||||
"page": page,
|
||||
"limit": limit,
|
||||
"total_pages": (total + limit - 1) // limit if limit > 0 else 1
|
||||
}
|
||||
}
|
||||
except Exception as e:
|
||||
from ...shared.config.logging_config import get_logger
|
||||
logger = get_logger(__name__)
|
||||
logger.error(f"Error fetching newsletter subscribers: {str(e)}", exc_info=True)
|
||||
raise HTTPException(status_code=500, detail=f"Failed to fetch subscribers: {str(e)}")
|
||||
|
||||
# Unsubscribe Routes
|
||||
@router.post("/unsubscribe")
|
||||
async def unsubscribe(
|
||||
@@ -553,6 +689,24 @@ async def unsubscribe(
|
||||
|
||||
user = db.query(User).filter(User.email == email).first()
|
||||
|
||||
# Mark newsletter subscriber as inactive if unsubscribing from all
|
||||
if unsubscribe_all:
|
||||
subscriber = db.query(NewsletterSubscriber).filter(
|
||||
NewsletterSubscriber.email == email
|
||||
).first()
|
||||
if subscriber:
|
||||
subscriber.is_active = False
|
||||
# Update segment count
|
||||
newsletter_segment = db.query(CampaignSegment).filter(
|
||||
CampaignSegment.name == "Newsletter Subscribers"
|
||||
).first()
|
||||
if newsletter_segment:
|
||||
subscriber_count = db.query(func.count(NewsletterSubscriber.id)).filter(
|
||||
NewsletterSubscriber.is_active == True
|
||||
).scalar() or 0
|
||||
newsletter_segment.estimated_count = subscriber_count
|
||||
newsletter_segment.last_calculated_at = datetime.utcnow()
|
||||
|
||||
unsubscribe_record = Unsubscribe(
|
||||
email=email,
|
||||
user_id=user.id if user else None,
|
||||
|
||||
Binary file not shown.
@@ -4,7 +4,7 @@ from datetime import datetime, timedelta
|
||||
from ..models.email_campaign import (
|
||||
Campaign, CampaignStatus, CampaignType, EmailStatus,
|
||||
CampaignSegment, EmailTemplate, CampaignEmail, EmailClick,
|
||||
DripSequence, DripSequenceStep, DripSequenceEnrollment, Unsubscribe
|
||||
DripSequence, DripSequenceStep, DripSequenceEnrollment, Unsubscribe, NewsletterSubscriber
|
||||
)
|
||||
from ...auth.models.user import User
|
||||
from ...bookings.models.booking import Booking
|
||||
@@ -51,19 +51,54 @@ class EmailCampaignService:
|
||||
db: Session,
|
||||
campaign: Campaign
|
||||
) -> List[User]:
|
||||
"""Get list of recipients for a campaign based on segment"""
|
||||
if campaign.segment_id:
|
||||
segment = db.query(CampaignSegment).filter(
|
||||
CampaignSegment.id == campaign.segment_id
|
||||
).first()
|
||||
if segment:
|
||||
return EmailCampaignService._apply_segment_criteria(db, segment.criteria)
|
||||
"""Get list of recipients for a campaign based on segment and recipient_type"""
|
||||
recipient_type = getattr(campaign, 'recipient_type', 'users') or 'users'
|
||||
recipients = []
|
||||
|
||||
# If no segment, return all active users (or based on campaign type)
|
||||
if campaign.campaign_type == CampaignType.newsletter:
|
||||
return db.query(User).filter(User.is_active == True).all()
|
||||
# Get users if recipient_type is 'users' or 'both'
|
||||
if recipient_type in ['users', 'both']:
|
||||
if campaign.segment_id:
|
||||
segment = db.query(CampaignSegment).filter(
|
||||
CampaignSegment.id == campaign.segment_id
|
||||
).first()
|
||||
if segment:
|
||||
recipients.extend(EmailCampaignService._apply_segment_criteria(db, segment.criteria))
|
||||
else:
|
||||
# If segment not found, get all active users
|
||||
recipients.extend(db.query(User).filter(User.is_active == True).all())
|
||||
else:
|
||||
# If no segment, return all active users
|
||||
recipients.extend(db.query(User).filter(User.is_active == True).all())
|
||||
|
||||
return []
|
||||
# Get subscribers if recipient_type is 'subscribers' or 'both'
|
||||
if recipient_type in ['subscribers', 'both']:
|
||||
subscribers = db.query(NewsletterSubscriber).filter(
|
||||
NewsletterSubscriber.is_active == True
|
||||
).all()
|
||||
|
||||
# Convert subscribers to User-like objects or get their User records
|
||||
for subscriber in subscribers:
|
||||
if subscriber.user_id:
|
||||
# If subscriber is a user, get the User object (avoid duplicates)
|
||||
user = db.query(User).filter(User.id == subscriber.user_id).first()
|
||||
if user and user not in recipients:
|
||||
recipients.append(user)
|
||||
else:
|
||||
# Guest subscriber - create a minimal User-like object for sending
|
||||
# We'll handle this in send_campaign by using the email directly
|
||||
# For now, we'll skip guest subscribers in the User list
|
||||
# They'll be handled separately in send_campaign
|
||||
pass
|
||||
|
||||
# Remove duplicates based on email
|
||||
seen_emails = set()
|
||||
unique_recipients = []
|
||||
for user in recipients:
|
||||
if user.email not in seen_emails:
|
||||
seen_emails.add(user.email)
|
||||
unique_recipients.append(user)
|
||||
|
||||
return unique_recipients
|
||||
|
||||
@staticmethod
|
||||
def _apply_segment_criteria(db: Session, criteria: Dict[str, Any]) -> List[User]:
|
||||
@@ -105,9 +140,34 @@ class EmailCampaignService:
|
||||
if campaign.status not in [CampaignStatus.draft, CampaignStatus.scheduled]:
|
||||
raise ValueError(f"Cannot send campaign with status: {campaign.status}")
|
||||
|
||||
# Get recipients
|
||||
recipients = EmailCampaignService.get_campaign_recipients(db, campaign)
|
||||
campaign.total_recipients = len(recipients)
|
||||
# Get recipients (users)
|
||||
user_recipients = EmailCampaignService.get_campaign_recipients(db, campaign)
|
||||
recipient_type = getattr(campaign, 'recipient_type', 'users') or 'users'
|
||||
|
||||
# Get guest subscribers if recipient_type is 'subscribers' or 'both'
|
||||
guest_subscribers = []
|
||||
if recipient_type in ['subscribers', 'both']:
|
||||
guest_subscribers = db.query(NewsletterSubscriber).filter(
|
||||
NewsletterSubscriber.is_active == True,
|
||||
NewsletterSubscriber.user_id.is_(None) # Only guest subscribers
|
||||
).all()
|
||||
|
||||
# Combine all recipient emails
|
||||
all_recipient_emails = set()
|
||||
recipient_data = {} # Store email -> (user_id, name) mapping
|
||||
|
||||
# Add user recipients
|
||||
for user in user_recipients:
|
||||
all_recipient_emails.add(user.email)
|
||||
recipient_data[user.email] = (user.id, user.full_name)
|
||||
|
||||
# Add guest subscribers
|
||||
for subscriber in guest_subscribers:
|
||||
if subscriber.email not in all_recipient_emails:
|
||||
all_recipient_emails.add(subscriber.email)
|
||||
recipient_data[subscriber.email] = (None, subscriber.name)
|
||||
|
||||
campaign.total_recipients = len(all_recipient_emails)
|
||||
|
||||
# Update status
|
||||
campaign.status = CampaignStatus.sending
|
||||
@@ -116,17 +176,19 @@ class EmailCampaignService:
|
||||
sent_count = 0
|
||||
failed_count = 0
|
||||
|
||||
for user in recipients:
|
||||
# Check if user unsubscribed
|
||||
if EmailCampaignService._is_unsubscribed(db, user.email, campaign):
|
||||
for email in all_recipient_emails:
|
||||
# Check if email is unsubscribed
|
||||
if EmailCampaignService._is_unsubscribed(db, email, campaign):
|
||||
continue
|
||||
|
||||
user_id, name = recipient_data[email]
|
||||
|
||||
try:
|
||||
# Create campaign email record
|
||||
campaign_email = CampaignEmail(
|
||||
campaign_id=campaign.id,
|
||||
user_id=user.id,
|
||||
email=user.email,
|
||||
user_id=user_id,
|
||||
email=email,
|
||||
status=EmailStatus.pending
|
||||
)
|
||||
db.add(campaign_email)
|
||||
@@ -135,11 +197,13 @@ class EmailCampaignService:
|
||||
# Replace template variables
|
||||
html_content = EmailCampaignService._replace_variables(
|
||||
campaign.html_content or '',
|
||||
user
|
||||
name or 'Guest',
|
||||
email
|
||||
)
|
||||
subject = EmailCampaignService._replace_variables(
|
||||
campaign.subject,
|
||||
user
|
||||
name or 'Guest',
|
||||
email
|
||||
)
|
||||
|
||||
# Send email (async function)
|
||||
@@ -151,7 +215,7 @@ class EmailCampaignService:
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
loop.run_until_complete(send_email(
|
||||
to=user.email,
|
||||
to=email,
|
||||
subject=subject,
|
||||
html=html_content,
|
||||
text=campaign.text_content
|
||||
@@ -162,7 +226,7 @@ class EmailCampaignService:
|
||||
sent_count += 1
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to send email to {user.email}: {str(e)}")
|
||||
logger.error(f"Failed to send email to {email}: {str(e)}")
|
||||
if 'campaign_email' in locals():
|
||||
campaign_email.status = EmailStatus.failed
|
||||
campaign_email.error_message = str(e)
|
||||
@@ -177,16 +241,17 @@ class EmailCampaignService:
|
||||
return {
|
||||
"sent": sent_count,
|
||||
"failed": failed_count,
|
||||
"total": len(recipients)
|
||||
"total": len(all_recipient_emails)
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _replace_variables(content: str, user: User) -> str:
|
||||
"""Replace template variables with user data"""
|
||||
def _replace_variables(content: str, name: str, email: str) -> str:
|
||||
"""Replace template variables with user/subscriber data"""
|
||||
first_name = name.split()[0] if name and ' ' in name else (name or 'Guest')
|
||||
replacements = {
|
||||
'{{name}}': user.full_name or 'Guest',
|
||||
'{{email}}': user.email,
|
||||
'{{first_name}}': user.full_name.split()[0] if user.full_name else 'Guest',
|
||||
'{{name}}': name or 'Guest',
|
||||
'{{email}}': email,
|
||||
'{{first_name}}': first_name,
|
||||
}
|
||||
|
||||
for key, value in replacements.items():
|
||||
|
||||
Binary file not shown.
@@ -1,99 +0,0 @@
|
||||
"""
|
||||
HTML sanitization utilities for backend content storage.
|
||||
Prevents XSS attacks by sanitizing HTML before storing in database.
|
||||
"""
|
||||
import bleach
|
||||
from typing import Optional
|
||||
|
||||
# Allowed HTML tags for rich content
|
||||
ALLOWED_TAGS = [
|
||||
'p', 'br', 'strong', 'em', 'u', 'b', 'i', 'span', 'div',
|
||||
'h1', 'h2', 'h3', 'h4', 'h5', 'h6',
|
||||
'ul', 'ol', 'li',
|
||||
'a', 'blockquote', 'pre', 'code',
|
||||
'table', 'thead', 'tbody', 'tr', 'th', 'td',
|
||||
'img', 'hr', 'section', 'article'
|
||||
]
|
||||
|
||||
# Allowed HTML attributes
|
||||
ALLOWED_ATTRIBUTES = {
|
||||
'a': ['href', 'title', 'target', 'rel'],
|
||||
'img': ['src', 'alt', 'title', 'width', 'height', 'class'],
|
||||
'div': ['class', 'id', 'style'],
|
||||
'span': ['class', 'id', 'style'],
|
||||
'p': ['class', 'id', 'style'],
|
||||
'h1': ['class', 'id'],
|
||||
'h2': ['class', 'id'],
|
||||
'h3': ['class', 'id'],
|
||||
'h4': ['class', 'id'],
|
||||
'h5': ['class', 'id'],
|
||||
'h6': ['class', 'id'],
|
||||
'table': ['class', 'id'],
|
||||
'tr': ['class', 'id'],
|
||||
'th': ['class', 'id', 'colspan', 'rowspan'],
|
||||
'td': ['class', 'id', 'colspan', 'rowspan'],
|
||||
}
|
||||
|
||||
# Allowed URL schemes
|
||||
ALLOWED_SCHEMES = ['http', 'https', 'mailto', 'tel']
|
||||
|
||||
def sanitize_html(html_content: Optional[str]) -> str:
|
||||
"""
|
||||
Sanitize HTML content to prevent XSS attacks.
|
||||
|
||||
Args:
|
||||
html_content: HTML string to sanitize (can be None)
|
||||
|
||||
Returns:
|
||||
Sanitized HTML string safe for storage
|
||||
"""
|
||||
if not html_content:
|
||||
return ''
|
||||
|
||||
# Clean HTML content
|
||||
cleaned = bleach.clean(
|
||||
html_content,
|
||||
tags=ALLOWED_TAGS,
|
||||
attributes=ALLOWED_ATTRIBUTES,
|
||||
protocols=ALLOWED_SCHEMES,
|
||||
strip=True, # Strip disallowed tags instead of escaping
|
||||
strip_comments=True, # Remove HTML comments
|
||||
)
|
||||
|
||||
# Additional link sanitization - ensure external links have rel="noopener"
|
||||
if '<a' in cleaned:
|
||||
import re
|
||||
# Add rel="noopener noreferrer" to external links
|
||||
def add_rel(match):
|
||||
tag = match.group(0)
|
||||
if 'href=' in tag and ('http://' in tag or 'https://' in tag):
|
||||
if 'rel=' not in tag:
|
||||
# Insert rel attribute before closing >
|
||||
return tag[:-1] + ' rel="noopener noreferrer">'
|
||||
elif 'noopener' not in tag and 'noreferrer' not in tag:
|
||||
# Add to existing rel attribute
|
||||
tag = tag.replace('rel="', 'rel="noopener noreferrer ')
|
||||
tag = tag.replace("rel='", "rel='noopener noreferrer ")
|
||||
return tag
|
||||
return tag
|
||||
|
||||
cleaned = re.sub(r'<a[^>]*>', add_rel, cleaned)
|
||||
|
||||
return cleaned
|
||||
|
||||
def sanitize_text_for_html(text: Optional[str]) -> str:
|
||||
"""
|
||||
Escape text content to be safely included in HTML.
|
||||
Use this for plain text that should be displayed as-is.
|
||||
|
||||
Args:
|
||||
text: Plain text string to escape
|
||||
|
||||
Returns:
|
||||
HTML-escaped string
|
||||
"""
|
||||
if not text:
|
||||
return ''
|
||||
|
||||
return bleach.clean(text, tags=[], strip=True)
|
||||
|
||||
@@ -60,6 +60,25 @@ def sanitize_html(content: Optional[str], strip: bool = False) -> str:
|
||||
strip_comments=True
|
||||
)
|
||||
|
||||
# Additional link sanitization - ensure external links have rel="noopener noreferrer"
|
||||
if '<a' in sanitized:
|
||||
import re
|
||||
# Add rel="noopener noreferrer" to external links
|
||||
def add_rel(match):
|
||||
tag = match.group(0)
|
||||
if 'href=' in tag and ('http://' in tag or 'https://' in tag):
|
||||
if 'rel=' not in tag:
|
||||
# Insert rel attribute before closing >
|
||||
return tag[:-1] + ' rel="noopener noreferrer">'
|
||||
elif 'noopener' not in tag and 'noreferrer' not in tag:
|
||||
# Add to existing rel attribute
|
||||
tag = tag.replace('rel="', 'rel="noopener noreferrer ')
|
||||
tag = tag.replace("rel='", "rel='noopener noreferrer ")
|
||||
return tag
|
||||
return tag
|
||||
|
||||
sanitized = re.sub(r'<a[^>]*>', add_rel, sanitized)
|
||||
|
||||
# Linkify URLs (convert plain URLs to links)
|
||||
# Only linkify if content doesn't already contain HTML links
|
||||
if '<a' not in sanitized:
|
||||
@@ -76,6 +95,7 @@ def sanitize_text(content: Optional[str]) -> str:
|
||||
"""
|
||||
Strip all HTML tags from content, leaving only plain text.
|
||||
Useful for fields that should not contain any HTML.
|
||||
Alias for sanitize_text_for_html for backward compatibility.
|
||||
|
||||
Args:
|
||||
content: The content to sanitize (can be None)
|
||||
@@ -93,6 +113,21 @@ def sanitize_text(content: Optional[str]) -> str:
|
||||
return bleach.clean(content, tags=[], strip=True)
|
||||
|
||||
|
||||
def sanitize_text_for_html(text: Optional[str]) -> str:
|
||||
"""
|
||||
Escape text content to be safely included in HTML.
|
||||
Use this for plain text that should be displayed as-is.
|
||||
Alias for sanitize_text for consistency.
|
||||
|
||||
Args:
|
||||
text: Plain text string to escape
|
||||
|
||||
Returns:
|
||||
HTML-escaped string
|
||||
"""
|
||||
return sanitize_text(text)
|
||||
|
||||
|
||||
def sanitize_filename(filename: str) -> str:
|
||||
"""
|
||||
Sanitize filename to prevent path traversal and other attacks.
|
||||
|
||||
@@ -121,6 +121,32 @@ def test_staff_role(db_session):
|
||||
return role
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_accountant_role(db_session):
|
||||
"""Create an accountant role."""
|
||||
role = Role(
|
||||
name="accountant",
|
||||
description="Accountant role"
|
||||
)
|
||||
db_session.add(role)
|
||||
db_session.commit()
|
||||
db_session.refresh(role)
|
||||
return role
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_housekeeping_role(db_session):
|
||||
"""Create a housekeeping role."""
|
||||
role = Role(
|
||||
name="housekeeping",
|
||||
description="Housekeeping role"
|
||||
)
|
||||
db_session.add(role)
|
||||
db_session.commit()
|
||||
db_session.refresh(role)
|
||||
return role
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_user(db_session, test_role):
|
||||
"""Create a test user."""
|
||||
@@ -175,6 +201,42 @@ def test_staff_user(db_session, test_staff_role):
|
||||
return user
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_accountant_user(db_session, test_accountant_role):
|
||||
"""Create a test accountant user."""
|
||||
hashed_password = bcrypt.hashpw("accountantpassword123".encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
|
||||
user = User(
|
||||
email="accountant@example.com",
|
||||
password=hashed_password,
|
||||
full_name="Accountant User",
|
||||
phone="1234567890",
|
||||
role_id=test_accountant_role.id,
|
||||
is_active=True
|
||||
)
|
||||
db_session.add(user)
|
||||
db_session.commit()
|
||||
db_session.refresh(user)
|
||||
return user
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_housekeeping_user(db_session, test_housekeeping_role):
|
||||
"""Create a test housekeeping user."""
|
||||
hashed_password = bcrypt.hashpw("housekeepingpassword123".encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
|
||||
user = User(
|
||||
email="housekeeping@example.com",
|
||||
password=hashed_password,
|
||||
full_name="Housekeeping User",
|
||||
phone="1234567890",
|
||||
role_id=test_housekeeping_role.id,
|
||||
is_active=True
|
||||
)
|
||||
db_session.add(user)
|
||||
db_session.commit()
|
||||
db_session.refresh(user)
|
||||
return user
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def auth_token(client, test_user):
|
||||
"""Get authentication token for test user (from cookies)."""
|
||||
@@ -227,6 +289,38 @@ def staff_token(client, test_staff_user):
|
||||
return None
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def accountant_token(client, test_accountant_user):
|
||||
"""Get authentication token for accountant user (from cookies)."""
|
||||
response = client.post(
|
||||
"/api/auth/login",
|
||||
json={
|
||||
"email": "accountant@example.com",
|
||||
"password": "accountantpassword123"
|
||||
}
|
||||
)
|
||||
if response.status_code == 200:
|
||||
cookie_token = response.cookies.get("accessToken")
|
||||
return cookie_token
|
||||
return None
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def housekeeping_token(client, test_housekeeping_user):
|
||||
"""Get authentication token for housekeeping user (from cookies)."""
|
||||
response = client.post(
|
||||
"/api/auth/login",
|
||||
json={
|
||||
"email": "housekeeping@example.com",
|
||||
"password": "housekeepingpassword123"
|
||||
}
|
||||
)
|
||||
if response.status_code == 200:
|
||||
cookie_token = response.cookies.get("accessToken")
|
||||
return cookie_token
|
||||
return None
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def authenticated_client(client, test_user):
|
||||
"""Create an authenticated test client (uses cookies)."""
|
||||
@@ -257,6 +351,32 @@ def admin_client(client, test_admin_user):
|
||||
return client
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def accountant_client(client, test_accountant_user):
|
||||
"""Create an authenticated accountant test client (uses cookies)."""
|
||||
response = client.post(
|
||||
"/api/auth/login",
|
||||
json={
|
||||
"email": "accountant@example.com",
|
||||
"password": "accountantpassword123"
|
||||
}
|
||||
)
|
||||
return client
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def housekeeping_client(client, test_housekeeping_user):
|
||||
"""Create an authenticated housekeeping test client (uses cookies)."""
|
||||
response = client.post(
|
||||
"/api/auth/login",
|
||||
json={
|
||||
"email": "housekeeping@example.com",
|
||||
"password": "housekeepingpassword123"
|
||||
}
|
||||
)
|
||||
return client
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_room_type(db_session):
|
||||
"""Create a test room type."""
|
||||
|
||||
241
Backend/src/tests/test_role_permissions.py
Normal file
241
Backend/src/tests/test_role_permissions.py
Normal file
@@ -0,0 +1,241 @@
|
||||
"""
|
||||
Comprehensive role-based permission tests.
|
||||
Tests that each user role can only access authorized endpoints and features.
|
||||
"""
|
||||
import pytest
|
||||
from fastapi import status
|
||||
|
||||
|
||||
@pytest.mark.integration
|
||||
class TestAdminPermissions:
|
||||
"""Test admin role permissions."""
|
||||
|
||||
def test_admin_can_access_user_management(self, admin_client):
|
||||
"""Admin should be able to access user management."""
|
||||
response = admin_client.get("/api/admin/users")
|
||||
assert response.status_code in [200, 404] # 404 if endpoint not implemented
|
||||
|
||||
def test_admin_can_create_user(self, admin_client):
|
||||
"""Admin should be able to create users."""
|
||||
response = admin_client.post(
|
||||
"/api/admin/users",
|
||||
json={
|
||||
"email": "newuser@test.com",
|
||||
"password": "Test123!",
|
||||
"full_name": "New User",
|
||||
"role": "customer"
|
||||
}
|
||||
)
|
||||
assert response.status_code in [201, 400, 404]
|
||||
|
||||
def test_admin_can_access_analytics(self, admin_client):
|
||||
"""Admin should be able to access analytics."""
|
||||
response = admin_client.get("/api/admin/analytics/dashboard")
|
||||
assert response.status_code in [200, 404]
|
||||
|
||||
def test_admin_can_manage_rooms(self, admin_client):
|
||||
"""Admin should be able to manage rooms."""
|
||||
response = admin_client.get("/api/admin/rooms")
|
||||
assert response.status_code in [200, 404]
|
||||
|
||||
def test_admin_can_manage_bookings(self, admin_client):
|
||||
"""Admin should be able to manage all bookings."""
|
||||
response = admin_client.get("/api/admin/bookings")
|
||||
assert response.status_code in [200, 404]
|
||||
|
||||
def test_admin_can_manage_payments(self, admin_client):
|
||||
"""Admin should be able to manage payments."""
|
||||
response = admin_client.get("/api/admin/payments")
|
||||
assert response.status_code in [200, 404]
|
||||
|
||||
def test_admin_can_manage_invoices(self, admin_client):
|
||||
"""Admin should be able to manage invoices."""
|
||||
response = admin_client.get("/api/admin/invoices")
|
||||
assert response.status_code in [200, 404]
|
||||
|
||||
|
||||
@pytest.mark.integration
|
||||
class TestStaffPermissions:
|
||||
"""Test staff role permissions."""
|
||||
|
||||
def test_staff_can_view_bookings(self, staff_token, client):
|
||||
"""Staff should be able to view bookings."""
|
||||
response = client.get(
|
||||
"/api/staff/bookings",
|
||||
cookies={"accessToken": staff_token} if staff_token else {}
|
||||
)
|
||||
assert response.status_code in [200, 401, 403, 404]
|
||||
|
||||
def test_staff_can_update_booking_status(self, staff_token, client, test_booking):
|
||||
"""Staff should be able to update booking status."""
|
||||
response = client.put(
|
||||
f"/api/staff/bookings/{test_booking.id}",
|
||||
json={"status": "checked_in"},
|
||||
cookies={"accessToken": staff_token} if staff_token else {}
|
||||
)
|
||||
assert response.status_code in [200, 401, 403, 404]
|
||||
|
||||
def test_staff_cannot_delete_users(self, staff_token, client, test_user):
|
||||
"""Staff should NOT be able to delete users."""
|
||||
response = client.delete(
|
||||
f"/api/admin/users/{test_user.id}",
|
||||
cookies={"accessToken": staff_token} if staff_token else {}
|
||||
)
|
||||
assert response.status_code in [401, 403, 404]
|
||||
|
||||
def test_staff_cannot_access_system_settings(self, staff_token, client):
|
||||
"""Staff should NOT be able to access system settings."""
|
||||
response = client.get(
|
||||
"/api/admin/settings",
|
||||
cookies={"accessToken": staff_token} if staff_token else {}
|
||||
)
|
||||
assert response.status_code in [401, 403, 404]
|
||||
|
||||
|
||||
@pytest.mark.integration
|
||||
class TestCustomerPermissions:
|
||||
"""Test customer role permissions."""
|
||||
|
||||
def test_customer_can_view_own_bookings(self, authenticated_client, test_user, test_booking):
|
||||
"""Customer should be able to view their own bookings."""
|
||||
response = authenticated_client.get("/api/bookings/me")
|
||||
assert response.status_code in [200, 404]
|
||||
|
||||
def test_customer_can_create_booking(self, authenticated_client, test_room):
|
||||
"""Customer should be able to create bookings."""
|
||||
from datetime import datetime, timedelta
|
||||
check_in = datetime.utcnow() + timedelta(days=1)
|
||||
check_out = datetime.utcnow() + timedelta(days=3)
|
||||
|
||||
response = authenticated_client.post(
|
||||
"/api/bookings",
|
||||
json={
|
||||
"room_id": test_room.id,
|
||||
"check_in_date": check_in.isoformat(),
|
||||
"check_out_date": check_out.isoformat(),
|
||||
"num_guests": 2
|
||||
}
|
||||
)
|
||||
assert response.status_code in [201, 400, 404]
|
||||
|
||||
def test_customer_cannot_view_other_bookings(self, authenticated_client, test_booking):
|
||||
"""Customer should NOT be able to view other users' bookings."""
|
||||
# Try to access booking that doesn't belong to them
|
||||
response = authenticated_client.get(f"/api/bookings/{test_booking.id}")
|
||||
# Should return 403 or 404, not 200
|
||||
assert response.status_code in [403, 404]
|
||||
|
||||
def test_customer_cannot_manage_users(self, authenticated_client):
|
||||
"""Customer should NOT be able to manage users."""
|
||||
response = authenticated_client.get("/api/admin/users")
|
||||
assert response.status_code in [401, 403, 404]
|
||||
|
||||
def test_customer_can_view_rooms(self, client):
|
||||
"""Customer should be able to view available rooms."""
|
||||
response = client.get("/api/rooms")
|
||||
assert response.status_code in [200, 404]
|
||||
|
||||
def test_customer_can_view_own_invoices(self, authenticated_client):
|
||||
"""Customer should be able to view their own invoices."""
|
||||
response = authenticated_client.get("/api/invoices/me")
|
||||
assert response.status_code in [200, 404]
|
||||
|
||||
|
||||
@pytest.mark.integration
|
||||
class TestAccountantPermissions:
|
||||
"""Test accountant role permissions."""
|
||||
|
||||
def test_accountant_can_view_payments(self, accountant_client):
|
||||
"""Accountant should be able to view payments."""
|
||||
response = accountant_client.get("/api/accountant/payments")
|
||||
assert response.status_code in [200, 404]
|
||||
|
||||
def test_accountant_can_view_invoices(self, accountant_client):
|
||||
"""Accountant should be able to view invoices."""
|
||||
response = accountant_client.get("/api/accountant/invoices")
|
||||
assert response.status_code in [200, 404]
|
||||
|
||||
def test_accountant_can_generate_reports(self, accountant_client):
|
||||
"""Accountant should be able to generate financial reports."""
|
||||
response = accountant_client.get("/api/accountant/reports")
|
||||
assert response.status_code in [200, 404]
|
||||
|
||||
def test_accountant_cannot_manage_rooms(self, accountant_client):
|
||||
"""Accountant should NOT be able to manage rooms."""
|
||||
response = accountant_client.get("/api/admin/rooms")
|
||||
assert response.status_code in [401, 403, 404]
|
||||
|
||||
def test_accountant_cannot_manage_bookings(self, accountant_client):
|
||||
"""Accountant should NOT be able to manage bookings."""
|
||||
response = accountant_client.put("/api/admin/bookings/1", json={"status": "confirmed"})
|
||||
assert response.status_code in [401, 403, 404]
|
||||
|
||||
|
||||
@pytest.mark.integration
|
||||
class TestHousekeepingPermissions:
|
||||
"""Test housekeeping role permissions."""
|
||||
|
||||
def test_housekeeping_can_view_tasks(self, housekeeping_client):
|
||||
"""Housekeeping should be able to view assigned tasks."""
|
||||
response = housekeeping_client.get("/api/housekeeping/tasks")
|
||||
assert response.status_code in [200, 404]
|
||||
|
||||
def test_housekeeping_can_update_task_status(self, housekeeping_client):
|
||||
"""Housekeeping should be able to update task status."""
|
||||
# This test would need a test task fixture
|
||||
# For now, just test the endpoint exists
|
||||
response = housekeeping_client.put("/api/housekeeping/tasks/1", json={"status": "completed"})
|
||||
assert response.status_code in [200, 404, 400]
|
||||
|
||||
def test_housekeeping_can_update_room_status(self, housekeeping_client):
|
||||
"""Housekeeping should be able to update room status."""
|
||||
response = housekeeping_client.put("/api/housekeeping/rooms/1/status", json={"status": "clean"})
|
||||
assert response.status_code in [200, 404, 400]
|
||||
|
||||
def test_housekeeping_cannot_manage_bookings(self, housekeeping_client):
|
||||
"""Housekeeping should NOT be able to manage bookings."""
|
||||
response = housekeeping_client.get("/api/admin/bookings")
|
||||
assert response.status_code in [401, 403, 404]
|
||||
|
||||
def test_housekeeping_cannot_access_payments(self, housekeeping_client):
|
||||
"""Housekeeping should NOT be able to access payments."""
|
||||
response = housekeeping_client.get("/api/admin/payments")
|
||||
assert response.status_code in [401, 403, 404]
|
||||
|
||||
|
||||
@pytest.mark.integration
|
||||
class TestUnauthenticatedAccess:
|
||||
"""Test that unauthenticated users cannot access protected endpoints."""
|
||||
|
||||
def test_unauthenticated_cannot_access_admin_endpoints(self, client):
|
||||
"""Unauthenticated users should not access admin endpoints."""
|
||||
response = client.get("/api/admin/users")
|
||||
assert response.status_code in [401, 403, 404]
|
||||
|
||||
def test_unauthenticated_cannot_access_staff_endpoints(self, client):
|
||||
"""Unauthenticated users should not access staff endpoints."""
|
||||
response = client.get("/api/staff/bookings")
|
||||
assert response.status_code in [401, 403, 404]
|
||||
|
||||
def test_unauthenticated_can_view_public_rooms(self, client):
|
||||
"""Unauthenticated users should be able to view public room listings."""
|
||||
response = client.get("/api/rooms")
|
||||
assert response.status_code in [200, 404]
|
||||
|
||||
def test_unauthenticated_cannot_create_booking(self, client, test_room):
|
||||
"""Unauthenticated users should not be able to create bookings."""
|
||||
from datetime import datetime, timedelta
|
||||
check_in = datetime.utcnow() + timedelta(days=1)
|
||||
check_out = datetime.utcnow() + timedelta(days=3)
|
||||
|
||||
response = client.post(
|
||||
"/api/bookings",
|
||||
json={
|
||||
"room_id": test_room.id,
|
||||
"check_in_date": check_in.isoformat(),
|
||||
"check_out_date": check_out.isoformat(),
|
||||
"num_guests": 2
|
||||
}
|
||||
)
|
||||
assert response.status_code in [401, 403]
|
||||
|
||||
Reference in New Issue
Block a user