Files
Hotel-Booking/Backend/src/services/stripe_service.py
Iliyan Angelov 44e11520c5 updates
2025-11-20 02:18:52 +02:00

584 lines
27 KiB
Python

"""
Stripe payment service for processing card payments
"""
import logging
import stripe
from typing import Optional, Dict, Any
from ..config.settings import settings
from ..models.payment import Payment, PaymentMethod, PaymentType, PaymentStatus
from ..models.booking import Booking, BookingStatus
from ..models.system_settings import SystemSettings
from sqlalchemy.orm import Session
from datetime import datetime
logger = logging.getLogger(__name__)
def get_stripe_secret_key(db: Session) -> Optional[str]:
    """Return the Stripe secret key, preferring the database over the environment.

    Looks up the ``stripe_secret_key`` row in ``SystemSettings``. When the row
    is missing, has an empty value, or the lookup raises, falls back to
    ``settings.STRIPE_SECRET_KEY``.

    Args:
        db: Database session used for the ``SystemSettings`` lookup.

    Returns:
        The configured key, or ``None`` when neither source provides one.
    """
    try:
        setting = db.query(SystemSettings).filter(
            SystemSettings.key == "stripe_secret_key"
        ).first()
        if setting and setting.value:
            return setting.value
    except Exception:
        # Best-effort lookup: the environment fallback below still applies,
        # but leave a trace instead of hiding database problems completely.
        logger.warning(
            "Could not read stripe_secret_key from system settings; "
            "falling back to environment configuration",
            exc_info=True,
        )
    # Fallback to environment variable (empty values are normalised to None)
    return settings.STRIPE_SECRET_KEY or None
def get_stripe_publishable_key(db: Session) -> Optional[str]:
    """Return the Stripe publishable key, preferring the database over the environment.

    Looks up the ``stripe_publishable_key`` row in ``SystemSettings``. When the
    row is missing, has an empty value, or the lookup raises, falls back to
    ``settings.STRIPE_PUBLISHABLE_KEY``.

    Args:
        db: Database session used for the ``SystemSettings`` lookup.

    Returns:
        The configured key, or ``None`` when neither source provides one.
    """
    try:
        setting = db.query(SystemSettings).filter(
            SystemSettings.key == "stripe_publishable_key"
        ).first()
        if setting and setting.value:
            return setting.value
    except Exception:
        # Best-effort lookup: the environment fallback below still applies,
        # but leave a trace instead of hiding database problems completely.
        logger.warning(
            "Could not read stripe_publishable_key from system settings; "
            "falling back to environment configuration",
            exc_info=True,
        )
    # Fallback to environment variable (empty values are normalised to None)
    return settings.STRIPE_PUBLISHABLE_KEY or None
def get_stripe_webhook_secret(db: Session) -> Optional[str]:
    """Return the Stripe webhook secret, preferring the database over the environment.

    Looks up the ``stripe_webhook_secret`` row in ``SystemSettings``. When the
    row is missing, has an empty value, or the lookup raises, falls back to
    ``settings.STRIPE_WEBHOOK_SECRET``.

    Args:
        db: Database session used for the ``SystemSettings`` lookup.

    Returns:
        The configured secret, or ``None`` when neither source provides one.
    """
    try:
        setting = db.query(SystemSettings).filter(
            SystemSettings.key == "stripe_webhook_secret"
        ).first()
        if setting and setting.value:
            return setting.value
    except Exception:
        # Best-effort lookup: the environment fallback below still applies,
        # but leave a trace instead of hiding database problems completely.
        logger.warning(
            "Could not read stripe_webhook_secret from system settings; "
            "falling back to environment configuration",
            exc_info=True,
        )
    # Fallback to environment variable (empty values are normalised to None)
    return settings.STRIPE_WEBHOOK_SECRET or None
class StripeService:
    """Service for handling Stripe payments.

    Every public method is a ``@staticmethod``; the class is used purely as a
    namespace. Methods that talk to Stripe set ``stripe.api_key`` first, which
    is module-global state of the ``stripe`` package.
    """

    # Display symbols for the platform currency used in confirmation emails.
    # Written as unicode escapes so the symbols survive encoding-lossy tooling.
    _CURRENCY_SYMBOLS = {
        "USD": "$", "EUR": "\u20ac", "GBP": "\u00a3", "JPY": "\u00a5", "CNY": "\u00a5",
        "KRW": "\u20a9", "SGD": "S$", "THB": "\u0e3f", "AUD": "A$", "CAD": "C$",
        "VND": "\u20ab", "INR": "\u20b9", "CHF": "CHF", "NZD": "NZ$",
    }

    @staticmethod
    def _configure_api_key(db: Optional[Session] = None) -> None:
        """Resolve the Stripe secret key and install it as ``stripe.api_key``.

        Args:
            db: Optional database session; when given, the key stored in the
                database takes precedence over the environment setting.

        Raises:
            ValueError: If no secret key is configured anywhere.
        """
        secret_key = get_stripe_secret_key(db) if db is not None else None
        if not secret_key:
            secret_key = settings.STRIPE_SECRET_KEY
        if not secret_key:
            raise ValueError("Stripe secret key is not configured")
        # Set the API key for this request (module-global in the stripe package)
        stripe.api_key = secret_key

    @staticmethod
    def _resolve_client_url(db: Session) -> str:
        """Return the frontend base URL used for links inside emails.

        Order of precedence: ``client_url`` row in ``SystemSettings``, then
        ``settings.CLIENT_URL``, then the ``CLIENT_URL`` environment variable,
        then ``http://localhost:5173``.
        """
        import os

        client_url_setting = db.query(SystemSettings).filter(
            SystemSettings.key == "client_url"
        ).first()
        if client_url_setting and client_url_setting.value:
            return client_url_setting.value
        return settings.CLIENT_URL or os.getenv("CLIENT_URL", "http://localhost:5173")

    @staticmethod
    def _enum_value(enum_obj: Any) -> Any:
        """Return ``.value`` for payment enums, anything else (incl. None) as-is."""
        if isinstance(enum_obj, (PaymentMethod, PaymentType, PaymentStatus)):
            return enum_obj.value
        return enum_obj

    @staticmethod
    def create_payment_intent(
        amount: float,
        currency: str = "usd",
        metadata: Optional[Dict[str, Any]] = None,
        customer_id: Optional[str] = None,
        db: Optional[Session] = None
    ) -> Dict[str, Any]:
        """
        Create a Stripe Payment Intent

        Args:
            amount: Payment amount in major currency units (e.g. dollars);
                it is multiplied by 100 before being sent to Stripe
            currency: Currency code (default: usd)
            metadata: Additional metadata to attach to the payment intent
            customer_id: Optional Stripe customer ID
            db: Optional database session to get keys from database

        Returns:
            Dict with ``client_secret``, ``id``, ``status``, ``amount`` (as
            reported by Stripe, i.e. in the smallest currency unit) and
            ``currency``

        Raises:
            ValueError: If the secret key is missing, the amount is out of
                range, or Stripe rejects the request
        """
        StripeService._configure_api_key(db)

        # Validate amount is reasonable (Stripe max is $999,999.99)
        if amount <= 0:
            raise ValueError("Amount must be greater than 0")
        if amount > 999999.99:
            raise ValueError(f"Amount ${amount:,.2f} exceeds Stripe's maximum of $999,999.99")

        # Amount is in dollars, so multiply by 100 to get cents.
        # NOTE(review): this assumes a two-decimal currency; zero-decimal
        # currencies (e.g. JPY) would need a different factor.
        amount_in_cents = int(round(amount * 100))

        # Defensive double-check on the converted value ($999,999.99 in cents)
        if amount_in_cents > 99999999:
            raise ValueError(f"Amount ${amount:,.2f} ({amount_in_cents} cents) exceeds Stripe's maximum")

        intent_params = {
            "amount": amount_in_cents,
            "currency": currency,
            "automatic_payment_methods": {
                "enabled": True,
            },
            "metadata": metadata or {},
        }
        if customer_id:
            intent_params["customer"] = customer_id

        try:
            intent = stripe.PaymentIntent.create(**intent_params)
            return {
                "client_secret": intent.client_secret,
                "id": intent.id,
                "status": intent.status,
                "amount": intent.amount,
                "currency": intent.currency,
            }
        except stripe.StripeError as e:
            raise ValueError(f"Stripe error: {str(e)}") from e

    @staticmethod
    def retrieve_payment_intent(
        payment_intent_id: str,
        db: Optional[Session] = None
    ) -> Dict[str, Any]:
        """
        Retrieve a payment intent by ID

        Args:
            payment_intent_id: Stripe payment intent ID
            db: Optional database session to get keys from database

        Returns:
            Dict with ``id``, ``status``, ``amount`` (Stripe amount divided by
            100), ``currency``, ``metadata`` and a list of ``charges``

        Raises:
            ValueError: If the secret key is missing or Stripe reports an error
        """
        StripeService._configure_api_key(db)

        try:
            intent = stripe.PaymentIntent.retrieve(payment_intent_id)

            # Safely access charges - they may not exist on all payment intents
            charges = []
            if hasattr(intent, 'charges') and intent.charges:
                charges = [
                    {
                        "id": charge.id,
                        "paid": charge.paid,
                        "status": charge.status,
                    }
                    for charge in getattr(intent.charges, 'data', [])
                ]

            return {
                "id": intent.id,
                "status": intent.status,
                "amount": intent.amount / 100,  # Convert from cents
                "currency": intent.currency,
                "metadata": intent.metadata,
                "charges": charges,
            }
        except stripe.StripeError as e:
            raise ValueError(f"Stripe error: {str(e)}") from e

    @staticmethod
    async def _send_booking_confirmation_email(
        db: Session,
        booking: Booking,
        payment: Payment,
        booking_id: int,
        total_paid: float
    ) -> None:
        """Render and send the booking confirmation email for a paid booking.

        Callers wrap this in ``try/except`` because email delivery is
        best-effort and must not fail the payment confirmation.
        """
        # Imported lazily, as in the rest of this module's email code.
        from ..utils.mailer import send_email
        from ..utils.email_templates import booking_confirmation_email_template
        from ..models.room import Room
        from sqlalchemy.orm import selectinload

        client_url = StripeService._resolve_client_url(db)

        # Get platform currency for email
        currency_setting = db.query(SystemSettings).filter(
            SystemSettings.key == "platform_currency"
        ).first()
        currency = currency_setting.value if currency_setting and currency_setting.value else "USD"
        # Unknown currency codes are shown as the code itself
        currency_symbol = StripeService._CURRENCY_SYMBOLS.get(currency, currency)

        # Load booking with room details for email
        booking_with_room = db.query(Booking).options(
            selectinload(Booking.room).selectinload(Room.room_type)
        ).filter(Booking.id == booking_id).first()
        room = booking_with_room.room if booking_with_room else None
        room_type_name = room.room_type.name if room and room.room_type else "Room"

        payment_type_str = payment.payment_type.value if payment.payment_type else None

        email_html = booking_confirmation_email_template(
            booking_number=booking.booking_number,
            guest_name=booking.user.full_name if booking.user else "Guest",
            room_number=room.room_number if room else "N/A",
            room_type=room_type_name,
            check_in=booking.check_in_date.strftime("%B %d, %Y") if booking.check_in_date else "N/A",
            check_out=booking.check_out_date.strftime("%B %d, %Y") if booking.check_out_date else "N/A",
            num_guests=booking.num_guests,
            total_price=float(booking.total_price),
            requires_deposit=False,  # Payment completed, no deposit message needed
            deposit_amount=None,
            amount_paid=total_paid,
            payment_type=payment_type_str,
            client_url=client_url,
            currency_symbol=currency_symbol
        )

        if booking.user:
            await send_email(
                to=booking.user.email,
                subject=f"Booking Confirmed - {booking.booking_number}",
                html=email_html
            )
            logger.info(f"Booking confirmation email sent to {booking.user.email}")

    @staticmethod
    async def _finalize_completed_payment(
        db: Session,
        booking: Booking,
        payment: Payment,
        booking_id: int
    ) -> None:
        """Apply the effects of a completed payment to invoices and the booking.

        Recomputes the total paid from all completed payments, updates every
        invoice of the booking, confirms the booking where appropriate, sends
        the confirmation / invoice emails (best-effort) and commits.
        """
        from ..models.invoice import Invoice, InvoiceStatus
        from ..services.invoice_service import InvoiceService
        from ..models.user import User
        from ..utils.mailer import send_email

        # Refresh booking to get updated payments relationship
        db.refresh(booking)

        # Total of all completed payments (now includes the current payment)
        total_paid = sum(
            float(p.amount) for p in booking.payments
            if p.payment_status == PaymentStatus.completed
        )

        # Update invoice amounts and status for this booking
        invoices = db.query(Invoice).filter(Invoice.booking_id == booking_id).all()
        for invoice in invoices:
            invoice.amount_paid = total_paid
            invoice.balance_due = float(invoice.total_amount) - total_paid
            if invoice.balance_due <= 0:
                invoice.status = InvoiceStatus.paid
                invoice.paid_date = datetime.utcnow()
            elif invoice.amount_paid > 0:
                invoice.status = InvoiceStatus.sent

        def confirm_booking_if_possible() -> bool:
            """Confirm pending/cancelled bookings; report whether to email."""
            if booking.status in (BookingStatus.pending, BookingStatus.cancelled):
                # Restore cancelled bookings or confirm pending bookings
                booking.status = BookingStatus.confirmed
                return True
            # Booking already confirmed, but the payment was just completed
            return booking.status == BookingStatus.confirmed

        should_send_email = False
        if payment.payment_type == PaymentType.deposit:
            # Mark deposit as paid and confirm booking
            booking.deposit_paid = True
            should_send_email = confirm_booking_if_possible()
        elif payment.payment_type == PaymentType.full:
            # Confirm booking if:
            # 1. Total paid (all payments) covers the booking price, OR
            # 2. This single payment covers the entire booking amount
            # NOTE(review): the already-confirmed case is assumed to apply only
            # when the booking is fully covered - confirm against the original
            # indentation.
            booking_price = float(booking.total_price)
            if total_paid >= booking_price or float(payment.amount) >= booking_price:
                should_send_email = confirm_booking_if_possible()

        # Send booking confirmation email if booking was just confirmed or payment completed
        if should_send_email:
            try:
                await StripeService._send_booking_confirmation_email(
                    db, booking, payment, booking_id, total_paid
                )
            except Exception as email_error:
                logger.error(f"Failed to send booking confirmation email: {str(email_error)}")

        # Send invoice email if payment is completed and invoice is now paid
        user = db.query(User).filter(User.id == booking.user_id).first()
        for invoice in invoices:
            if invoice.status == InvoiceStatus.paid and invoice.balance_due <= 0:
                try:
                    invoice_dict = InvoiceService.invoice_to_dict(invoice)
                    # NOTE(review): _generate_invoice_email_html is not defined or
                    # imported anywhere in the visible part of this module; if it
                    # is truly missing, the NameError is swallowed by the except
                    # below and no invoice email is ever sent - confirm and import it.
                    invoice_html = _generate_invoice_email_html(invoice_dict, is_proforma=invoice.is_proforma)
                    invoice_type = "Proforma Invoice" if invoice.is_proforma else "Invoice"
                    if user:
                        await send_email(
                            to=user.email,
                            subject=f"{invoice_type} {invoice.invoice_number} - Payment Confirmed",
                            html=invoice_html
                        )
                        logger.info(f"{invoice_type} {invoice.invoice_number} sent to {user.email}")
                except Exception as email_error:
                    logger.error(f"Failed to send invoice email: {str(email_error)}")

        # Commit booking and invoice status updates
        # NOTE(review): assumed to belong to the completed-payment branch; for a
        # still-processing payment there is nothing left to commit.
        db.commit()
        db.refresh(booking)

    @staticmethod
    async def confirm_payment(
        payment_intent_id: str,
        db: Session,
        booking_id: Optional[int] = None
    ) -> Dict[str, Any]:
        """
        Confirm a payment and update database records

        Retrieves the payment intent from Stripe, creates or updates the
        matching ``Payment`` row and, when the intent has succeeded, updates
        invoices and the booking and sends notification emails.

        Args:
            payment_intent_id: Stripe payment intent ID
            db: Database session
            booking_id: Optional booking ID; when omitted it is read from the
                intent's ``booking_id`` metadata

        Returns:
            Payment record dictionary

        Raises:
            ValueError: For expected failures (missing booking, intent not in
                ``succeeded``/``processing`` state, Stripe errors) and as a
                wrapper around any unexpected exception; the session is rolled
                back in both cases
        """
        try:
            intent_data = StripeService.retrieve_payment_intent(payment_intent_id, db)

            # Find or get booking_id from metadata
            if not booking_id and intent_data.get("metadata"):
                booking_id = intent_data["metadata"].get("booking_id")
                if booking_id:
                    booking_id = int(booking_id)
            if not booking_id:
                raise ValueError("Booking ID is required")

            booking = db.query(Booking).filter(Booking.id == booking_id).first()
            if not booking:
                raise ValueError("Booking not found")

            # Accept succeeded or processing status (processing means payment is being processed)
            payment_status = intent_data.get("status")
            logger.info("Payment intent status: %s", payment_status)
            if payment_status not in ("succeeded", "processing"):
                raise ValueError(f"Payment intent not in a valid state. Status: {payment_status}. Payment may still be processing or may have failed.")
            succeeded = payment_status == "succeeded"

            # Find existing payment or create new one
            payment = db.query(Payment).filter(
                Payment.booking_id == booking_id,
                Payment.transaction_id == payment_intent_id,
                Payment.payment_method == PaymentMethod.stripe
            ).first()

            # If not found, try to find pending deposit payment (for cash bookings with deposit)
            # This allows updating the payment_method from the default to stripe
            if not payment:
                payment = db.query(Payment).filter(
                    Payment.booking_id == booking_id,
                    Payment.payment_type == PaymentType.deposit,
                    Payment.payment_status == PaymentStatus.pending
                ).order_by(Payment.created_at.desc()).first()

            amount = intent_data["amount"]

            if payment:
                # Only mark as completed if payment intent succeeded;
                # if processing, keep as pending (will be updated by webhook)
                if succeeded:
                    payment.payment_status = PaymentStatus.completed
                    payment.payment_date = datetime.utcnow()
                payment.amount = amount
                payment.payment_method = PaymentMethod.stripe
                # Link the record to this intent. Without it, a reused pending
                # deposit cannot be found by transaction_id on the next call
                # (e.g. the webhook), which would insert a duplicate payment.
                payment.transaction_id = payment_intent_id
            else:
                # Create new payment record
                payment_type = PaymentType.full
                if booking.requires_deposit and not booking.deposit_paid:
                    payment_type = PaymentType.deposit
                payment = Payment(
                    booking_id=booking_id,
                    amount=amount,
                    payment_method=PaymentMethod.stripe,
                    payment_type=payment_type,
                    payment_status=PaymentStatus.completed if succeeded else PaymentStatus.pending,
                    transaction_id=payment_intent_id,
                    payment_date=datetime.utcnow() if succeeded else None,
                    notes=f"Stripe payment - Intent: {payment_intent_id} (Status: {payment_status})",
                )
                db.add(payment)

            # Commit payment first to ensure it's saved
            db.commit()
            db.refresh(payment)

            # Update booking status only if payment is completed
            if payment.payment_status == PaymentStatus.completed:
                await StripeService._finalize_completed_payment(db, booking, payment, booking_id)

            return {
                "id": payment.id,
                "booking_id": payment.booking_id,
                "amount": float(payment.amount) if payment.amount else 0.0,
                "payment_method": StripeService._enum_value(payment.payment_method),
                "payment_type": StripeService._enum_value(payment.payment_type),
                "payment_status": StripeService._enum_value(payment.payment_status),
                "transaction_id": payment.transaction_id,
                "payment_date": payment.payment_date.isoformat() if payment.payment_date else None,
            }
        except ValueError:
            # Re-raise ValueError as-is (these are expected errors)
            db.rollback()
            raise
        except Exception as e:
            error_msg = str(e) if str(e) else f"{type(e).__name__}: {repr(e)}"
            # logger.exception records the traceback alongside the message
            logger.exception("Error in confirm_payment: %s", error_msg)
            db.rollback()
            raise ValueError(f"Error confirming payment: {error_msg}") from e

    @staticmethod
    async def handle_webhook(
        payload: bytes,
        signature: str,
        db: Session
    ) -> Dict[str, Any]:
        """
        Handle Stripe webhook events

        Verifies the signature, then processes ``payment_intent.succeeded``
        (confirms the payment) and ``payment_intent.payment_failed`` (marks the
        payment failed and cancels the booking). Other event types are
        acknowledged without action.

        Args:
            payload: Raw webhook payload
            signature: Stripe signature header
            db: Database session

        Returns:
            Dict with ``status``, ``event_type`` and ``event_id``

        Raises:
            ValueError: If the webhook secret is not configured, or the payload
                or signature is invalid
        """
        # NOTE: no function-local import of ``settings`` may appear anywhere in
        # this method - it would turn the name into a local variable and make
        # the lookup below raise UnboundLocalError.
        webhook_secret = get_stripe_webhook_secret(db)
        if not webhook_secret:
            webhook_secret = settings.STRIPE_WEBHOOK_SECRET
        if not webhook_secret:
            raise ValueError("Stripe webhook secret is not configured. Please configure it in Admin Panel (Settings > Stripe Settings) or set STRIPE_WEBHOOK_SECRET environment variable.")

        try:
            event = stripe.Webhook.construct_event(
                payload, signature, webhook_secret
            )
        except ValueError as e:
            raise ValueError(f"Invalid payload: {str(e)}") from e
        except stripe.SignatureVerificationError as e:
            raise ValueError(f"Invalid signature: {str(e)}") from e

        # Handle the event
        if event["type"] == "payment_intent.succeeded":
            payment_intent = event["data"]["object"]
            payment_intent_id = payment_intent["id"]
            booking_id = payment_intent.get("metadata", {}).get("booking_id")
            if booking_id:
                try:
                    await StripeService.confirm_payment(
                        payment_intent_id=payment_intent_id,
                        db=db,
                        booking_id=int(booking_id)
                    )
                except Exception as e:
                    # Deliberately swallowed so the webhook is still acknowledged
                    logger.error(
                        f"Error processing webhook for booking {booking_id}: {str(e)}",
                        exc_info=True,
                    )

        elif event["type"] == "payment_intent.payment_failed":
            payment_intent = event["data"]["object"]
            payment_intent_id = payment_intent["id"]
            booking_id = payment_intent.get("metadata", {}).get("booking_id")
            if booking_id:
                # Update payment status to failed
                payment = db.query(Payment).filter(
                    Payment.transaction_id == payment_intent_id,
                    Payment.booking_id == int(booking_id)
                ).first()
                if payment:
                    payment.payment_status = PaymentStatus.failed
                    db.commit()

                    # Auto-cancel booking when payment fails
                    # NOTE(review): assumed to apply only when a matching
                    # payment record exists - confirm against the original
                    # indentation.
                    booking = db.query(Booking).filter(Booking.id == int(booking_id)).first()
                    if booking and booking.status != BookingStatus.cancelled:
                        booking.status = BookingStatus.cancelled
                        db.commit()
                        db.refresh(booking)

                        # Send cancellation email (best-effort)
                        try:
                            if booking.user:
                                from ..utils.mailer import send_email
                                from ..utils.email_templates import booking_status_changed_email_template

                                client_url = StripeService._resolve_client_url(db)
                                email_html = booking_status_changed_email_template(
                                    booking_number=booking.booking_number,
                                    guest_name=booking.user.full_name if booking.user else "Guest",
                                    status="cancelled",
                                    client_url=client_url
                                )
                                await send_email(
                                    to=booking.user.email,
                                    subject=f"Booking Cancelled - {booking.booking_number}",
                                    html=email_html
                                )
                        except Exception as e:
                            logger.error(f"Failed to send cancellation email: {e}")

        return {
            "status": "success",
            "event_type": event["type"],
            "event_id": event["id"],
        }