Files
Hotel-Booking/Backend/src/services/invoice_service.py
Iliyan Angelov 6f85b8cf17 updates
2025-11-21 01:20:51 +02:00

175 lines
13 KiB
Python

from sqlalchemy.orm import Session
from sqlalchemy import func, and_, or_
from typing import Optional, Dict, Any, List
from datetime import datetime, timedelta
from ..models.invoice import Invoice, InvoiceItem, InvoiceStatus
from ..models.booking import Booking
from ..models.payment import Payment, PaymentStatus
from ..models.user import User
def generate_invoice_number(db: Session, is_proforma: bool = False) -> str:
    """Build the next invoice number for the current UTC day.

    Numbers have the form ``<PREFIX>-<YYYYMMDD>-<NNNN>``: the prefix is
    ``PRO`` for proforma invoices and ``INV`` otherwise, and ``NNNN`` is a
    counter zero-padded to at least four digits.

    The counter continues from the invoice whose number sorts last (string
    ordering, descending) among those sharing the same prefix and date; when
    none exists, or its trailing segment is not an integer, counting starts
    at 1.

    Args:
        db: Session used to look up the latest matching invoice.
        is_proforma: Selects the ``PRO`` prefix instead of ``INV``.

    Returns:
        The formatted invoice number.
    """
    series = 'PRO' if is_proforma else 'INV'
    day_stamp = datetime.utcnow().strftime('%Y%m%d')
    like_pattern = f'{series}-{day_stamp}-%'

    latest = (
        db.query(Invoice)
        .filter(Invoice.invoice_number.like(like_pattern))
        .order_by(Invoice.invoice_number.desc())
        .first()
    )

    # Default to the first number of the day; bump only when the latest
    # invoice carries a parseable trailing counter.
    next_sequence = 1
    if latest:
        try:
            next_sequence = int(latest.invoice_number.split('-')[-1]) + 1
        except (ValueError, IndexError):
            next_sequence = 1

    return f'{series}-{day_stamp}-{next_sequence:04d}'
class InvoiceService:
    """Invoice operations backed by a SQLAlchemy session.

    Every method is static and receives the session to work with as ``db``.
    Monetary values are handled as floats throughout; stored column values
    are coerced with ``float`` before any arithmetic so that the code works
    whether the ORM hands back floats or ``Decimal`` instances.
    """

    @staticmethod
    def _as_float(value: Any) -> float:
        """Return ``value`` as a float, mapping ``None`` and other falsy values to 0.0."""
        return float(value) if value else 0.0

    @staticmethod
    def _iso(moment: Any) -> Optional[str]:
        """Return ``moment.isoformat()``, or ``None`` when ``moment`` is falsy."""
        return moment.isoformat() if moment else None

    @staticmethod
    def create_invoice_from_booking(
        booking_id: int,
        db: Session,
        created_by_id: Optional[int] = None,
        tax_rate: float = 0.0,
        discount_amount: float = 0.0,
        due_days: int = 30,
        is_proforma: bool = False,
        invoice_amount: Optional[float] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Create an invoice, with one room item and one item per service, for a booking.

        Args:
            booking_id: Primary key of the booking to invoice.
            db: Session used for all queries and for persisting the invoice.
            created_by_id: Stored on the invoice as ``created_by_id``.
            tax_rate: Tax percentage applied to ``subtotal - discount``.
            discount_amount: Absolute discount subtracted from the subtotal.
            due_days: Days between the issue date and the due date.
            is_proforma: Stored on the invoice and used to pick the number prefix.
            invoice_amount: When given and lower than the booking total, the
                invoice is partial: room and service amounts are scaled by
                ``invoice_amount / booking_total``.
            **kwargs: Optional ``company_*`` fields, ``customer_tax_id``,
                ``notes``, ``terms_and_conditions`` and ``payment_instructions``
                copied onto the invoice.

        Returns:
            The committed invoice serialised by :meth:`invoice_to_dict`.

        Raises:
            ValueError: If the booking or its user does not exist.
        """
        from sqlalchemy.orm import selectinload

        # NOTE(review): loader options given as strings ('service',
        # 'room_type') are not accepted by SQLAlchemy 2.0 - confirm the pinned
        # version before upgrading.
        booking = (
            db.query(Booking)
            .options(
                selectinload(Booking.service_usages).selectinload('service'),
                selectinload(Booking.room).selectinload('room_type'),
                selectinload(Booking.payments),
            )
            .filter(Booking.id == booking_id)
            .first()
        )
        if not booking:
            raise ValueError('Booking not found')
        user = db.query(User).filter(User.id == booking.user_id).first()
        if not user:
            raise ValueError('User not found')

        invoice_number = generate_invoice_number(db, is_proforma=is_proforma)
        booking_total = float(booking.total_price)

        # A partial invoice bills only a share of the booking; compute that
        # share once and reuse it for the discount and every line item.
        is_partial = invoice_amount is not None and invoice_amount < booking_total
        proportion = float(invoice_amount) / booking_total if is_partial else 1.0

        if invoice_amount is not None:
            subtotal = float(invoice_amount)
            # Only a discount exceeding half of a partial amount is scaled
            # down to the invoiced share.
            if is_partial and discount_amount > 0 and discount_amount > subtotal * 0.5:
                original_discount = (
                    float(booking.discount_amount)
                    if booking.discount_amount
                    else discount_amount
                )
                discount_amount = original_discount * proportion
        else:
            subtotal = booking_total

        tax_amount = (subtotal - discount_amount) * (tax_rate / 100)
        total_amount = subtotal + tax_amount - discount_amount
        amount_paid = sum(
            float(payment.amount)
            for payment in booking.payments
            if payment.payment_status == PaymentStatus.completed
        )
        balance_due = total_amount - amount_paid

        now = datetime.utcnow()
        # Status and paid date are derived from these first-pass totals.
        if balance_due <= 0:
            status = InvoiceStatus.paid
            paid_date = now
        elif amount_paid > 0:
            status = InvoiceStatus.sent
            paid_date = None
        else:
            status = InvoiceStatus.draft
            paid_date = None

        invoice = Invoice(
            invoice_number=invoice_number,
            booking_id=booking_id,
            user_id=booking.user_id,
            issue_date=now,
            due_date=now + timedelta(days=due_days),
            paid_date=paid_date,
            subtotal=subtotal,
            tax_rate=tax_rate,
            tax_amount=tax_amount,
            discount_amount=discount_amount,
            total_amount=total_amount,
            amount_paid=amount_paid,
            balance_due=balance_due,
            status=status,
            is_proforma=is_proforma,
            company_name=kwargs.get('company_name'),
            company_address=kwargs.get('company_address'),
            company_phone=kwargs.get('company_phone'),
            company_email=kwargs.get('company_email'),
            company_tax_id=kwargs.get('company_tax_id'),
            company_logo_url=kwargs.get('company_logo_url'),
            customer_name=user.full_name or f'{user.email}',
            customer_email=user.email,
            customer_address=user.address,
            customer_phone=user.phone,
            customer_tax_id=kwargs.get('customer_tax_id'),
            notes=kwargs.get('notes'),
            terms_and_conditions=kwargs.get('terms_and_conditions'),
            payment_instructions=kwargs.get('payment_instructions'),
            created_by_id=created_by_id,
        )
        db.add(invoice)
        # Flush so invoice.id is available for the line items below.
        db.flush()

        services_total = sum(float(usage.total_price) for usage in booking.service_usages)
        room_price = booking_total - services_total
        nights = (booking.check_out_date - booking.check_in_date).days
        if nights <= 0:
            nights = 1

        if is_partial:
            room_price = room_price * proportion
            services_total = services_total * proportion
            item_description_suffix = f' (Partial: {proportion * 100:.0f}%)'
        else:
            item_description_suffix = ''

        # Build the description from plain locals: nesting the same quote
        # character inside an f-string only parses on Python 3.12+.
        room = booking.room
        room_type_name = room.room_type.name if room.room_type else 'N/A'
        night_plural = 's' if nights > 1 else ''
        room_description = (
            f'Room: {room.room_number} - {room_type_name} '
            f'({nights} night{night_plural}){item_description_suffix}'
        )
        db.add(
            InvoiceItem(
                invoice_id=invoice.id,
                description=room_description,
                quantity=nights,
                unit_price=room_price / nights,  # nights is always >= 1 here
                tax_rate=tax_rate,
                discount_amount=0.0,
                line_total=room_price,
                room_id=booking.room_id,
            )
        )

        for service_usage in booking.service_usages:
            service_item_price = float(service_usage.total_price)
            if is_partial:
                service_item_price = service_item_price * proportion
            if service_usage.quantity > 0:
                unit_price = service_item_price / float(service_usage.quantity)
            else:
                unit_price = service_item_price
            db.add(
                InvoiceItem(
                    invoice_id=invoice.id,
                    description=f'Service: {service_usage.service.name}{item_description_suffix}',
                    quantity=float(service_usage.quantity),
                    unit_price=unit_price,
                    tax_rate=tax_rate,
                    discount_amount=0.0,
                    line_total=service_item_price,
                    service_id=service_usage.service_id,
                )
            )

        # Second pass: the stored totals are re-derived from the line items.
        subtotal = room_price + services_total
        tax_amount = (subtotal - discount_amount) * (tax_rate / 100)
        total_amount = subtotal + tax_amount - discount_amount
        balance_due = total_amount - amount_paid
        invoice.subtotal = subtotal
        invoice.tax_amount = tax_amount
        invoice.total_amount = total_amount
        invoice.balance_due = balance_due

        db.commit()
        db.refresh(invoice)
        return InvoiceService.invoice_to_dict(invoice)

    @staticmethod
    def update_invoice(
        invoice_id: int,
        db: Session,
        updated_by_id: Optional[int] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Update whitelisted invoice fields and recompute totals when needed.

        Only the fields in ``allowed_fields`` are copied from ``kwargs``. When
        ``tax_rate`` or ``discount_amount`` is supplied, the tax amount, total
        and balance due are recalculated and the status is switched between
        ``paid`` and ``sent`` according to the new balance.

        Args:
            invoice_id: Primary key of the invoice to update.
            db: Session used to load and commit the invoice.
            updated_by_id: Stored on the invoice as ``updated_by_id``.
            **kwargs: Field values to apply.

        Returns:
            The updated invoice serialised by :meth:`invoice_to_dict`.

        Raises:
            ValueError: If the invoice does not exist.
        """
        invoice = db.query(Invoice).filter(Invoice.id == invoice_id).first()
        if not invoice:
            raise ValueError('Invoice not found')

        allowed_fields = [
            'company_name', 'company_address', 'company_phone', 'company_email',
            'company_tax_id', 'company_logo_url', 'notes', 'terms_and_conditions',
            'payment_instructions', 'status', 'due_date', 'tax_rate', 'discount_amount',
        ]
        for field in allowed_fields:
            if field in kwargs:
                setattr(invoice, field, kwargs[field])

        if 'tax_rate' in kwargs or 'discount_amount' in kwargs:
            as_float = InvoiceService._as_float
            # Coerce everything to float first: stored values may be Decimal,
            # and Decimal cannot be mixed with float in arithmetic.
            tax_rate = as_float(kwargs.get('tax_rate', invoice.tax_rate))
            discount_amount = as_float(kwargs.get('discount_amount', invoice.discount_amount))
            subtotal = as_float(invoice.subtotal)

            tax_amount = (subtotal - discount_amount) * (tax_rate / 100)
            total_amount = subtotal + tax_amount - discount_amount
            balance_due = total_amount - as_float(invoice.amount_paid)

            invoice.tax_amount = tax_amount
            invoice.total_amount = total_amount
            invoice.balance_due = balance_due

            if balance_due <= 0 and invoice.status != InvoiceStatus.paid:
                invoice.status = InvoiceStatus.paid
                invoice.paid_date = datetime.utcnow()
            elif balance_due > 0 and invoice.status == InvoiceStatus.paid:
                invoice.status = InvoiceStatus.sent
                invoice.paid_date = None

        invoice.updated_by_id = updated_by_id
        invoice.updated_at = datetime.utcnow()
        db.commit()
        db.refresh(invoice)
        return InvoiceService.invoice_to_dict(invoice)

    @staticmethod
    def mark_invoice_as_paid(
        invoice_id: int,
        db: Session,
        amount: Optional[float] = None,
        updated_by_id: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Record a payment against an invoice and refresh its status.

        Args:
            invoice_id: Primary key of the invoice.
            db: Session used to load and commit the invoice.
            amount: Payment to add to ``amount_paid``; when ``None`` the
                current balance due is used.
            updated_by_id: Stored on the invoice as ``updated_by_id``.

        Returns:
            The updated invoice serialised by :meth:`invoice_to_dict`.

        Raises:
            ValueError: If the invoice does not exist.
        """
        invoice = db.query(Invoice).filter(Invoice.id == invoice_id).first()
        if not invoice:
            raise ValueError('Invoice not found')

        as_float = InvoiceService._as_float
        # Work in floats so a Decimal-valued column never meets a float
        # operand (Decimal += float raises TypeError).
        if amount is not None:
            payment_amount = float(amount)
        else:
            payment_amount = as_float(invoice.balance_due)
        amount_paid = as_float(invoice.amount_paid) + payment_amount
        balance_due = as_float(invoice.total_amount) - amount_paid

        invoice.amount_paid = amount_paid
        invoice.balance_due = balance_due
        if balance_due <= 0:
            invoice.status = InvoiceStatus.paid
            invoice.paid_date = datetime.utcnow()
        else:
            invoice.status = InvoiceStatus.sent

        invoice.updated_by_id = updated_by_id
        invoice.updated_at = datetime.utcnow()
        db.commit()
        db.refresh(invoice)
        return InvoiceService.invoice_to_dict(invoice)

    @staticmethod
    def get_invoice(invoice_id: int, db: Session) -> Optional[Dict[str, Any]]:
        """Return the serialised invoice with ``invoice_id``, or ``None`` if absent."""
        invoice = db.query(Invoice).filter(Invoice.id == invoice_id).first()
        if not invoice:
            return None
        return InvoiceService.invoice_to_dict(invoice)

    @staticmethod
    def get_invoices(
        db: Session,
        user_id: Optional[int] = None,
        booking_id: Optional[int] = None,
        status: Optional[str] = None,
        page: int = 1,
        limit: int = 10,
    ) -> Dict[str, Any]:
        """Return one page of invoices, newest first, with paging metadata.

        Args:
            db: Session used for the query.
            user_id: When truthy, restrict to invoices of this user.
            booking_id: When truthy, restrict to invoices of this booking.
            status: When it names a valid ``InvoiceStatus`` value, restrict to
                that status; unknown values are ignored.
            page: 1-based page number; values below 1 are treated as 1.
            limit: Page size; values below 1 are treated as 1.

        Returns:
            A dict with ``invoices``, ``total``, ``page``, ``limit`` and
            ``total_pages``.
        """
        # Clamp paging input: limit=0 would divide by zero below, and a page
        # below 1 would produce a negative OFFSET.
        page = max(page, 1)
        limit = max(limit, 1)

        query = db.query(Invoice)
        if user_id:
            query = query.filter(Invoice.user_id == user_id)
        if booking_id:
            query = query.filter(Invoice.booking_id == booking_id)
        if status:
            try:
                status_enum = InvoiceStatus(status)
            except ValueError:
                # Best effort: an unrecognised status applies no filter.
                pass
            else:
                query = query.filter(Invoice.status == status_enum)

        total = query.count()
        offset = (page - 1) * limit
        invoices = (
            query.order_by(Invoice.created_at.desc())
            .offset(offset)
            .limit(limit)
            .all()
        )
        return {
            'invoices': [InvoiceService.invoice_to_dict(inv) for inv in invoices],
            'total': total,
            'page': page,
            'limit': limit,
            'total_pages': (total + limit - 1) // limit,
        }

    @staticmethod
    def invoice_to_dict(invoice: Invoice) -> Dict[str, Any]:
        """Serialise an invoice and its items into plain Python values.

        Numeric fields become floats (falsy values become 0.0), dates become
        ISO-8601 strings or ``None``, and ``status`` becomes the enum's
        ``value``. ``promotion_code`` is taken from the rest of the line that
        follows the first ``'Promotion Code:'`` marker in ``notes``, if any.
        """
        as_float = InvoiceService._as_float
        iso = InvoiceService._iso

        promotion_code = None
        notes = invoice.notes
        if notes and 'Promotion Code:' in notes:
            # The marker is known to be present, so index 1 always exists.
            promotion_code = notes.split('Promotion Code:')[1].split('\n')[0].strip()

        items = [
            {
                'id': item.id,
                'description': item.description,
                'quantity': as_float(item.quantity),
                'unit_price': as_float(item.unit_price),
                'tax_rate': as_float(item.tax_rate),
                'discount_amount': as_float(item.discount_amount),
                'line_total': as_float(item.line_total),
                'room_id': item.room_id,
                'service_id': item.service_id,
            }
            for item in invoice.items
        ]

        return {
            'id': invoice.id,
            'invoice_number': invoice.invoice_number,
            'booking_id': invoice.booking_id,
            'user_id': invoice.user_id,
            'issue_date': iso(invoice.issue_date),
            'due_date': iso(invoice.due_date),
            'paid_date': iso(invoice.paid_date),
            'subtotal': as_float(invoice.subtotal),
            'tax_rate': as_float(invoice.tax_rate),
            'tax_amount': as_float(invoice.tax_amount),
            'discount_amount': as_float(invoice.discount_amount),
            'total_amount': as_float(invoice.total_amount),
            'amount_paid': as_float(invoice.amount_paid),
            'balance_due': as_float(invoice.balance_due),
            'status': invoice.status.value if invoice.status else None,
            'company_name': invoice.company_name,
            'company_address': invoice.company_address,
            'company_phone': invoice.company_phone,
            'company_email': invoice.company_email,
            'company_tax_id': invoice.company_tax_id,
            'company_logo_url': invoice.company_logo_url,
            'customer_name': invoice.customer_name,
            'customer_email': invoice.customer_email,
            'customer_address': invoice.customer_address,
            'customer_phone': invoice.customer_phone,
            'customer_tax_id': invoice.customer_tax_id,
            'notes': invoice.notes,
            'terms_and_conditions': invoice.terms_and_conditions,
            'payment_instructions': invoice.payment_instructions,
            # Older rows/models may lack the attribute entirely.
            'is_proforma': getattr(invoice, 'is_proforma', False),
            'promotion_code': promotion_code,
            'items': items,
            'created_at': iso(invoice.created_at),
            'updated_at': iso(invoice.updated_at),
        }