307 lines
15 KiB
Python
307 lines
15 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, status, Query, Request
|
|
from sqlalchemy.orm import Session
|
|
from typing import Optional
|
|
from datetime import datetime
|
|
from ...shared.config.database import get_db
|
|
from ...shared.config.logging_config import get_logger
|
|
from ...security.middleware.auth import get_current_user, authorize_roles
|
|
from ...security.middleware.step_up_auth import authorize_financial_access
|
|
from ...auth.models.user import User
|
|
from ..models.invoice import Invoice, InvoiceStatus
|
|
from ...bookings.models.booking import Booking
|
|
from ..services.invoice_service import InvoiceService
|
|
from ...shared.utils.role_helpers import can_access_all_invoices, can_create_invoices
|
|
from ...shared.utils.response_helpers import success_response
|
|
from ...shared.utils.request_helpers import get_request_id
|
|
from ..services.financial_audit_service import financial_audit_service
|
|
from ..models.financial_audit_trail import FinancialActionType
|
|
from ..schemas.invoice import (
|
|
CreateInvoiceRequest,
|
|
UpdateInvoiceRequest,
|
|
MarkInvoicePaidRequest
|
|
)
|
|
|
|
# Module-level logger, obtained from the project's logging helper and named after this module.
logger = get_logger(__name__)
|
|
# Router that the endpoints in this module register on; every path is prefixed with /invoices.
router = APIRouter(prefix='/invoices', tags=['invoices'])
|
|
|
|
@router.get('/')
async def get_invoices(
    request: Request,
    booking_id: Optional[int] = Query(None),
    status_filter: Optional[str] = Query(None, alias='status'),
    page: int = Query(1, ge=1),
    limit: int = Query(10, ge=1, le=100),
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """List invoices with optional booking/status filters and pagination.

    Users for whom ``can_access_all_invoices`` is false only see their own
    invoices (their id is passed to the service as ``user_id``) and may only
    filter by a booking that belongs to them.

    Args:
        request: Incoming request (not used by the handler body).
        booking_id: Optional booking to restrict the listing to.
        status_filter: Optional status filter, read from the ``status`` query
            parameter.
        page: 1-based page number.
        limit: Page size, between 1 and 100.
        current_user: Authenticated user resolved by ``get_current_user``.
        db: Database session.

    Returns:
        ``success_response`` wrapping the result of
        ``InvoiceService.get_invoices``.

    Raises:
        HTTPException: 404 if the booking does not exist, 403 if it belongs
            to another user, 500 (with the error text as detail) for any
            other failure.
    """
    try:
        # Evaluate the permission once; it drives both the ownership check
        # and the user scoping below.
        has_full_access = can_access_all_invoices(current_user, db)

        # SECURITY: Verify booking ownership when booking_id is provided
        if booking_id and not has_full_access:
            booking = db.query(Booking).filter(Booking.id == booking_id).first()
            if not booking:
                raise HTTPException(status_code=404, detail='Booking not found')
            if booking.user_id != current_user.id:
                raise HTTPException(status_code=403, detail='Forbidden: You do not have permission to access invoices for this booking')

        user_id = None if has_full_access else current_user.id
        result = InvoiceService.get_invoices(db=db, user_id=user_id, booking_id=booking_id, status=status_filter, page=page, limit=limit)
        return success_response(data=result)
    except HTTPException:
        # Let the deliberate 404/403 above reach the client unchanged instead
        # of being rewritten to a 500 by the generic handler below.
        raise
    except Exception as e:
        db.rollback()
        logger.error(f'Error fetching invoices: {str(e)}', exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@router.get('/{id}')
async def get_invoice_by_id(
    id: int,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Return a single invoice by its id.

    The invoice is loaded through ``InvoiceService.get_invoice`` and is
    treated as a mapping. Users for whom ``can_access_all_invoices`` is false
    may only read an invoice whose ``user_id`` entry equals their own id.

    Raises:
        HTTPException: 404 if no invoice is found, 403 if the caller may not
            read it, 500 (with the error text as detail) for any other
            failure.
    """
    try:
        invoice = InvoiceService.get_invoice(id, db)
        if not invoice:
            raise HTTPException(status_code=404, detail='Invoice not found')

        # SECURITY: Verify invoice ownership for non-admin/accountant users
        ownership_required = not can_access_all_invoices(current_user, db)
        if ownership_required and ('user_id' not in invoice or invoice['user_id'] != current_user.id):
            raise HTTPException(status_code=403, detail='Forbidden: You do not have permission to access this invoice')

        payload = {'invoice': invoice}
        return success_response(data=payload)
    except HTTPException:
        # Intentional HTTP errors pass through untouched.
        raise
    except Exception as e:
        db.rollback()
        logger.error(f'Error fetching invoice by id: {str(e)}', exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@router.post('/', dependencies=[Depends(authorize_roles('admin', 'staff', 'accountant'))])
async def create_invoice(
    request: Request,
    invoice_data: CreateInvoiceRequest,
    current_user: User = Depends(authorize_roles('admin', 'staff', 'accountant')),
    db: Session = Depends(get_db),
):
    """Create an invoice from an existing booking.

    Copies the company/customer fields from the request, prepends the
    booking's promotion code (if any) to the notes, delegates creation to
    ``InvoiceService.create_invoice_from_booking`` and then records a
    best-effort financial audit entry.

    Raises:
        HTTPException: 403 if ``can_create_invoices`` rejects the user, 404
            if the booking does not exist, 400 on ``ValueError``, 500 (with
            the error text as detail) for any other failure.
    """
    try:
        # Defense in depth: Additional business logic check (authorize_roles already verified)
        if not can_create_invoices(current_user, db):
            raise HTTPException(status_code=403, detail='Forbidden')

        booking_id = invoice_data.booking_id
        booking = db.query(Booking).filter(Booking.id == booking_id).first()
        if not booking:
            raise HTTPException(status_code=404, detail='Booking not found')

        # Request fields forwarded verbatim to the service as keyword arguments.
        passthrough_fields = (
            'company_name',
            'company_address',
            'company_phone',
            'company_email',
            'company_tax_id',
            'company_logo_url',
            'customer_tax_id',
            'notes',
            'terms_and_conditions',
            'payment_instructions',
        )
        invoice_kwargs = {field: getattr(invoice_data, field) for field in passthrough_fields}

        invoice_notes = invoice_kwargs.get('notes', '')
        if booking.promotion_code:
            promotion_note = f'Promotion Code: {booking.promotion_code}'
            if invoice_notes:
                invoice_notes = f'{promotion_note}\n{invoice_notes}'.strip()
            else:
                invoice_notes = promotion_note
            invoice_kwargs['notes'] = invoice_notes

        request_id = get_request_id(request)
        invoice = InvoiceService.create_invoice_from_booking(
            booking_id=booking_id,
            db=db,
            created_by_id=current_user.id,
            tax_rate=invoice_data.tax_rate,
            discount_amount=invoice_data.discount_amount,
            due_days=invoice_data.due_days,
            request_id=request_id,
            **invoice_kwargs,
        )

        # Financial audit: invoice created
        try:
            audit_metadata = {
                'request_id': request_id,
                'role': getattr(current_user.role, 'name', None),
            }
            financial_audit_service.log_financial_action(
                db=db,
                action_type=FinancialActionType.invoice_created,
                performed_by=current_user.id,
                action_description=f'Invoice {invoice.get("invoice_number")} created from booking {booking_id}',
                invoice_id=invoice.get('id'),
                booking_id=booking_id,
                amount=invoice.get('total_amount'),
                currency=None,
                metadata=audit_metadata,
            )
        except Exception:
            # Do not block main flow on audit logging failure
            logger.warning('Failed to log financial audit for invoice creation', exc_info=True)

        response_data = {'invoice': invoice}
        return success_response(data=response_data, message='Invoice created successfully')
    except HTTPException:
        raise
    except ValueError as e:
        # ValueError from the service is reported to the client as a bad request.
        db.rollback()
        logger.error(f'Error creating invoice: {str(e)}', exc_info=True)
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        db.rollback()
        logger.error(f'Error creating invoice: {str(e)}', exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@router.put('/{id}')
async def update_invoice(
    request: Request,
    id: int,
    invoice_data: UpdateInvoiceRequest,
    current_user: User = Depends(authorize_financial_access('admin', 'accountant')),
    db: Session = Depends(get_db),
):
    """Update an invoice and record a best-effort financial audit entry.

    Only the fields explicitly set on the request (``exclude_unset=True``)
    are forwarded to ``InvoiceService.update_invoice``.

    Args:
        request: Incoming request, used to obtain the request id.
        id: Id of the invoice to update.
        invoice_data: Partial update payload.
        current_user: User resolved by ``authorize_financial_access``.
        db: Database session.

    Returns:
        ``success_response`` containing the updated invoice.

    Raises:
        HTTPException: 404 if the invoice does not exist, 400 on
            ``ValueError``, 500 (with the error text as detail) for any
            other failure.
    """
    try:
        invoice = db.query(Invoice).filter(Invoice.id == id).first()
        if not invoice:
            raise HTTPException(status_code=404, detail='Invoice not found')

        # Snapshot the total BEFORE calling the service. The service receives
        # the same session, so if it loads and mutates this invoice the ORM
        # object held here would already carry the new total afterwards and
        # the audit trail would record it as the "previous" amount.
        # Conversion problems must not fail the request: the value is only
        # used for the best-effort audit entry.
        try:
            raw_previous_total = getattr(invoice, 'total_amount', None)
            previous_amount = float(raw_previous_total) if raw_previous_total is not None else None
        except (TypeError, ValueError):
            previous_amount = None

        request_id = get_request_id(request)
        invoice_dict = invoice_data.model_dump(exclude_unset=True)
        updated_invoice = InvoiceService.update_invoice(invoice_id=id, db=db, updated_by_id=current_user.id, request_id=request_id, **invoice_dict)

        # Financial audit: invoice updated
        try:
            financial_audit_service.log_financial_action(
                db=db,
                action_type=FinancialActionType.invoice_updated,
                performed_by=current_user.id,
                action_description=f'Invoice {updated_invoice.get("invoice_number")} updated',
                invoice_id=id,
                booking_id=updated_invoice.get('booking_id'),
                amount=updated_invoice.get('total_amount'),
                previous_amount=previous_amount,
                currency=None,
                metadata={
                    'request_id': request_id,
                    'updated_fields': list(invoice_dict.keys()),
                    'role': getattr(current_user.role, 'name', None),
                },
            )
        except Exception:
            # Do not block the main flow on audit logging failure.
            logger.warning('Failed to log financial audit for invoice update', exc_info=True)

        return success_response(data={'invoice': updated_invoice}, message='Invoice updated successfully')
    except HTTPException:
        raise
    except ValueError as e:
        db.rollback()
        logger.error(f'Error updating invoice: {str(e)}', exc_info=True)
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        db.rollback()
        logger.error(f'Error updating invoice: {str(e)}', exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@router.post('/{id}/mark-paid')
async def mark_invoice_as_paid(
    request: Request,
    id: int,
    payment_data: MarkInvoicePaidRequest,
    current_user: User = Depends(authorize_financial_access('admin', 'accountant')),
    db: Session = Depends(get_db),
):
    """Mark an invoice as paid and record a best-effort financial audit entry.

    The payment amount from the request is handed to
    ``InvoiceService.mark_invoice_as_paid``. For the audit entry, the
    invoice's ``total_amount`` is used when the request carries no amount.

    Raises:
        HTTPException: 400 on ``ValueError``, 500 (with the error text as
            detail) for any other failure.
    """
    try:
        request_id = get_request_id(request)
        amount = payment_data.amount
        updated_invoice = InvoiceService.mark_invoice_as_paid(
            invoice_id=id,
            db=db,
            amount=amount,
            updated_by_id=current_user.id,
            request_id=request_id,
        )

        # Financial audit: invoice marked as paid
        try:
            audit_description = f'Invoice {updated_invoice.get("invoice_number")} marked as paid'
            audit_booking_id = updated_invoice.get('booking_id')
            if amount is not None:
                audit_amount = amount
            else:
                audit_amount = updated_invoice.get('total_amount')
            audit_metadata = {
                'request_id': request_id,
                'role': getattr(current_user.role, 'name', None),
            }
            financial_audit_service.log_financial_action(
                db=db,
                action_type=FinancialActionType.invoice_paid,
                performed_by=current_user.id,
                action_description=audit_description,
                invoice_id=id,
                booking_id=audit_booking_id,
                amount=audit_amount,
                currency=None,
                metadata=audit_metadata,
            )
        except Exception:
            # Audit logging is best-effort and must not fail the request.
            logger.warning('Failed to log financial audit for mark-as-paid', exc_info=True)

        response_data = {'invoice': updated_invoice}
        return success_response(data=response_data, message='Invoice marked as paid successfully')
    except HTTPException:
        raise
    except ValueError as e:
        db.rollback()
        logger.error(f'Error marking invoice as paid: {str(e)}', exc_info=True)
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        db.rollback()
        logger.error(f'Error marking invoice as paid: {str(e)}', exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@router.delete('/{id}')
async def delete_invoice(
    id: int,
    request: Request,
    current_user: User = Depends(authorize_roles('admin')),
    db: Session = Depends(get_db),
):
    """Delete an invoice and attempt to write an audit record.

    A snapshot of the invoice is taken before deletion so it can be attached
    to the audit entry. Audit logging is best-effort: a failure there is
    logged as a warning and does not change the response.

    Args:
        id: Id of the invoice to delete.
        request: Incoming request; supplies client IP, User-Agent and the
            request id for the audit entry.
        current_user: User resolved by ``authorize_roles('admin')``.
        db: Database session.

    Returns:
        ``success_response`` with a confirmation message.

    Raises:
        HTTPException: 404 if the invoice does not exist, 500 (with the
            error text as detail) for any other failure.
    """
    client_ip = request.client.host if request.client else None
    user_agent = request.headers.get('User-Agent')
    request_id = get_request_id(request)

    try:
        invoice = db.query(Invoice).filter(Invoice.id == id).first()
        if not invoice:
            raise HTTPException(status_code=404, detail='Invoice not found')

        # Capture invoice info before deletion for audit
        deleted_invoice_info = {
            'invoice_id': invoice.id,
            'invoice_number': invoice.invoice_number,
            'user_id': invoice.user_id,
            'booking_id': invoice.booking_id,
            'total_amount': float(invoice.total_amount) if invoice.total_amount else 0.0,
            'status': invoice.status.value if hasattr(invoice.status, 'value') else str(invoice.status),
        }

        db.delete(invoice)
        db.commit()

        # SECURITY: Log invoice deletion for audit trail
        # NOTE(review): `audit_service` is not among the imports visible at
        # the top of this file (only `financial_audit_service` is). If it is
        # really missing, this call raises NameError, which the handler below
        # downgrades to a warning, so deletions would never be audited.
        # Confirm and add the import.
        try:
            await audit_service.log_action(
                db=db,
                action='invoice_deleted',
                resource_type='invoice',
                user_id=current_user.id,
                resource_id=id,
                ip_address=client_ip,
                user_agent=user_agent,
                request_id=request_id,
                details=deleted_invoice_info,
                status='success',
            )
        except Exception as e:
            # Include the traceback so a broken audit path is diagnosable.
            logger.warning(f'Failed to log invoice deletion audit: {e}', exc_info=True)

        return success_response(message='Invoice deleted successfully')
    except HTTPException:
        raise
    except Exception as e:
        db.rollback()
        # Log the failure like the other handlers in this module do.
        logger.error(f'Error deleting invoice: {str(e)}', exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@router.get('/booking/{booking_id}')
async def get_invoices_by_booking(
    booking_id: int,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Return the invoices attached to one booking.

    The booking must exist, and the caller must either pass
    ``can_access_all_invoices`` or be the booking's owner.

    Raises:
        HTTPException: 404 if the booking does not exist, 403 if the caller
            may not see it, 500 (with the error text as detail) for any
            other failure.
    """
    try:
        booking = db.query(Booking).filter(Booking.id == booking_id).first()
        if not booking:
            raise HTTPException(status_code=404, detail='Booking not found')

        # Permission check first, ownership second (same short-circuit order).
        allowed = can_access_all_invoices(current_user, db) or booking.user_id == current_user.id
        if not allowed:
            raise HTTPException(status_code=403, detail='Forbidden')

        invoices = InvoiceService.get_invoices(db=db, booking_id=booking_id)
        return success_response(data=invoices)
    except HTTPException:
        raise
    except Exception as e:
        db.rollback()
        logger.error(f'Error fetching invoices by booking: {str(e)}', exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@router.post('/{id}/send-email')
async def send_invoice_email(
    request: Request,
    id: int,
    current_user: User = Depends(authorize_roles('admin', 'staff', 'accountant')),
    db: Session = Depends(get_db),
):
    """Email an invoice (or proforma invoice) to the user it belongs to.

    The invoice is converted with ``InvoiceService.invoice_to_dict``,
    rendered to HTML by ``_generate_invoice_email_html`` and sent to the
    email address of the user referenced by ``invoice.user_id``.

    Raises:
        HTTPException: 404 if the invoice or its user does not exist, 500
            (with the error text as detail) for any other failure.
    """
    try:
        invoice = db.query(Invoice).filter(Invoice.id == id).first()
        if not invoice:
            raise HTTPException(status_code=404, detail='Invoice not found')

        # Imported at call time, as in the original handler.
        from ..routes.booking_routes import _generate_invoice_email_html
        from ...auth.models.user import User as UserModel
        from ...shared.utils.mailer import send_email

        invoice_dict = InvoiceService.invoice_to_dict(invoice)
        invoice_html = _generate_invoice_email_html(invoice_dict, is_proforma=invoice.is_proforma)
        if invoice.is_proforma:
            invoice_type = 'Proforma Invoice'
        else:
            invoice_type = 'Invoice'

        user = db.query(UserModel).filter(UserModel.id == invoice.user_id).first()
        if not user:
            raise HTTPException(status_code=404, detail='User not found')

        mail_kwargs = {
            'to': user.email,
            'subject': f'{invoice_type} {invoice.invoice_number}',
            'html': invoice_html,
        }
        await send_email(**mail_kwargs)

        logger.info(f'{invoice_type} {invoice.invoice_number} sent to {user.email}')
        confirmation = f'{invoice_type} sent successfully to {user.email}'
        return success_response(message=confirmation)
    except HTTPException:
        raise
    except Exception as e:
        db.rollback()
        logger.error(f'Error sending invoice email: {str(e)}', exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))