This commit is contained in:
Iliyan Angelov
2025-12-10 01:41:57 +02:00
parent 9de9d9701e
commit ab42d86127
32 changed files with 3235 additions and 761 deletions

View File

@@ -0,0 +1,86 @@
"""add_shift_update_to_notification_type
Revision ID: add_shift_update_001
Revises: add_staff_shifts_001
Create Date: 2025-12-09 20:55:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'add_shift_update_001'
down_revision = 'add_staff_shifts_001'
branch_labels = None
depends_on = None
def upgrade() -> None:
# Add 'shift_update' to notification_type enum in notifications table
op.execute("""
ALTER TABLE notifications
MODIFY COLUMN notification_type ENUM(
'booking_confirmation',
'payment_receipt',
'pre_arrival_reminder',
'check_in_reminder',
'check_out_reminder',
'marketing_campaign',
'loyalty_update',
'shift_update',
'system_alert',
'custom'
) NOT NULL
""")
# Also update notification_templates table if it exists
op.execute("""
ALTER TABLE notification_templates
MODIFY COLUMN notification_type ENUM(
'booking_confirmation',
'payment_receipt',
'pre_arrival_reminder',
'check_in_reminder',
'check_out_reminder',
'marketing_campaign',
'loyalty_update',
'shift_update',
'system_alert',
'custom'
) NOT NULL
""")
def downgrade() -> None:
# Remove 'shift_update' from notification_type enum
op.execute("""
ALTER TABLE notifications
MODIFY COLUMN notification_type ENUM(
'booking_confirmation',
'payment_receipt',
'pre_arrival_reminder',
'check_in_reminder',
'check_out_reminder',
'marketing_campaign',
'loyalty_update',
'system_alert',
'custom'
) NOT NULL
""")
op.execute("""
ALTER TABLE notification_templates
MODIFY COLUMN notification_type ENUM(
'booking_confirmation',
'payment_receipt',
'pre_arrival_reminder',
'check_in_reminder',
'check_out_reminder',
'marketing_campaign',
'loyalty_update',
'system_alert',
'custom'
) NOT NULL
""")

View File

@@ -0,0 +1,95 @@
"""
Admin session tracking model for security monitoring.
Separate from accountant sessions to maintain clear separation of concerns.
"""
from sqlalchemy import Column, Integer, String, DateTime, Boolean, ForeignKey, JSON, Index, Text
from sqlalchemy.orm import relationship
from datetime import datetime
from ...shared.config.database import Base
class AdminSession(Base):
"""Track admin sessions for security monitoring."""
__tablename__ = 'admin_sessions'
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
# User reference
user_id = Column(Integer, ForeignKey('users.id'), nullable=False, index=True)
# Session details
session_token = Column(String(255), unique=True, nullable=False, index=True)
ip_address = Column(String(45), nullable=True) # IPv6 compatible
user_agent = Column(Text, nullable=True)
device_fingerprint = Column(String(255), nullable=True)
# Location (if available)
country = Column(String(2), nullable=True) # ISO country code
city = Column(String(100), nullable=True)
# Session status
is_active = Column(Boolean, default=True, nullable=False, index=True)
last_activity = Column(DateTime, default=datetime.utcnow, nullable=False, index=True)
# Step-up authentication
step_up_authenticated = Column(Boolean, default=False, nullable=False)
step_up_expires_at = Column(DateTime, nullable=True)
# Session metadata
session_metadata = Column(JSON, nullable=True)
# Timestamps
created_at = Column(DateTime, default=datetime.utcnow, nullable=False, index=True)
expires_at = Column(DateTime, nullable=False, index=True)
# Relationships
user = relationship('User', foreign_keys=[user_id])
# Indexes
__table_args__ = (
Index('idx_admin_session_user_active', 'user_id', 'is_active', 'last_activity'),
Index('idx_admin_session_expires', 'expires_at', 'is_active'),
)
class AdminActivityLog(Base):
"""Log admin activity for security monitoring and audit trails."""
__tablename__ = 'admin_activity_logs'
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
# User reference
user_id = Column(Integer, ForeignKey('users.id'), nullable=False, index=True)
session_id = Column(Integer, ForeignKey('admin_sessions.id'), nullable=True, index=True)
# Activity details
activity_type = Column(String(50), nullable=False, index=True) # 'login', 'user_management', 'system_config', etc.
activity_description = Column(Text, nullable=False)
# Location and device
ip_address = Column(String(45), nullable=True)
user_agent = Column(Text, nullable=True)
country = Column(String(2), nullable=True)
city = Column(String(100), nullable=True)
# Risk indicators
risk_level = Column(String(20), default='low', nullable=False) # 'low', 'medium', 'high', 'critical'
is_unusual = Column(Boolean, default=False, nullable=False, index=True)
# Additional context
activity_metadata = Column(JSON, nullable=True)
# Timestamp
created_at = Column(DateTime, default=datetime.utcnow, nullable=False, index=True)
# Relationships
user = relationship('User', foreign_keys=[user_id])
session = relationship('AdminSession', foreign_keys=[session_id])
# Indexes
__table_args__ = (
Index('idx_admin_activity_user_date', 'user_id', 'created_at'),
Index('idx_admin_activity_unusual', 'is_unusual', 'risk_level', 'created_at'),
Index('idx_admin_activity_type', 'activity_type', 'created_at'),
)

View File

@@ -0,0 +1,259 @@
"""
Routes for admin security: step-up auth, session management, activity logs.
Separate from accountant security routes to maintain clear separation of concerns.
"""
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from sqlalchemy.orm import Session
from typing import Optional
from datetime import datetime
from ...shared.config.database import get_db
from ...shared.config.logging_config import get_logger
from ...security.middleware.auth import authorize_roles, get_current_user
from ..models.user import User
from ..services.admin_security_service import admin_security_service
from ...shared.utils.response_helpers import success_response
from ...auth.services.mfa_service import mfa_service
from ...shared.utils.role_helpers import is_admin
logger = get_logger(__name__)
router = APIRouter(prefix='/admin/security', tags=['admin-security'])
@router.post('/step-up/verify')
async def verify_step_up(
request: Request,
step_up_data: dict,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Verify step-up authentication for admins (MFA token or password re-entry)."""
if not is_admin(current_user, db):
raise HTTPException(status_code=403, detail='Forbidden: Admin access required')
try:
from ..models.admin_session import AdminSession
mfa_token = step_up_data.get('mfa_token')
password = step_up_data.get('password')
session_token = step_up_data.get('session_token')
if not session_token:
# Try to get from header or cookie
session_token = request.headers.get('X-Session-Token') or request.cookies.get('admin_session_token')
# If still no session token, try to find the most recent active session for this user
# If none exists, create a fresh session so password-based step-up can proceed
if not session_token:
active_session = db.query(AdminSession).filter(
AdminSession.user_id == current_user.id,
AdminSession.is_active == True,
AdminSession.expires_at > datetime.utcnow()
).order_by(AdminSession.last_activity.desc()).first()
if active_session:
session_token = active_session.session_token
else:
new_session = admin_security_service.create_session(
db=db,
user_id=current_user.id,
ip_address=request.client.host if request.client else None,
user_agent=request.headers.get('User-Agent')
)
session_token = new_session.session_token
# Verify MFA if token provided
if mfa_token:
try:
is_valid = mfa_service.verify_mfa(db, current_user.id, mfa_token)
if not is_valid:
raise HTTPException(status_code=401, detail='Invalid MFA token')
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
# Or verify password if provided
elif password:
import bcrypt
if not bcrypt.checkpw(password.encode('utf-8'), current_user.password.encode('utf-8')):
raise HTTPException(status_code=401, detail='Invalid password')
else:
raise HTTPException(status_code=400, detail='Either mfa_token or password is required')
# Complete step-up authentication
success = admin_security_service.complete_step_up(
db=db,
session_token=session_token,
user_id=current_user.id
)
if not success:
raise HTTPException(status_code=400, detail='Failed to complete step-up authentication')
# Log step-up activity
client_ip = request.client.host if request.client else None
user_agent = request.headers.get('User-Agent')
admin_security_service.log_activity(
db=db,
user_id=current_user.id,
activity_type='step_up_authentication',
activity_description='Admin step-up authentication completed',
ip_address=client_ip,
user_agent=user_agent,
risk_level='low',
metadata={'method': 'mfa' if mfa_token else 'password'}
)
db.commit()
return success_response(
data={'step_up_completed': True},
message='Step-up authentication completed successfully'
)
except HTTPException:
raise
except Exception as e:
db.rollback()
logger.error(f'Error verifying admin step-up: {str(e)}', exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.get('/sessions')
async def get_active_sessions(
current_user: User = Depends(authorize_roles('admin')),
db: Session = Depends(get_db)
):
"""Get active admin sessions for current user."""
try:
from ..models.admin_session import AdminSession
sessions = db.query(AdminSession).filter(
AdminSession.user_id == current_user.id,
AdminSession.is_active == True
).order_by(AdminSession.last_activity.desc()).all()
session_list = []
for session in sessions:
session_list.append({
'id': session.id,
'ip_address': session.ip_address,
'user_agent': session.user_agent,
'country': session.country,
'city': session.city,
'last_activity': session.last_activity.isoformat() if session.last_activity else None,
'step_up_authenticated': session.step_up_authenticated,
'step_up_expires_at': session.step_up_expires_at.isoformat() if session.step_up_expires_at else None,
'created_at': session.created_at.isoformat() if session.created_at else None,
'expires_at': session.expires_at.isoformat() if session.expires_at else None
})
return success_response(data={'sessions': session_list})
except Exception as e:
logger.error(f'Error fetching admin sessions: {str(e)}', exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.post('/sessions/{session_id}/revoke')
async def revoke_session(
session_id: int,
current_user: User = Depends(authorize_roles('admin')),
db: Session = Depends(get_db)
):
"""Revoke a specific admin session."""
try:
from ..models.admin_session import AdminSession
session = db.query(AdminSession).filter(
AdminSession.id == session_id,
AdminSession.user_id == current_user.id
).first()
if not session:
raise HTTPException(status_code=404, detail='Session not found')
session.is_active = False
db.commit()
return success_response(message='Session revoked successfully')
except HTTPException:
raise
except Exception as e:
db.rollback()
logger.error(f'Error revoking admin session: {str(e)}', exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.post('/sessions/revoke-all')
async def revoke_all_sessions(
current_user: User = Depends(authorize_roles('admin')),
db: Session = Depends(get_db)
):
"""Revoke all active admin sessions for current user."""
try:
count = admin_security_service.revoke_all_user_sessions(db, current_user.id)
db.commit()
return success_response(
data={'revoked_count': count},
message=f'Successfully revoked {count} active session(s)'
)
except Exception as e:
db.rollback()
logger.error(f'Error revoking all admin sessions: {str(e)}', exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.get('/activity-logs')
async def get_activity_logs(
page: int = Query(1, ge=1),
limit: int = Query(50, ge=1, le=100),
risk_level: Optional[str] = Query(None),
is_unusual: Optional[bool] = Query(None),
current_user: User = Depends(authorize_roles('admin')),
db: Session = Depends(get_db)
):
"""Get activity logs for current admin user."""
try:
from ..models.admin_session import AdminActivityLog
query = db.query(AdminActivityLog).filter(
AdminActivityLog.user_id == current_user.id
)
if risk_level:
query = query.filter(AdminActivityLog.risk_level == risk_level)
if is_unusual is not None:
query = query.filter(AdminActivityLog.is_unusual == is_unusual)
total = query.count()
offset = (page - 1) * limit
logs = query.order_by(AdminActivityLog.created_at.desc()).offset(offset).limit(limit).all()
log_list = []
for log in logs:
log_list.append({
'id': log.id,
'activity_type': log.activity_type,
'activity_description': log.activity_description,
'ip_address': log.ip_address,
'user_agent': log.user_agent,
'country': log.country,
'city': log.city,
'risk_level': log.risk_level,
'is_unusual': log.is_unusual,
'activity_metadata': log.activity_metadata,
'created_at': log.created_at.isoformat() if log.created_at else None
})
return success_response(data={
'logs': log_list,
'pagination': {
'total': total,
'page': page,
'limit': limit,
'total_pages': (total + limit - 1) // limit
}
})
except Exception as e:
logger.error(f'Error fetching admin activity logs: {str(e)}', exc_info=True)
raise HTTPException(status_code=500, detail=str(e))

View File

@@ -41,98 +41,6 @@ def get_limiter(request: Request) -> Limiter:
return limiter
@router.post('/admin/step-up/verify')
async def verify_admin_step_up(
request: Request,
step_up_data: dict,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
Step-up verification for admins: accept password or MFA token.
Uses the accountant security session store but bypasses accountant role checks.
"""
if not is_admin(current_user, db):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='Forbidden')
try:
from ...payments.models.accountant_session import AccountantSession
mfa_token = step_up_data.get('mfa_token')
password = step_up_data.get('password')
session_token = step_up_data.get('session_token')
if not session_token:
session_token = request.headers.get('X-Session-Token') or request.cookies.get('session_token')
if not session_token:
active_session = db.query(AccountantSession).filter(
AccountantSession.user_id == current_user.id,
AccountantSession.is_active == True,
AccountantSession.expires_at > datetime.utcnow()
).order_by(AccountantSession.last_activity.desc()).first()
if active_session:
session_token = active_session.session_token
else:
new_session = accountant_security_service.create_session(
db=db,
user_id=current_user.id,
ip_address=request.client.host if request.client else None,
user_agent=request.headers.get('User-Agent')
)
session_token = new_session.session_token
if mfa_token:
try:
is_valid = mfa_service.verify_mfa(db, current_user.id, mfa_token)
if not is_valid:
raise HTTPException(status_code=401, detail='Invalid MFA token')
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
elif password:
import bcrypt
if not bcrypt.checkpw(password.encode('utf-8'), current_user.password.encode('utf-8')):
raise HTTPException(status_code=401, detail='Invalid password')
else:
raise HTTPException(status_code=400, detail='Either mfa_token or password is required')
success = accountant_security_service.complete_step_up(
db=db,
session_token=session_token,
user_id=current_user.id
)
if not success:
raise HTTPException(status_code=400, detail='Failed to complete step-up authentication')
client_ip = request.client.host if request.client else None
user_agent = request.headers.get('User-Agent')
accountant_security_service.log_activity(
db=db,
user_id=current_user.id,
activity_type='admin_step_up_authentication',
activity_description='Admin step-up authentication completed',
ip_address=client_ip,
user_agent=user_agent,
risk_level='low',
metadata={'method': 'mfa' if mfa_token else 'password'}
)
db.commit()
return JSONResponse(
status_code=status.HTTP_200_OK,
content={'status': 'success', 'data': {'step_up_completed': True}, 'message': 'Step-up authentication completed successfully'}
)
except HTTPException:
raise
except Exception as e:
db.rollback()
logger.error(f'Error verifying admin step-up: {str(e)}', exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
def get_base_url(request: Request) -> str:
return os.getenv('SERVER_URL') or f'http://{request.headers.get('host', 'localhost:8000')}'
@@ -352,14 +260,17 @@ async def login(
)
return {'status': 'success', 'requires_mfa': True, 'user_id': result['user_id']}
# After successful login (MFA passed if required), check MFA for accountant role
# After successful login (MFA passed if required), handle role-specific security
if not requires_mfa_setup:
user = db.query(User).filter(User.id == result['user']['id']).first()
if user:
try:
from ...payments.services.accountant_security_service import accountant_security_service
from ...shared.utils.role_helpers import is_accountant
from ...auth.services.admin_security_service import admin_security_service
from ...shared.utils.role_helpers import is_accountant, is_admin
from ...shared.config.settings import settings
# Handle accountant role
if is_accountant(user, db):
# Check if MFA is required but not enabled
is_enforced, reason = accountant_security_service.is_mfa_enforced(user, db)
@@ -378,23 +289,20 @@ async def login(
status='success'
)
logger.info(f'User {user.id} logged in but MFA setup required: {reason}')
# Always create an accountant security session so step-up auth
# works even if MFA is not yet enabled (password re-auth fallback).
# Always create an accountant security session so step-up auth works
try:
accountant_session = accountant_security_service.create_session(
db=db,
user_id=user.id,
ip_address=client_ip,
user_agent=user_agent,
device_fingerprint=None # Can be enhanced with device fingerprinting
device_fingerprint=None
)
# Commit the session to database so it's available for step-up auth
db.commit()
# Store session_token in cookie for step-up authentication
from ...shared.config.settings import settings
session_max_age = 4 * 60 * 60 # 4 hours (matches ACCOUNTANT_SESSION_TIMEOUT_HOURS)
session_max_age = 4 * 60 * 60 # 4 hours
samesite_value = 'strict' if settings.is_production else 'lax'
response.set_cookie(
key='session_token',
@@ -417,7 +325,7 @@ async def login(
db=db,
user_id=user.id,
activity_type='login',
activity_description='Accountant/admin login successful',
activity_description='Accountant login successful',
ip_address=client_ip,
user_agent=user_agent,
risk_level='low',
@@ -426,8 +334,55 @@ async def login(
except Exception as e:
db.rollback()
logger.warning(f'Error creating accountant session: {e}')
# Handle admin role (separate from accountant)
elif is_admin(user, db):
try:
admin_session = admin_security_service.create_session(
db=db,
user_id=user.id,
ip_address=client_ip,
user_agent=user_agent,
device_fingerprint=None
)
db.commit()
# Store admin session_token in cookie for step-up authentication
session_max_age = 8 * 60 * 60 # 8 hours
samesite_value = 'strict' if settings.is_production else 'lax'
response.set_cookie(
key='admin_session_token',
value=admin_session.session_token,
httponly=True,
secure=settings.is_production,
samesite=samesite_value,
max_age=session_max_age,
path='/'
)
# Log login activity
is_unusual = admin_security_service.detect_unusual_activity(
db=db,
user_id=user.id,
ip_address=client_ip
)
admin_security_service.log_activity(
db=db,
user_id=user.id,
activity_type='login',
activity_description='Admin login successful',
ip_address=client_ip,
user_agent=user_agent,
risk_level='low',
is_unusual=is_unusual
)
except Exception as e:
db.rollback()
logger.warning(f'Error creating admin session: {e}')
except Exception as e:
logger.warning(f'Error enforcing MFA for accountant: {e}')
logger.warning(f'Error handling role-specific security: {e}')
from ...shared.config.settings import settings
max_age = 7 * 24 * 60 * 60 if login_request.rememberMe else 1 * 24 * 60 * 60

View File

@@ -0,0 +1,296 @@
"""
Service for admin-specific security controls: step-up auth, session management, activity logs.
Separate from accountant security to maintain clear separation of concerns.
"""
from sqlalchemy.orm import Session
from typing import Optional, Dict, Any, Tuple
from datetime import datetime, timedelta
from ...auth.models.user import User
from ..models.admin_session import AdminSession, AdminActivityLog
from ...shared.utils.role_helpers import is_admin
from ...shared.config.logging_config import get_logger
import secrets
logger = get_logger(__name__)
class AdminSecurityService:
"""Service for admin security controls."""
# Session timeout for admins (shorter than default user sessions)
ADMIN_SESSION_TIMEOUT_HOURS = 8 # 8 hours for admin sessions
ADMIN_IDLE_TIMEOUT_MINUTES = 60 # 60 minutes idle timeout
# Step-up authentication validity
STEP_UP_VALIDITY_MINUTES = 15 # Step-up auth valid for 15 minutes
@staticmethod
def requires_mfa(user: User, db: Session) -> bool:
"""Check if admin user requires MFA (optional for admins, not enforced)."""
# MFA is optional for admins, but recommended
return False # Not enforced, but can be enabled
@staticmethod
def is_mfa_enforced(user: User, db: Session) -> Tuple[bool, Optional[str]]:
"""
Check if MFA is enforced for admin user.
Returns (is_enforced: bool, reason: str | None)
Note: MFA is optional for admins, not enforced by default.
"""
# Admins can use MFA if enabled, but it's not required
return False, None
@staticmethod
def create_session(
db: Session,
user_id: int,
ip_address: Optional[str] = None,
user_agent: Optional[str] = None,
device_fingerprint: Optional[str] = None,
country: Optional[str] = None,
city: Optional[str] = None
) -> AdminSession:
"""Create a new admin session."""
# Generate session token
session_token = secrets.token_urlsafe(32)
# Calculate expiration
expires_at = datetime.utcnow() + timedelta(hours=AdminSecurityService.ADMIN_SESSION_TIMEOUT_HOURS)
session = AdminSession(
user_id=user_id,
session_token=session_token,
ip_address=ip_address,
user_agent=user_agent,
device_fingerprint=device_fingerprint,
country=country,
city=city,
is_active=True,
last_activity=datetime.utcnow(),
step_up_authenticated=False,
expires_at=expires_at
)
db.add(session)
db.flush()
return session
@staticmethod
def validate_session(
db: Session,
session_token: str,
update_activity: bool = True
) -> Optional[AdminSession]:
"""Validate and update session activity."""
session = db.query(AdminSession).filter(
AdminSession.session_token == session_token,
AdminSession.is_active == True
).first()
if not session:
return None
# Check if session expired
if session.expires_at < datetime.utcnow():
session.is_active = False
db.flush()
return None
# Check idle timeout
idle_timeout = datetime.utcnow() - timedelta(minutes=AdminSecurityService.ADMIN_IDLE_TIMEOUT_MINUTES)
if session.last_activity < idle_timeout:
session.is_active = False
db.flush()
return None
# Update last activity
if update_activity:
session.last_activity = datetime.utcnow()
db.flush()
return session
@staticmethod
def require_step_up(
db: Session,
user_id: int,
session_token: Optional[str] = None,
action_description: str = "high-risk action"
) -> Tuple[bool, Optional[str]]:
"""
Check if step-up authentication is required for admin action.
Returns (requires_step_up: bool, reason: str | None)
"""
# If no session token provided, try to find the most recent active session for this user
if not session_token:
active_session = db.query(AdminSession).filter(
AdminSession.user_id == user_id,
AdminSession.is_active == True,
AdminSession.expires_at > datetime.utcnow()
).order_by(AdminSession.last_activity.desc()).first()
if active_session:
session_token = active_session.session_token
else:
return True, "Step-up authentication required for this action"
session = AdminSecurityService.validate_session(db, session_token, update_activity=False)
if not session:
return True, "Invalid or expired session"
if session.user_id != user_id:
return True, "Session user mismatch"
# Check if step-up is still valid
if session.step_up_authenticated and session.step_up_expires_at:
if session.step_up_expires_at > datetime.utcnow():
return False, None # Step-up still valid
else:
session.step_up_authenticated = False
db.flush()
return True, f"Step-up authentication required for {action_description}"
@staticmethod
def complete_step_up(
db: Session,
session_token: str,
user_id: int
) -> bool:
"""Mark step-up authentication as completed."""
session = db.query(AdminSession).filter(
AdminSession.session_token == session_token,
AdminSession.user_id == user_id,
AdminSession.is_active == True
).first()
if not session:
return False
session.step_up_authenticated = True
session.step_up_expires_at = datetime.utcnow() + timedelta(
minutes=AdminSecurityService.STEP_UP_VALIDITY_MINUTES
)
# Use flush to ensure changes are visible in the same transaction
db.flush()
return True
@staticmethod
def log_activity(
db: Session,
user_id: int,
activity_type: str,
activity_description: str,
session_id: Optional[int] = None,
ip_address: Optional[str] = None,
user_agent: Optional[str] = None,
country: Optional[str] = None,
city: Optional[str] = None,
risk_level: str = 'low',
is_unusual: bool = False,
metadata: Optional[Dict[str, Any]] = None
) -> AdminActivityLog:
"""Log admin activity for security monitoring."""
log = AdminActivityLog(
user_id=user_id,
session_id=session_id,
activity_type=activity_type,
activity_description=activity_description,
ip_address=ip_address,
user_agent=user_agent,
country=country,
city=city,
risk_level=risk_level,
is_unusual=is_unusual,
activity_metadata=metadata or {}
)
db.add(log)
db.flush()
# Alert on high-risk or unusual activity
if risk_level in ['high', 'critical'] or is_unusual:
logger.warning(
f"High-risk admin activity detected: {activity_type} by user {user_id}",
extra={
'user_id': user_id,
'activity_type': activity_type,
'risk_level': risk_level,
'is_unusual': is_unusual,
'ip_address': ip_address
}
)
return log
@staticmethod
def detect_unusual_activity(
db: Session,
user_id: int,
ip_address: Optional[str] = None,
country: Optional[str] = None
) -> bool:
"""Detect if current activity is unusual based on user history."""
# Get user's recent activity (last 30 days)
thirty_days_ago = datetime.utcnow() - timedelta(days=30)
recent_activities = db.query(AdminActivityLog).filter(
AdminActivityLog.user_id == user_id,
AdminActivityLog.created_at >= thirty_days_ago
).all()
if not recent_activities:
# First activity - not unusual
return False
# Check for new IP address
if ip_address:
known_ips = set(act.ip_address for act in recent_activities if act.ip_address)
if ip_address not in known_ips and len(known_ips) > 0:
return True
# Check for new country
if country:
known_countries = set(act.country for act in recent_activities if act.country)
if country not in known_countries and len(known_countries) > 0:
return True
return False
@staticmethod
def revoke_session(
db: Session,
session_token: str
) -> bool:
"""Revoke an admin session."""
session = db.query(AdminSession).filter(
AdminSession.session_token == session_token
).first()
if not session:
return False
session.is_active = False
db.flush()
return True
@staticmethod
def revoke_all_user_sessions(
db: Session,
user_id: int
) -> int:
"""Revoke all active sessions for an admin user."""
count = db.query(AdminSession).filter(
AdminSession.user_id == user_id,
AdminSession.is_active == True
).update({'is_active': False})
db.flush()
return count
# Singleton instance
admin_security_service = AdminSecurityService()

View File

@@ -12,6 +12,8 @@ from ..models.staff_shift import (
StaffShift, StaffTask, ShiftType, ShiftStatus,
StaffTaskPriority, StaffTaskStatus
)
from ..services.shift_automation_service import shift_automation_service
from ..services.shift_scheduler import get_shift_scheduler
logger = get_logger(__name__)
router = APIRouter(prefix='/staff-shifts', tags=['staff-shifts'])
@@ -24,21 +26,27 @@ async def get_shifts(
status: Optional[str] = Query(None),
department: Optional[str] = Query(None),
page: int = Query(1, ge=1),
limit: int = Query(20, ge=1, le=100),
limit: int = Query(20, ge=1),
current_user: User = Depends(authorize_roles('admin', 'staff')),
db: Session = Depends(get_db)
):
"""Get staff shifts with filtering"""
try:
# Check if user is admin or staff to set appropriate limit
role = db.query(Role).filter(Role.id == current_user.role_id).first()
is_admin = role and role.name == 'admin'
is_staff = role and role.name == 'staff'
# Admin can request more records for analytics, staff is limited to 100
max_limit = 1000 if is_admin else 100
if limit > max_limit:
limit = max_limit
query = db.query(StaffShift).options(
joinedload(StaffShift.staff),
joinedload(StaffShift.assigner)
)
# Check if user is staff (not admin) - staff should only see their own shifts
role = db.query(Role).filter(Role.id == current_user.role_id).first()
is_staff = role and role.name == 'staff'
if is_staff:
query = query.filter(StaffShift.staff_id == current_user.id)
elif staff_id:
@@ -117,13 +125,14 @@ async def create_shift(
if not staff_id:
staff_id = current_user.id
# Always create shifts as 'scheduled' - automation will handle status changes
shift = StaffShift(
staff_id=staff_id,
shift_date=datetime.fromisoformat(shift_data['shift_date'].replace('Z', '+00:00')),
shift_type=ShiftType(shift_data.get('shift_type', 'custom')),
start_time=time.fromisoformat(shift_data['start_time']) if isinstance(shift_data.get('start_time'), str) else shift_data.get('start_time'),
end_time=time.fromisoformat(shift_data['end_time']) if isinstance(shift_data.get('end_time'), str) else shift_data.get('end_time'),
status=ShiftStatus(shift_data.get('status', 'scheduled')),
status=ShiftStatus.scheduled, # Always start as scheduled - automation handles the rest
break_duration_minutes=shift_data.get('break_duration_minutes', 30),
department=shift_data.get('department'),
notes=shift_data.get('notes'),
@@ -167,7 +176,14 @@ async def update_shift(
if not is_admin and shift.staff_id != current_user.id:
raise HTTPException(status_code=403, detail='You can only update your own shifts')
# Update fields
# Update fields - status changes are handled automatically by scheduler
# Only allow editing of scheduled or cancelled shifts (not in_progress or completed)
if shift.status in [ShiftStatus.in_progress, ShiftStatus.completed]:
raise HTTPException(
status_code=400,
detail='Cannot edit shift that is in progress or completed. Only scheduled or cancelled shifts can be edited.'
)
if 'shift_date' in shift_data:
shift.shift_date = datetime.fromisoformat(shift_data['shift_date'].replace('Z', '+00:00'))
if 'shift_type' in shift_data:
@@ -176,12 +192,7 @@ async def update_shift(
shift.start_time = time.fromisoformat(shift_data['start_time']) if isinstance(shift_data['start_time'], str) else shift_data['start_time']
if 'end_time' in shift_data:
shift.end_time = time.fromisoformat(shift_data['end_time']) if isinstance(shift_data['end_time'], str) else shift_data['end_time']
if 'status' in shift_data:
shift.status = ShiftStatus(shift_data['status'])
if shift_data['status'] == 'in_progress' and not shift.actual_start_time:
shift.actual_start_time = datetime.utcnow()
elif shift_data['status'] == 'completed' and not shift.actual_end_time:
shift.actual_end_time = datetime.utcnow()
# Status changes are automatic - do not allow manual status updates via update endpoint
if 'break_duration_minutes' in shift_data:
shift.break_duration_minutes = shift_data['break_duration_minutes']
if 'department' in shift_data:
@@ -462,3 +473,180 @@ async def get_workload_summary(
logger.error(f'Error fetching workload: {str(e)}', exc_info=True)
raise HTTPException(status_code=500, detail=f'Failed to fetch workload: {str(e)}')
@router.delete('/{shift_id}')
async def delete_shift(
shift_id: int,
current_user: User = Depends(authorize_roles('admin', 'staff', 'housekeeping')),
db: Session = Depends(get_db)
):
"""Delete a shift - admin can delete any, staff/housekeeping can delete their own"""
try:
shift = db.query(StaffShift).filter(StaffShift.id == shift_id).first()
if not shift:
raise HTTPException(status_code=404, detail='Shift not found')
# Check permissions - admin can delete any, staff/housekeeping can delete their own
role = db.query(Role).filter(Role.id == current_user.role_id).first()
is_admin = role and role.name == 'admin'
is_staff = role and role.name in ['staff', 'housekeeping']
if not is_admin and (not is_staff or shift.staff_id != current_user.id):
raise HTTPException(
status_code=403,
detail='You can only delete your own shifts'
)
# Cannot delete completed shifts
if shift.status == ShiftStatus.completed:
raise HTTPException(
status_code=400,
detail='Cannot delete completed shifts. Completed shifts are archived for record keeping.'
)
# Log deletion in audit log
try:
from ...analytics.models.audit_log import AuditLog
audit_log = AuditLog(
user_id=current_user.id,
action="shift_deleted",
resource_type="staff_shift",
resource_id=shift_id,
details={
"shift_date": shift.shift_date.isoformat() if shift.shift_date else None,
"staff_id": shift.staff_id,
"status": shift.status.value,
"deleted_at": datetime.utcnow().isoformat()
},
status="success"
)
db.add(audit_log)
except Exception as e:
logger.warning(f"Failed to log shift deletion: {str(e)}")
db.delete(shift)
db.commit()
logger.info(f"Shift {shift_id} deleted by user {current_user.id}")
return {
'status': 'success',
'message': 'Shift deleted successfully'
}
except HTTPException:
raise
except Exception as e:
logger.error(f'Error deleting shift: {str(e)}', exc_info=True)
db.rollback()
raise HTTPException(status_code=500, detail=f'Failed to delete shift: {str(e)}')
# Removed manual start/complete endpoints - automation handles these automatically
@router.post('/{shift_id}/cancel')
async def cancel_shift(
shift_id: int,
cancel_data: dict = {},
current_user: User = Depends(authorize_roles('admin', 'staff', 'housekeeping')),
db: Session = Depends(get_db)
):
"""
Cancel a shift - only manual action allowed
Admin can cancel any shift, staff/housekeeping can cancel their own shifts
"""
try:
shift = db.query(StaffShift).filter(StaffShift.id == shift_id).first()
if not shift:
raise HTTPException(status_code=404, detail='Shift not found')
# Check permissions - admin can cancel any, staff/housekeeping can cancel their own
role = db.query(Role).filter(Role.id == current_user.role_id).first()
is_admin = role and role.name == 'admin'
is_staff = role and role.name in ['staff', 'housekeeping']
if not is_admin and (not is_staff or shift.staff_id != current_user.id):
raise HTTPException(
status_code=403,
detail='You can only cancel your own shifts'
)
# Only allow cancellation of scheduled or in_progress shifts
if shift.status not in [ShiftStatus.scheduled, ShiftStatus.in_progress]:
raise HTTPException(
status_code=400,
detail=f'Cannot cancel shift with status: {shift.status.value}. Only scheduled or in-progress shifts can be cancelled.'
)
reason = cancel_data.get('reason', 'MANUAL_CANCEL')
notes = cancel_data.get('notes')
success = shift_automation_service.manual_status_change(
db=db,
shift=shift,
new_status=ShiftStatus.cancelled,
user_id=current_user.id,
reason=reason,
notes=notes
)
if not success:
raise HTTPException(
status_code=400,
detail=f'Cannot cancel shift. Current status: {shift.status.value}'
)
db.refresh(shift)
return {
'status': 'success',
'message': 'Shift cancelled successfully',
'data': {
'shift_id': shift.id,
'status': shift.status.value,
}
}
except HTTPException:
raise
except Exception as e:
logger.error(f'Error cancelling shift: {str(e)}', exc_info=True)
db.rollback()
raise HTTPException(status_code=500, detail=f'Failed to cancel shift: {str(e)}')
@router.post('/automation/trigger')
async def trigger_automation(
current_user: User = Depends(authorize_roles('admin')),
db: Session = Depends(get_db)
):
"""Manually trigger shift automation (admin only)"""
try:
stats = shift_automation_service.process_shift_automation(db)
return {
'status': 'success',
'message': 'Shift automation completed',
'data': stats
}
except Exception as e:
logger.error(f'Error triggering automation: {str(e)}', exc_info=True)
raise HTTPException(status_code=500, detail=f'Failed to trigger automation: {str(e)}')
@router.get('/automation/status')
async def get_automation_status(
current_user: User = Depends(authorize_roles('admin')),
):
"""Get shift automation scheduler status (admin only)"""
try:
scheduler = get_shift_scheduler()
status = scheduler.get_status()
return {
'status': 'success',
'data': status
}
except Exception as e:
logger.error(f'Error getting automation status: {str(e)}', exc_info=True)
raise HTTPException(status_code=500, detail=f'Failed to get automation status: {str(e)}')

View File

@@ -0,0 +1,603 @@
"""
Staff Shift Automation Service
Enterprise-grade automated shift status management with business logic
"""
from datetime import datetime, time, timedelta
from typing import List, Dict, Any, Optional, Tuple
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_, func
from ...shared.config.logging_config import get_logger
from ..models.staff_shift import StaffShift, ShiftStatus, ShiftType
from ...analytics.models.audit_log import AuditLog
from ...notifications.services.notification_service import NotificationService
from ...notifications.models.notification import NotificationType, NotificationChannel
logger = get_logger(__name__)
class ShiftAutomationService:
"""Service for automated shift status management"""
# Configuration constants
NO_SHOW_THRESHOLD_MINUTES = 15 # Mark as no_show if 15 minutes past start time
AUTO_COMPLETE_BUFFER_MINUTES = 0 # Auto-complete at exact end time
AUTO_START_BUFFER_MINUTES = 0 # Auto-start at exact start time
@staticmethod
def get_shift_datetime(shift: StaffShift) -> Tuple[datetime, datetime]:
"""
Convert shift date and time to full datetime objects
Returns (start_datetime, end_datetime)
"""
shift_date = shift.shift_date.date() if isinstance(shift.shift_date, datetime) else shift.shift_date
# Combine date with time
start_datetime = datetime.combine(shift_date, shift.start_time)
end_datetime = datetime.combine(shift_date, shift.end_time)
# Handle overnight shifts (end time is next day)
if shift.end_time < shift.start_time:
end_datetime += timedelta(days=1)
return start_datetime, end_datetime
@staticmethod
def should_auto_start(shift: StaffShift, current_time: datetime) -> bool:
"""Check if shift should be automatically started - at exact start time"""
if shift.status != ShiftStatus.scheduled:
return False
start_datetime, _ = ShiftAutomationService.get_shift_datetime(shift)
# Auto-start at exact start time (or after)
return current_time >= start_datetime
@staticmethod
def should_auto_complete(shift: StaffShift, current_time: datetime) -> bool:
"""Check if shift should be automatically completed - at exact end time"""
if shift.status != ShiftStatus.in_progress:
return False
_, end_datetime = ShiftAutomationService.get_shift_datetime(shift)
# Auto-complete at exact end time (or after)
return current_time >= end_datetime
@staticmethod
def should_mark_no_show(shift: StaffShift, current_time: datetime) -> bool:
"""Check if shift should be marked as no_show"""
if shift.status != ShiftStatus.scheduled:
return False
start_datetime, _ = ShiftAutomationService.get_shift_datetime(shift)
# Mark as no_show if past start time + threshold and no actual start recorded
no_show_time = start_datetime + timedelta(minutes=ShiftAutomationService.NO_SHOW_THRESHOLD_MINUTES)
return current_time >= no_show_time and shift.actual_start_time is None
@staticmethod
def auto_start_shift(
db: Session,
shift: StaffShift,
current_time: Optional[datetime] = None,
triggered_by: Optional[int] = None
) -> bool:
"""
Automatically start a shift
Returns True if status was changed, False otherwise
"""
if shift.status != ShiftStatus.scheduled:
return False
current_time = current_time or datetime.utcnow()
try:
old_status = shift.status
shift.status = ShiftStatus.in_progress
shift.actual_start_time = current_time
shift.updated_at = current_time
db.commit()
# Log audit trail
ShiftAutomationService._log_status_change(
db=db,
shift_id=shift.id,
old_status=old_status,
new_status=ShiftStatus.in_progress,
user_id=triggered_by,
reason="AUTO_START",
details={"triggered_at": current_time.isoformat()}
)
# Send notification to staff (with fallback)
try:
try:
NotificationService.send_notification(
db=db,
user_id=shift.staff_id,
notification_type=NotificationType.shift_update,
channel=NotificationChannel.in_app,
content=f"Your shift has automatically started at {current_time.strftime('%H:%M')}",
subject="Shift Started",
meta_data={
"shift_id": shift.id,
"status": "in_progress",
"auto_started": True
}
)
except Exception as enum_error:
# Fallback to system_alert if shift_update enum not in database yet
NotificationService.send_notification(
db=db,
user_id=shift.staff_id,
notification_type=NotificationType.system_alert,
channel=NotificationChannel.in_app,
content=f"Your shift has automatically started at {current_time.strftime('%H:%M')}",
subject="Shift Started",
meta_data={
"shift_id": shift.id,
"status": "in_progress",
"auto_started": True
}
)
except Exception as e:
logger.warning(f"Failed to send notification for shift {shift.id}: {str(e)}")
try:
db.rollback()
except:
pass
logger.info(f"Shift {shift.id} automatically started at {current_time.isoformat()}")
return True
except Exception as e:
logger.error(f"Error auto-starting shift {shift.id}: {str(e)}", exc_info=True)
db.rollback()
return False
@staticmethod
def auto_complete_shift(
db: Session,
shift: StaffShift,
current_time: Optional[datetime] = None,
triggered_by: Optional[int] = None
) -> bool:
"""
Automatically complete a shift
Returns True if status was changed, False otherwise
"""
if shift.status != ShiftStatus.in_progress:
return False
current_time = current_time or datetime.utcnow()
try:
old_status = shift.status
shift.status = ShiftStatus.completed
shift.actual_end_time = current_time
shift.updated_at = current_time
# If no actual start time was recorded, set it to scheduled start
if not shift.actual_start_time:
start_datetime, _ = ShiftAutomationService.get_shift_datetime(shift)
shift.actual_start_time = start_datetime
db.commit()
# Log audit trail
ShiftAutomationService._log_status_change(
db=db,
shift_id=shift.id,
old_status=old_status,
new_status=ShiftStatus.completed,
user_id=triggered_by,
reason="AUTO_COMPLETE",
details={
"triggered_at": current_time.isoformat(),
"auto_completed": True
}
)
# Send notification to staff (with fallback)
try:
try:
NotificationService.send_notification(
db=db,
user_id=shift.staff_id,
notification_type=NotificationType.shift_update,
channel=NotificationChannel.in_app,
content=f"Your shift has been automatically completed at {current_time.strftime('%H:%M')}",
subject="Shift Completed",
meta_data={
"shift_id": shift.id,
"status": "completed",
"auto_completed": True
}
)
except Exception as enum_error:
# Fallback to system_alert if shift_update enum not in database yet
NotificationService.send_notification(
db=db,
user_id=shift.staff_id,
notification_type=NotificationType.system_alert,
channel=NotificationChannel.in_app,
content=f"Your shift has been automatically completed at {current_time.strftime('%H:%M')}",
subject="Shift Completed",
meta_data={
"shift_id": shift.id,
"status": "completed",
"auto_completed": True
}
)
except Exception as e:
logger.warning(f"Failed to send notification for shift {shift.id}: {str(e)}")
try:
db.rollback()
except:
pass
logger.info(f"Shift {shift.id} automatically completed at {current_time.isoformat()}")
return True
except Exception as e:
logger.error(f"Error auto-completing shift {shift.id}: {str(e)}", exc_info=True)
db.rollback()
return False
@staticmethod
def mark_no_show(
db: Session,
shift: StaffShift,
current_time: Optional[datetime] = None,
triggered_by: Optional[int] = None
) -> bool:
"""
Mark a shift as no_show
Returns True if status was changed, False otherwise
"""
if shift.status != ShiftStatus.scheduled:
return False
current_time = current_time or datetime.utcnow()
try:
old_status = shift.status
shift.status = ShiftStatus.no_show
shift.updated_at = current_time
db.commit()
# Log audit trail
ShiftAutomationService._log_status_change(
db=db,
shift_id=shift.id,
old_status=old_status,
new_status=ShiftStatus.no_show,
user_id=triggered_by,
reason="AUTO_NO_SHOW",
details={
"triggered_at": current_time.isoformat(),
"auto_marked": True
}
)
# Send notification to staff and admin (with fallback)
try:
try:
NotificationService.send_notification(
db=db,
user_id=shift.staff_id,
notification_type=NotificationType.shift_update,
channel=NotificationChannel.in_app,
content=f"Your shift was marked as no-show. Please contact your supervisor.",
subject="Shift No-Show",
meta_data={
"shift_id": shift.id,
"status": "no_show",
"auto_marked": True
}
)
except Exception as enum_error:
# Fallback to system_alert if shift_update enum not in database yet
NotificationService.send_notification(
db=db,
user_id=shift.staff_id,
notification_type=NotificationType.system_alert,
channel=NotificationChannel.in_app,
content=f"Your shift was marked as no-show. Please contact your supervisor.",
subject="Shift No-Show",
meta_data={
"shift_id": shift.id,
"status": "no_show",
"auto_marked": True
}
)
except Exception as e:
logger.warning(f"Failed to send notification for shift {shift.id}: {str(e)}")
try:
db.rollback()
except:
pass
logger.warning(f"Shift {shift.id} marked as no-show at {current_time.isoformat()}")
return True
except Exception as e:
logger.error(f"Error marking shift {shift.id} as no-show: {str(e)}", exc_info=True)
db.rollback()
return False
@staticmethod
def manual_status_change(
db: Session,
shift: StaffShift,
new_status: ShiftStatus,
user_id: int,
reason: Optional[str] = None,
notes: Optional[str] = None
) -> bool:
"""
Manually change shift status - ONLY for cancellation
All other status changes (start, complete) are handled automatically by scheduler
Returns True if status was changed, False otherwise
"""
old_status = shift.status
current_time = datetime.utcnow()
# Only allow manual cancellation - all other status changes are automatic
if new_status != ShiftStatus.cancelled:
logger.warning(
f"Manual status change to {new_status.value} not allowed. "
f"Only cancellation is allowed manually. Shift {shift.id} by user {user_id}"
)
return False
# Validate status transition (only cancellation allowed)
if not ShiftAutomationService._is_valid_transition(old_status, new_status):
logger.warning(
f"Invalid status transition from {old_status.value} to {new_status.value} "
f"for shift {shift.id} by user {user_id}"
)
return False
try:
shift.status = new_status
shift.updated_at = current_time
# Update timestamps based on status
if new_status == ShiftStatus.in_progress and not shift.actual_start_time:
shift.actual_start_time = current_time
elif new_status == ShiftStatus.completed and not shift.actual_end_time:
shift.actual_end_time = current_time
# If no actual start time, set it to scheduled start
if not shift.actual_start_time:
start_datetime, _ = ShiftAutomationService.get_shift_datetime(shift)
shift.actual_start_time = start_datetime
# Update notes if provided
if notes:
if shift.notes:
shift.notes += f"\n[{current_time.strftime('%Y-%m-%d %H:%M')}] {notes}"
else:
shift.notes = f"[{current_time.strftime('%Y-%m-%d %H:%M')}] {notes}"
db.commit()
# Log audit trail
ShiftAutomationService._log_status_change(
db=db,
shift_id=shift.id,
old_status=old_status,
new_status=new_status,
user_id=user_id,
reason=reason or "MANUAL",
details={
"notes": notes,
"changed_at": current_time.isoformat()
}
)
# Send notification for cancellation (use system_alert as fallback if shift_update not in DB)
try:
cancellation_message = f"Your shift has been cancelled"
if notes:
cancellation_message += f". Reason: {notes}"
# Try shift_update first, fallback to system_alert if enum not updated in DB
try:
NotificationService.send_notification(
db=db,
user_id=shift.staff_id,
notification_type=NotificationType.shift_update,
channel=NotificationChannel.in_app,
content=cancellation_message,
subject="Shift Cancelled",
meta_data={
"shift_id": shift.id,
"status": "cancelled",
"changed_by": user_id,
"reason": reason,
"notes": notes
}
)
except Exception as enum_error:
# Fallback to system_alert if shift_update enum not in database yet
logger.warning(f"shift_update enum not available, using system_alert: {str(enum_error)}")
NotificationService.send_notification(
db=db,
user_id=shift.staff_id,
notification_type=NotificationType.system_alert,
channel=NotificationChannel.in_app,
content=cancellation_message,
subject="Shift Cancelled",
meta_data={
"shift_id": shift.id,
"status": "cancelled",
"changed_by": user_id,
"reason": reason,
"notes": notes
}
)
except Exception as e:
# Log but don't fail the operation if notification fails
logger.warning(f"Failed to send notification for shift cancellation: {str(e)}")
# Rollback notification transaction if it failed
try:
db.rollback()
except:
pass
logger.info(
f"Shift {shift.id} status changed from {old_status.value} to {new_status.value} "
f"by user {user_id}"
)
return True
except Exception as e:
shift_id_str = str(shift.id) if shift and hasattr(shift, 'id') else 'unknown'
logger.error(f"Error changing shift {shift_id_str} status: {str(e)}", exc_info=True)
try:
db.rollback()
except:
pass
return False
@staticmethod
def _is_valid_transition(old_status: ShiftStatus, new_status: ShiftStatus) -> bool:
"""
Validate if a status transition is allowed
Enterprise business rules for status transitions
"""
# Same status is always valid (no-op)
if old_status == new_status:
return True
# Define valid transitions
valid_transitions = {
ShiftStatus.scheduled: [
ShiftStatus.in_progress,
ShiftStatus.cancelled,
ShiftStatus.no_show
],
ShiftStatus.in_progress: [
ShiftStatus.completed,
ShiftStatus.cancelled
],
ShiftStatus.completed: [], # Terminal state - cannot transition from
ShiftStatus.cancelled: [], # Terminal state - cannot transition from
ShiftStatus.no_show: [
ShiftStatus.cancelled # Can cancel a no-show
]
}
allowed = valid_transitions.get(old_status, [])
return new_status in allowed
@staticmethod
def _log_status_change(
db: Session,
shift_id: int,
old_status: ShiftStatus,
new_status: ShiftStatus,
user_id: Optional[int],
reason: str,
details: Optional[Dict[str, Any]] = None
):
"""Log shift status change to audit log"""
try:
audit_log = AuditLog(
user_id=user_id,
action="shift_status_change",
resource_type="staff_shift",
resource_id=shift_id,
details={
"old_status": old_status.value,
"new_status": new_status.value,
"reason": reason,
**(details or {})
},
status="success"
)
db.add(audit_log)
db.commit()
except Exception as e:
logger.error(f"Failed to log status change for shift {shift_id}: {str(e)}")
# Don't fail the operation if audit logging fails
@staticmethod
def process_shift_automation(db: Session, current_time: Optional[datetime] = None) -> Dict[str, Any]:
"""
Process all shifts that need automatic status updates
This is the main method called by the scheduler
Returns statistics about processed shifts
"""
current_time = current_time or datetime.utcnow()
stats = {
"processed": 0,
"auto_started": 0,
"auto_completed": 0,
"marked_no_show": 0,
"errors": 0
}
try:
# Get all shifts that need processing
# Only process shifts from today and future (not too far in the past)
cutoff_date = current_time.date() - timedelta(days=1)
shifts_to_process = db.query(StaffShift).filter(
and_(
func.date(StaffShift.shift_date) >= cutoff_date,
StaffShift.status.in_([ShiftStatus.scheduled, ShiftStatus.in_progress])
)
).all()
logger.info(f"Processing {len(shifts_to_process)} shifts for automation")
for shift in shifts_to_process:
try:
stats["processed"] += 1
# Check for no-show first (highest priority)
if ShiftAutomationService.should_mark_no_show(shift, current_time):
if ShiftAutomationService.mark_no_show(db, shift, current_time):
stats["marked_no_show"] += 1
continue
# Check for auto-complete
if ShiftAutomationService.should_auto_complete(shift, current_time):
if ShiftAutomationService.auto_complete_shift(db, shift, current_time):
stats["auto_completed"] += 1
continue
# Check for auto-start
if ShiftAutomationService.should_auto_start(shift, current_time):
if ShiftAutomationService.auto_start_shift(db, shift, current_time):
stats["auto_started"] += 1
continue
except Exception as e:
logger.error(f"Error processing shift {shift.id}: {str(e)}", exc_info=True)
stats["errors"] += 1
logger.info(
f"Shift automation completed: {stats['auto_started']} started, "
f"{stats['auto_completed']} completed, {stats['marked_no_show']} no-shows, "
f"{stats['errors']} errors"
)
return stats
except Exception as e:
logger.error(f"Error in shift automation process: {str(e)}", exc_info=True)
stats["errors"] += 1
return stats
# Singleton instance
shift_automation_service = ShiftAutomationService()

View File

@@ -0,0 +1,120 @@
"""
Staff Shift Automation Scheduler
Background scheduler for automatic shift status management
"""
import threading
import time
from datetime import datetime, timedelta
from typing import Optional
from sqlalchemy.orm import Session
from ...shared.config.database import get_db
from ...shared.config.logging_config import get_logger
from .shift_automation_service import shift_automation_service
logger = get_logger(__name__)
class ShiftScheduler:
"""Background scheduler for automatic shift status updates"""
def __init__(self):
self.running = False
self.thread: Optional[threading.Thread] = None
self.check_interval_seconds = 60 # Check every minute
self.last_run_time: Optional[datetime] = None
def start(self):
"""Start the background shift scheduler"""
if self.running:
logger.warning("Shift Scheduler is already running")
return
self.running = True
self.thread = threading.Thread(target=self._scheduler_loop, daemon=True)
self.thread.start()
logger.info("Shift Scheduler started - automatic shift management enabled")
def stop(self):
"""Stop the background shift scheduler"""
if not self.running:
return
self.running = False
if self.thread:
self.thread.join(timeout=5.0)
logger.info("Shift Scheduler stopped")
def _scheduler_loop(self):
"""Main scheduler loop that runs shift automation periodically"""
logger.info("Shift Scheduler loop started")
# Wait a bit for the app to fully start
time.sleep(10)
while self.running:
try:
current_time = datetime.utcnow()
# Run shift automation
logger.debug("Running scheduled shift automation check...")
stats = self._run_shift_automation()
self.last_run_time = current_time
# Log summary if there were changes
if any([
stats.get("auto_started", 0) > 0,
stats.get("auto_completed", 0) > 0,
stats.get("marked_no_show", 0) > 0
]):
logger.info(
f"Shift automation run completed: "
f"{stats.get('auto_started', 0)} started, "
f"{stats.get('auto_completed', 0)} completed, "
f"{stats.get('marked_no_show', 0)} no-shows"
)
# Sleep until next check
time.sleep(self.check_interval_seconds)
except Exception as e:
logger.error(f"Error in shift scheduler loop: {str(e)}", exc_info=True)
# Sleep for 30 seconds before retrying on error
time.sleep(30)
def _run_shift_automation(self) -> dict:
"""Run shift automation process with database session management"""
db_gen = get_db()
db = next(db_gen)
try:
stats = shift_automation_service.process_shift_automation(db)
return stats
except Exception as e:
logger.error(f"Error running shift automation: {str(e)}", exc_info=True)
return {"processed": 0, "auto_started": 0, "auto_completed": 0, "marked_no_show": 0, "errors": 1}
finally:
db.close()
def get_status(self) -> dict:
"""Get scheduler status information"""
return {
"running": self.running,
"last_run_time": self.last_run_time.isoformat() if self.last_run_time else None,
"check_interval_seconds": self.check_interval_seconds
}
# Singleton instance
_shift_scheduler: Optional[ShiftScheduler] = None
def get_shift_scheduler() -> ShiftScheduler:
"""Get or create the singleton shift scheduler instance"""
global _shift_scheduler
if _shift_scheduler is None:
_shift_scheduler = ShiftScheduler()
return _shift_scheduler

View File

@@ -293,6 +293,7 @@ from .payments.routes.approval_routes import router as financial_approval_routes
from .payments.routes.gl_routes import router as gl_routes
from .payments.routes.reconciliation_routes import router as reconciliation_routes
from .payments.routes.accountant_security_routes import router as accountant_security_routes
from .auth.routes.admin_security_routes import router as admin_security_routes
from .hotel_services.routes import service_routes, service_booking_routes, inventory_routes, guest_request_routes, staff_shift_routes
from .content.routes import (
banner_routes, page_content_routes, home_routes, about_routes,
@@ -330,6 +331,7 @@ app.include_router(financial_approval_routes, prefix=api_prefix)
app.include_router(gl_routes, prefix=api_prefix)
app.include_router(reconciliation_routes, prefix=api_prefix)
app.include_router(accountant_security_routes, prefix=api_prefix)
app.include_router(admin_security_routes, prefix=api_prefix)
app.include_router(banner_routes.router, prefix=api_prefix)
app.include_router(favorite_routes.router, prefix=api_prefix)
app.include_router(service_routes.router, prefix=api_prefix)
@@ -439,6 +441,16 @@ async def startup_event():
logger.error(f'Failed to start AI Training Scheduler: {str(e)}', exc_info=True)
# Don't fail app startup if scheduler fails
# Start Shift Automation Scheduler for automatic shift management
try:
from .hotel_services.services.shift_scheduler import get_shift_scheduler
shift_scheduler = get_shift_scheduler()
shift_scheduler.start()
logger.info('Shift Automation Scheduler started - automatic shift management enabled')
except Exception as e:
logger.error(f'Failed to start Shift Automation Scheduler: {str(e)}', exc_info=True)
# Don't fail app startup if scheduler fails
logger.info(f'{settings.APP_NAME} started successfully')
logger.info(f'Environment: {settings.ENVIRONMENT}')
logger.info(f'Debug mode: {settings.DEBUG}')

View File

@@ -8,6 +8,7 @@ from ..auth.models.role import Role
from ..auth.models.user import User
from ..auth.models.refresh_token import RefreshToken
from ..auth.models.password_reset_token import PasswordResetToken
from ..auth.models.admin_session import AdminSession, AdminActivityLog
# Room models
from ..rooms.models.room_type import RoomType
@@ -111,7 +112,7 @@ from ..integrations.models.webhook import Webhook, WebhookDelivery, WebhookEvent
__all__ = [
# Auth
'Role', 'User', 'RefreshToken', 'PasswordResetToken',
'Role', 'User', 'RefreshToken', 'PasswordResetToken', 'AdminSession', 'AdminActivityLog',
# Rooms
'RoomType', 'Room', 'RoomAttribute', 'RoomMaintenance', 'MaintenanceType', 'MaintenanceStatus',
'RoomInspection', 'InspectionType', 'InspectionStatus', 'RatePlan', 'RatePlanRule', 'RatePlanType', 'RatePlanStatus',

View File

@@ -26,6 +26,7 @@ class NotificationType(str, enum.Enum):
check_out_reminder = 'check_out_reminder'
marketing_campaign = 'marketing_campaign'
loyalty_update = 'loyalty_update'
shift_update = 'shift_update'
system_alert = 'system_alert'
custom = 'custom'

View File

@@ -1,5 +1,6 @@
"""
Step-up authentication middleware for high-risk operations.
Separates accountant and admin security concerns.
"""
from fastapi import Depends, HTTPException, status, Request
from sqlalchemy.orm import Session
@@ -8,7 +9,8 @@ from ...shared.config.database import get_db
from ...security.middleware.auth import get_current_user
from ...auth.models.user import User
from ...payments.services.accountant_security_service import accountant_security_service
from ...shared.utils.role_helpers import is_accountant
from ...auth.services.admin_security_service import admin_security_service
from ...shared.utils.role_helpers import is_accountant, is_admin
from ...shared.config.logging_config import get_logger
logger = get_logger(__name__)
@@ -65,26 +67,25 @@ def require_admin_step_up_auth(
):
"""
Dependency to require step-up authentication for admin-only high-risk operations.
Uses the same step-up mechanism but bypasses accountant role checks.
Uses admin-specific security service, separate from accountant security.
"""
async def step_up_checker(
request: Request,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
) -> User:
from ...shared.utils.role_helpers import is_admin
if not is_admin(current_user, db):
return current_user # Only admins are subject to this dependency
session_token = request.headers.get('X-Session-Token') or request.cookies.get('session_token')
# Try to get session token from header or cookie
session_token = request.headers.get('X-Session-Token') or request.cookies.get('admin_session_token')
requires_step_up, reason = accountant_security_service.require_step_up(
# Check if step-up is required using admin security service
requires_step_up, reason = admin_security_service.require_step_up(
db=db,
user_id=current_user.id,
session_token=session_token,
action_description=action_description,
enforce_role_check=False, # allow admin
action_description=action_description
)
if requires_step_up: