"""Dependencies for chat room management FastAPI dependency injection functions for authentication and authorization """ from fastapi import Depends, HTTPException, status from sqlalchemy.orm import Session from typing import Optional from app.core.database import get_db from app.modules.auth import get_current_user from app.modules.chat_room.models import IncidentRoom, MemberRole from app.modules.chat_room.services.membership_service import membership_service from app.modules.chat_room.services.room_service import room_service def get_current_room( room_id: str, db: Session = Depends(get_db), current_user: dict = Depends(get_current_user) ) -> IncidentRoom: """Get current room with access validation Args: room_id: Room ID from path parameter db: Database session current_user: Current authenticated user Returns: Room instance Raises: HTTPException: 404 if room not found, 403 if no access """ user_email = current_user["username"] is_admin = membership_service.is_system_admin(user_email) room = room_service.get_room(db, room_id, user_email, is_admin) if not room: # Check if room exists at all room_exists = db.query(IncidentRoom).filter( IncidentRoom.room_id == room_id ).first() if not room_exists: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Room not found" ) else: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Not a member of this room" ) return room def require_room_permission(permission: str): """Create a dependency that requires specific permission in room Args: permission: Required permission Returns: Dependency function """ def permission_checker( room_id: str, db: Session = Depends(get_db), current_user: dict = Depends(get_current_user) ): """Check if user has required permission in room Args: room_id: Room ID from path parameter db: Database session current_user: Current authenticated user Raises: HTTPException: 403 if insufficient permissions """ user_email = current_user["username"] if not membership_service.check_user_permission(db, room_id, user_email, permission): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Insufficient permissions: {permission} required" ) return permission_checker def validate_room_owner( room_id: str, db: Session = Depends(get_db), current_user: dict = Depends(get_current_user) ): """Validate that current user is room owner (or admin) Args: room_id: Room ID from path parameter db: Database session current_user: Current authenticated user Raises: HTTPException: 403 if not owner or admin """ user_email = current_user["username"] # Check if admin if membership_service.is_system_admin(user_email): return # Check if owner role = membership_service.get_user_role_in_room(db, room_id, user_email) if role != MemberRole.OWNER: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Only room owner can perform this operation" ) def require_admin(current_user: dict = Depends(get_current_user)): """Require system administrator privileges Args: current_user: Current authenticated user Raises: HTTPException: 403 if not system admin """ user_email = current_user["username"] if not membership_service.is_system_admin(user_email): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Administrator privileges required" ) def get_user_effective_role( room_id: str, db: Session = Depends(get_db), current_user: dict = Depends(get_current_user) ) -> Optional[MemberRole]: """Get user's effective role in room (considers admin override) Args: room_id: Room ID from path parameter db: Database session current_user: Current authenticated user Returns: User's role or None if not a member (admin always gets OWNER role) """ user_email = current_user["username"] # Admin always has owner privileges if membership_service.is_system_admin(user_email): return MemberRole.OWNER return membership_service.get_user_role_in_room(db, room_id, user_email)