Files
Hotel-Booking/Backend/src/hotel_services/routes/guest_request_routes.py
Iliyan Angelov 876af48145 updates
2025-12-07 01:28:03 +02:00

533 lines
25 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import and_, or_, desc
from typing import List, Optional
from datetime import datetime
from ...shared.config.database import get_db
from ...shared.config.logging_config import get_logger
from ...security.middleware.auth import get_current_user, authorize_roles
from ...auth.models.user import User
from ...auth.models.role import Role
from ..models.guest_request import GuestRequest, RequestType, RequestStatus, RequestPriority
from ...bookings.models.booking import Booking, BookingStatus
from ...rooms.models.room import Room
from pydantic import BaseModel
from ...shared.utils.sanitization import sanitize_text
# Module-level logger obtained from the shared logging configuration, keyed by this module's name.
logger = get_logger(__name__)
# Router that collects every endpoint in this module under the /guest-requests prefix.
router = APIRouter(prefix='/guest-requests', tags=['guest-requests'])
# ==================== Pydantic Schemas ====================
class GuestRequestCreate(BaseModel):
    """Request body accepted by the create-guest-request endpoint."""
    # Booking the request is made for; the create handler checks it belongs to the caller.
    booking_id: int
    # Room the request concerns; the create handler requires it to match the booking's room.
    room_id: int
    # Converted with RequestType(...) by the create handler; unknown values are rejected.
    request_type: str
    # Short summary of the request.
    title: str
    # Optional longer description.
    description: Optional[str] = None
    # Converted with RequestPriority(...) by the create handler; defaults to 'normal'.
    priority: str = 'normal'
    # Optional free-text notes from the guest.
    guest_notes: Optional[str] = None
class GuestRequestUpdate(BaseModel):
    """Partial update body for a guest request; fields left unset are not applied."""
    # Converted with RequestStatus(...) by the update handler; unknown values are rejected.
    status: Optional[str] = None
    # User id to assign the request to; a falsy value is stored as None (unassigned).
    assigned_to: Optional[int] = None
    # Free-text notes from staff.
    staff_notes: Optional[str] = None
# ==================== Guest Requests ====================
@router.get('/')
async def get_guest_requests(
    status: Optional[str] = Query(None),
    request_type: Optional[str] = Query(None),
    room_id: Optional[int] = Query(None),
    assigned_to: Optional[int] = Query(None),
    priority: Optional[str] = Query(None),
    page: int = Query(1, ge=1),
    limit: int = Query(20, ge=1, le=100),
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Get guest requests with filtering.

    Visibility depends on the caller's role: customers only see their own
    requests, housekeeping sees requests assigned to them or unassigned, and
    any other role sees everything. Only requests whose booking is currently
    checked in are returned.

    Args:
        status, request_type, room_id, assigned_to, priority: optional
            equality filters; a falsy value disables the filter.
        page: 1-based page number.
        limit: page size (1-100).

    Returns:
        A dict with ``status``, the serialized ``requests`` for the page and
        a ``pagination`` summary.

    Raises:
        HTTPException: 500 if querying or serializing fails.
    """
    try:
        # assigned_staff is read for every row in the response, so it is
        # eager-loaded alongside the other relationships to avoid one lazy
        # query per request.
        query = db.query(GuestRequest).options(
            joinedload(GuestRequest.booking),
            joinedload(GuestRequest.room),
            joinedload(GuestRequest.guest),
            joinedload(GuestRequest.assigned_staff),
        )

        # Check user role to determine access level; a missing role row is
        # treated as the most restricted role.
        role = db.query(Role).filter(Role.id == current_user.role_id).first()
        role_name = role.name if role else 'customer'

        if role_name == 'customer':
            # Customers can only see their own requests
            query = query.filter(GuestRequest.user_id == current_user.id)
        elif role_name == 'housekeeping':
            # Housekeeping can only see requests assigned to them or unassigned
            query = query.filter(
                or_(
                    GuestRequest.assigned_to == current_user.id,
                    GuestRequest.assigned_to.is_(None)
                )
            )
        # Admin and staff can see all requests (no additional filter needed)

        if status:
            query = query.filter(GuestRequest.status == status)
        if request_type:
            query = query.filter(GuestRequest.request_type == request_type)
        if room_id:
            query = query.filter(GuestRequest.room_id == room_id)
        if assigned_to:
            query = query.filter(GuestRequest.assigned_to == assigned_to)
        if priority:
            query = query.filter(GuestRequest.priority == priority)

        # Only show requests for checked-in bookings (guests must be in the room)
        query = query.join(Booking).filter(
            Booking.status == BookingStatus.checked_in
        )

        total = query.count()

        # Urgent first, then high, then newest first within each group.
        requests = query.order_by(
            desc(GuestRequest.priority == RequestPriority.urgent),
            desc(GuestRequest.priority == RequestPriority.high),
            desc(GuestRequest.requested_at)
        ).offset((page - 1) * limit).limit(limit).all()

        serialized = [
            {
                'id': req.id,
                'booking_id': req.booking_id,
                'room_id': req.room_id,
                'room_number': req.room.room_number if req.room else None,
                'user_id': req.user_id,
                'guest_name': req.guest.full_name if req.guest else None,
                'request_type': req.request_type.value,
                'status': req.status.value,
                'priority': req.priority.value,
                'title': req.title,
                'description': req.description,
                'guest_notes': req.guest_notes,
                'staff_notes': req.staff_notes,
                'assigned_to': req.assigned_to,
                'assigned_staff_name': req.assigned_staff.full_name if req.assigned_staff else None,
                'fulfilled_by': req.fulfilled_by,
                'requested_at': req.requested_at.isoformat() if req.requested_at else None,
                'started_at': req.started_at.isoformat() if req.started_at else None,
                'fulfilled_at': req.fulfilled_at.isoformat() if req.fulfilled_at else None,
                'response_time_minutes': req.response_time_minutes,
                'fulfillment_time_minutes': req.fulfillment_time_minutes,
            }
            for req in requests
        ]

        return {
            'status': 'success',
            'data': {
                'requests': serialized,
                'pagination': {
                    'total': total,
                    'page': page,
                    'limit': limit,
                    # Ceiling division without floats.
                    'total_pages': (total + limit - 1) // limit
                }
            }
        }
    except HTTPException:
        # Never mask a deliberate HTTP error as a 500.
        raise
    except Exception as e:
        logger.error(f'Error fetching guest requests: {str(e)}', exc_info=True)
        raise HTTPException(status_code=500, detail='Failed to fetch guest requests')
@router.post('/')
async def create_guest_request(
    request_data: GuestRequestCreate,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Create a new guest request.

    The booking must exist, belong to the caller, be checked in, and match
    the supplied room. For housekeeping-type requests a housekeeping task is
    auto-created (unless a pending/in-progress one of the same type already
    exists for the booking and room) and housekeeping users are notified;
    failures in that step are logged and do not fail the request.

    Args:
        request_data: validated request body.

    Returns:
        A dict with ``status``, ``message`` and the new request ``id``.

    Raises:
        HTTPException: 404 if the booking does not exist, 403 if it belongs
            to another user, 400 if it is not checked in, the room does not
            match, or an enum value is invalid, 500 on unexpected errors.
    """
    try:
        # Verify booking belongs to user
        booking = db.query(Booking).filter(Booking.id == request_data.booking_id).first()
        if not booking:
            raise HTTPException(status_code=404, detail='Booking not found')
        if booking.user_id != current_user.id:
            raise HTTPException(status_code=403, detail='You can only create requests for your own bookings')

        # Guests can only create requests when they are checked in (in the room)
        if booking.status != BookingStatus.checked_in:
            raise HTTPException(
                status_code=400,
                detail='You can only create requests when you are checked in. Please check in first or contact reception.'
            )

        # Verify room matches booking
        if booking.room_id != request_data.room_id:
            raise HTTPException(status_code=400, detail='Room ID does not match booking')

        # Sanitize user input to prevent XSS. The sanitized values are used
        # everywhere below, including the housekeeping task and notification.
        sanitized_title = sanitize_text(request_data.title)
        sanitized_description = sanitize_text(request_data.description) if request_data.description else None
        sanitized_guest_notes = sanitize_text(request_data.guest_notes) if request_data.guest_notes else None

        # Parse the enums once; an unknown value raises ValueError -> 400 below.
        request_type = RequestType(request_data.request_type)
        priority = RequestPriority(request_data.priority)

        guest_request = GuestRequest(
            booking_id=request_data.booking_id,
            room_id=request_data.room_id,
            user_id=current_user.id,
            request_type=request_type,
            priority=priority,
            title=sanitized_title,
            description=sanitized_description,
            guest_notes=sanitized_guest_notes,
        )
        db.add(guest_request)
        db.flush()  # Flush to get the ID for task creation

        # Auto-create housekeeping task for request types that require housekeeping
        task_types_requiring_housekeeping = {
            RequestType.extra_towels,
            RequestType.extra_pillows,
            RequestType.room_cleaning,
            RequestType.turndown_service,
            RequestType.amenities,
        }

        if request_type in task_types_requiring_housekeeping:
            # NOTE(review): if a flush inside this block fails, the session may
            # need a rollback before the commit below can succeed; a savepoint
            # (Session.begin_nested) would isolate it - confirm before changing.
            try:
                from ...hotel_services.models.housekeeping_task import HousekeepingTask, HousekeepingStatus, HousekeepingType

                # Determine housekeeping task type based on request type
                task_type_map = {
                    RequestType.room_cleaning: HousekeepingType.stayover,
                    RequestType.turndown_service: HousekeepingType.turndown,
                    RequestType.extra_towels: HousekeepingType.stayover,
                    RequestType.extra_pillows: HousekeepingType.stayover,
                    RequestType.amenities: HousekeepingType.stayover,
                }
                housekeeping_task_type = task_type_map.get(request_type, HousekeepingType.stayover)

                # Delivery-style requests: one checklist line and a shorter estimate.
                delivery_labels = {
                    RequestType.extra_towels: 'Extra towels',
                    RequestType.extra_pillows: 'Extra pillows',
                    RequestType.amenities: 'Amenities',
                }

                # Create default checklist based on request type
                checklist_items = []
                if request_type == RequestType.room_cleaning:
                    checklist_items = [
                        {'item': 'Room cleaned', 'completed': False, 'notes': ''},
                        {'item': 'Bathroom cleaned', 'completed': False, 'notes': ''},
                        {'item': 'Trash emptied', 'completed': False, 'notes': ''},
                        {'item': 'Beds made', 'completed': False, 'notes': ''},
                    ]
                elif request_type == RequestType.turndown_service:
                    checklist_items = [
                        {'item': 'Beds turned down', 'completed': False, 'notes': ''},
                        {'item': 'Curtains closed', 'completed': False, 'notes': ''},
                        {'item': 'Lights dimmed', 'completed': False, 'notes': ''},
                        {'item': 'Amenities refreshed', 'completed': False, 'notes': ''},
                    ]
                elif request_type in delivery_labels:
                    item_name = delivery_labels[request_type]
                    checklist_items = [
                        {'item': f'{item_name} delivered', 'completed': False, 'notes': sanitized_description or ''},
                    ]

                # Check if a similar task already exists for this room
                existing_task = db.query(HousekeepingTask).filter(
                    and_(
                        HousekeepingTask.room_id == request_data.room_id,
                        HousekeepingTask.task_type == housekeeping_task_type,
                        HousekeepingTask.status.in_([HousekeepingStatus.pending, HousekeepingStatus.in_progress]),
                        HousekeepingTask.booking_id == request_data.booking_id
                    )
                ).first()

                if not existing_task:
                    # Create housekeeping task
                    housekeeping_task = HousekeepingTask(
                        room_id=request_data.room_id,
                        booking_id=request_data.booking_id,
                        task_type=housekeeping_task_type,
                        status=HousekeepingStatus.pending,
                        scheduled_time=datetime.utcnow(),  # Schedule immediately for guest requests
                        created_by=None,  # Created by system/guest request
                        checklist_items=checklist_items,
                        notes=f'Auto-created from guest request: {sanitized_title}. Guest notes: {sanitized_guest_notes or "None"}',
                        estimated_duration_minutes=15 if request_type in delivery_labels else 30
                    )
                    db.add(housekeeping_task)
                    db.flush()

                    # Link guest request to housekeeping task via notes
                    guest_request.staff_notes = f'Auto-created housekeeping task #{housekeeping_task.id}'

                    # Send notification to housekeeping users (best effort)
                    try:
                        from ...notifications.routes.notification_routes import notification_manager

                        room = db.query(Room).filter(Room.id == request_data.room_id).first()
                        task_data_notification = {
                            'id': housekeeping_task.id,
                            'room_id': housekeeping_task.room_id,
                            'room_number': room.room_number if room else None,
                            'task_type': housekeeping_task.task_type.value,
                            'status': housekeeping_task.status.value,
                            'scheduled_time': housekeeping_task.scheduled_time.isoformat() if housekeeping_task.scheduled_time else None,
                            'guest_request_id': guest_request.id,
                            'guest_request_title': sanitized_title,
                            'created_at': housekeeping_task.created_at.isoformat() if housekeeping_task.created_at else None
                        }
                        notification_data = {
                            'type': 'housekeeping_task_available',
                            'data': task_data_notification
                        }
                        # Send notification to all housekeeping users
                        await notification_manager.send_to_role('housekeeping', notification_data)
                    except Exception as e:
                        logger.error(f'Error sending housekeeping notification for guest request: {str(e)}', exc_info=True)

                    logger.info(f'Auto-created housekeeping task {housekeeping_task.id} for guest request {guest_request.id} (type: {request_type.value})')
            except Exception as e:
                # Don't fail guest request creation if task creation fails
                logger.error(f'Error auto-creating housekeeping task for guest request: {str(e)}', exc_info=True)

        db.commit()
        db.refresh(guest_request)
        return {
            'status': 'success',
            'message': 'Guest request created successfully',
            'data': {'id': guest_request.id}
        }
    except HTTPException:
        # Let the deliberate 4xx responses above through instead of letting
        # the generic handler turn them into a 500.
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=f'Invalid enum value: {str(e)}')
    except Exception as e:
        logger.error(f'Error creating guest request: {str(e)}', exc_info=True)
        db.rollback()
        raise HTTPException(status_code=500, detail='Failed to create guest request')
@router.put('/{request_id}')
async def update_guest_request(
    request_id: int,
    request_data: GuestRequestUpdate,
    current_user: User = Depends(authorize_roles('admin', 'staff', 'housekeeping')),
    db: Session = Depends(get_db)
):
    """Update a guest request (assign, update status, add notes).

    Only fields present in the request body are applied. Moving a pending
    request to in-progress records the start time and response time and
    auto-assigns the caller if nobody is assigned; moving to fulfilled
    records who fulfilled it, when, and the fulfillment time.

    Args:
        request_id: id of the guest request to update.
        request_data: partial update body.

    Returns:
        A dict with ``status`` and ``message``.

    Raises:
        HTTPException: 404 if the request does not exist, 403 if a
            housekeeping user targets a request assigned to someone else,
            400 for an invalid status value, 500 on unexpected errors.
    """
    try:
        guest_request = db.query(GuestRequest).filter(GuestRequest.id == request_id).first()
        if not guest_request:
            raise HTTPException(status_code=404, detail='Guest request not found')

        # Check permissions
        role = db.query(Role).filter(Role.id == current_user.role_id).first()
        is_housekeeping = role and role.name == 'housekeeping'

        # Housekeeping can only update requests assigned to them or unassigned
        if is_housekeeping and guest_request.assigned_to and guest_request.assigned_to != current_user.id:
            raise HTTPException(status_code=403, detail='You can only update requests assigned to you')

        update_data = request_data.dict(exclude_unset=True)

        # Handle status changes
        if 'status' in update_data:
            new_status = RequestStatus(update_data['status'])
            old_status = guest_request.status
            now = datetime.utcnow()

            # Track timestamps
            if new_status == RequestStatus.in_progress and old_status == RequestStatus.pending:
                guest_request.started_at = now
                if guest_request.requested_at:
                    delta = now - guest_request.requested_at
                    guest_request.response_time_minutes = int(delta.total_seconds() / 60)
                # Auto-assign if not assigned
                if not guest_request.assigned_to:
                    guest_request.assigned_to = current_user.id
            elif new_status == RequestStatus.fulfilled and old_status != RequestStatus.fulfilled:
                guest_request.fulfilled_at = now
                guest_request.fulfilled_by = current_user.id
                if guest_request.started_at:
                    delta = now - guest_request.started_at
                    guest_request.fulfillment_time_minutes = int(delta.total_seconds() / 60)

            # Store the enum member rather than the raw string.
            update_data['status'] = new_status

        if 'assigned_to' in update_data:
            # A falsy id (0 / null) means "unassigned".
            update_data['assigned_to'] = update_data['assigned_to'] or None

        if update_data.get('staff_notes'):
            # Sanitize free text before storing, as the fulfill endpoint does.
            update_data['staff_notes'] = sanitize_text(update_data['staff_notes'])

        for key, value in update_data.items():
            setattr(guest_request, key, value)

        db.commit()
        db.refresh(guest_request)
        return {
            'status': 'success',
            'message': 'Guest request updated successfully'
        }
    except HTTPException:
        # Preserve the intended 403/404 instead of converting them to 500.
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=f'Invalid enum value: {str(e)}')
    except Exception as e:
        logger.error(f'Error updating guest request: {str(e)}', exc_info=True)
        db.rollback()
        raise HTTPException(status_code=500, detail='Failed to update guest request')
@router.get('/{request_id}')
async def get_guest_request(
    request_id: int,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Get a single guest request.

    Customers may only view their own requests; housekeeping may only view
    requests that are unassigned or assigned to them; other roles may view
    any request.

    Args:
        request_id: id of the guest request to fetch.

    Returns:
        A dict with ``status`` and the serialized request under ``data``.

    Raises:
        HTTPException: 404 if the request does not exist, 403 if the caller
            may not view it, 500 on unexpected errors.
    """
    try:
        guest_request = db.query(GuestRequest).options(
            joinedload(GuestRequest.booking),
            joinedload(GuestRequest.room),
            joinedload(GuestRequest.guest),
            joinedload(GuestRequest.assigned_staff),
            joinedload(GuestRequest.fulfilled_staff)
        ).filter(GuestRequest.id == request_id).first()
        if not guest_request:
            raise HTTPException(status_code=404, detail='Guest request not found')

        # Check permissions; a missing role row is treated as 'customer'.
        role = db.query(Role).filter(Role.id == current_user.role_id).first()
        role_name = role.name if role else 'customer'

        if role_name == 'customer':
            # Customers can only view their own requests
            if guest_request.user_id != current_user.id:
                raise HTTPException(status_code=403, detail='You can only view your own requests')
        elif role_name == 'housekeeping':
            # Housekeeping can only view requests assigned to them or unassigned
            if guest_request.assigned_to and guest_request.assigned_to != current_user.id:
                raise HTTPException(status_code=403, detail='You can only view requests assigned to you')
        # Admin and staff can view all requests (no additional check needed)

        guest = guest_request.guest
        assigned_staff = guest_request.assigned_staff
        fulfilled_staff = guest_request.fulfilled_staff

        return {
            'status': 'success',
            'data': {
                'id': guest_request.id,
                'booking_id': guest_request.booking_id,
                'room_id': guest_request.room_id,
                'room_number': guest_request.room.room_number if guest_request.room else None,
                'user_id': guest_request.user_id,
                'guest_name': guest.full_name if guest else None,
                'guest_email': guest.email if guest else None,
                'request_type': guest_request.request_type.value,
                'status': guest_request.status.value,
                'priority': guest_request.priority.value,
                'title': guest_request.title,
                'description': guest_request.description,
                'guest_notes': guest_request.guest_notes,
                'staff_notes': guest_request.staff_notes,
                'assigned_to': guest_request.assigned_to,
                'assigned_staff_name': assigned_staff.full_name if assigned_staff else None,
                'fulfilled_by': guest_request.fulfilled_by,
                'fulfilled_staff_name': fulfilled_staff.full_name if fulfilled_staff else None,
                'requested_at': guest_request.requested_at.isoformat() if guest_request.requested_at else None,
                'started_at': guest_request.started_at.isoformat() if guest_request.started_at else None,
                'fulfilled_at': guest_request.fulfilled_at.isoformat() if guest_request.fulfilled_at else None,
                'response_time_minutes': guest_request.response_time_minutes,
                'fulfillment_time_minutes': guest_request.fulfillment_time_minutes,
            }
        }
    except HTTPException:
        # Without this, the 404/403 raised above would be caught by the
        # generic handler below and reported as a 500.
        raise
    except Exception as e:
        logger.error(f'Error fetching guest request: {str(e)}', exc_info=True)
        raise HTTPException(status_code=500, detail='Failed to fetch guest request')
@router.post('/{request_id}/assign')
async def assign_request(
    request_id: int,
    current_user: User = Depends(authorize_roles('admin', 'staff', 'housekeeping')),
    db: Session = Depends(get_db)
):
    """Assign a request to the current user (housekeeping).

    A pending request is moved to in-progress at the same time, recording
    its start time and the response time in minutes.

    Args:
        request_id: id of the guest request to take.

    Returns:
        A dict with ``status`` and ``message``.

    Raises:
        HTTPException: 404 if the request does not exist, 400 if it is
            already fulfilled, 500 on unexpected errors.
    """
    try:
        guest_request = db.query(GuestRequest).filter(GuestRequest.id == request_id).first()
        if not guest_request:
            raise HTTPException(status_code=404, detail='Guest request not found')
        if guest_request.status == RequestStatus.fulfilled:
            raise HTTPException(status_code=400, detail='Cannot assign a fulfilled request')

        guest_request.assigned_to = current_user.id

        if guest_request.status == RequestStatus.pending:
            # Use one timestamp so started_at and the response time agree.
            now = datetime.utcnow()
            guest_request.status = RequestStatus.in_progress
            guest_request.started_at = now
            if guest_request.requested_at:
                delta = now - guest_request.requested_at
                guest_request.response_time_minutes = int(delta.total_seconds() / 60)

        db.commit()
        return {
            'status': 'success',
            'message': 'Request assigned successfully'
        }
    except HTTPException:
        # Keep the intended 404/400 instead of masking them as a 500.
        raise
    except Exception as e:
        logger.error(f'Error assigning request: {str(e)}', exc_info=True)
        db.rollback()
        raise HTTPException(status_code=500, detail='Failed to assign request')
@router.post('/{request_id}/fulfill')
async def fulfill_request(
    request_id: int,
    staff_notes: Optional[str] = None,
    current_user: User = Depends(authorize_roles('admin', 'staff', 'housekeeping')),
    db: Session = Depends(get_db)
):
    """Mark a request as fulfilled.

    Records who fulfilled the request and when, appends any sanitized staff
    notes, and stores the fulfillment time in minutes (measured from the
    start time, or from the request time if it was never started).

    Args:
        request_id: id of the guest request to fulfill.
        staff_notes: optional free-text note to append to the request.

    Returns:
        A dict with ``status`` and ``message``.

    Raises:
        HTTPException: 404 if the request does not exist, 403 if a
            housekeeping user is not the assignee, 400 if it is already
            fulfilled, 500 on unexpected errors.
    """
    try:
        guest_request = db.query(GuestRequest).filter(GuestRequest.id == request_id).first()
        if not guest_request:
            raise HTTPException(status_code=404, detail='Guest request not found')

        # Check permissions
        role = db.query(Role).filter(Role.id == current_user.role_id).first()
        is_housekeeping = role and role.name == 'housekeeping'
        if is_housekeeping and guest_request.assigned_to != current_user.id:
            raise HTTPException(status_code=403, detail='You can only fulfill requests assigned to you')

        if guest_request.status == RequestStatus.fulfilled:
            raise HTTPException(status_code=400, detail='Request is already fulfilled')

        # Use one timestamp so fulfilled_at and the computed duration agree.
        now = datetime.utcnow()
        guest_request.status = RequestStatus.fulfilled
        guest_request.fulfilled_by = current_user.id
        guest_request.fulfilled_at = now

        if staff_notes:
            sanitized_notes = sanitize_text(staff_notes)
            # Append on a new line when notes already exist, otherwise start them.
            if guest_request.staff_notes:
                guest_request.staff_notes = f'{guest_request.staff_notes}\n{sanitized_notes}'
            else:
                guest_request.staff_notes = sanitized_notes

        # If never started, calculate from request time
        reference_time = guest_request.started_at or guest_request.requested_at
        if reference_time:
            delta = now - reference_time
            guest_request.fulfillment_time_minutes = int(delta.total_seconds() / 60)

        db.commit()
        return {
            'status': 'success',
            'message': 'Request marked as fulfilled'
        }
    except HTTPException:
        # Keep the intended 404/403/400 instead of masking them as a 500.
        raise
    except Exception as e:
        logger.error(f'Error fulfilling request: {str(e)}', exc_info=True)
        db.rollback()
        raise HTTPException(status_code=500, detail='Failed to fulfill request')