"""Membership service for managing room members Handles business logic for room membership operations """ from sqlalchemy.orm import Session from sqlalchemy import and_ from typing import List, Optional from datetime import datetime from app.modules.chat_room.models import RoomMember, IncidentRoom, MemberRole, RoomStatus class MembershipService: """Service for room membership operations""" # System admin email (hardcoded as per requirement) SYSTEM_ADMIN_EMAIL = "ymirliu@panjit.com.tw" def self_join_room( self, db: Session, room_id: str, user_id: str ) -> tuple[Optional[RoomMember], str, Optional[RoomMember]]: """Self-join a room as a viewer Allows any authenticated user to join a room without invitation. User joins as VIEWER role. Args: db: Database session room_id: Room ID user_id: User joining Returns: Tuple of (member, error_code, existing_member) - On success: (member, "", None) - If already member: (None, "already_member", existing_member) - If room archived: (None, "room_archived", None) - If room not found: (None, "room_not_found", None) """ # Check if room exists room = db.query(IncidentRoom).filter( IncidentRoom.room_id == room_id ).first() if not room: return None, "room_not_found", None # Check if room is archived if room.status == RoomStatus.ARCHIVED: return None, "room_archived", None # Check if already a member existing = db.query(RoomMember).filter( and_( RoomMember.room_id == room_id, RoomMember.user_id == user_id, RoomMember.removed_at.is_(None) ) ).first() if existing: return None, "already_member", existing # Create membership as viewer member = RoomMember( room_id=room_id, user_id=user_id, role=MemberRole.VIEWER, added_by=user_id, # Self-added added_at=datetime.utcnow() ) db.add(member) # Update member count self._update_member_count(db, room_id) # Update room activity room.last_activity_at = datetime.utcnow() db.commit() db.refresh(member) return member, "", None def add_member( self, db: Session, room_id: str, user_id: str, role: MemberRole, added_by: str ) -> Optional[RoomMember]: """Add a member to a room Args: db: Database session room_id: Room ID user_id: User to add role: Role to assign added_by: User adding the member Returns: Created member or None if already exists """ # Check if member already exists (active) existing = db.query(RoomMember).filter( and_( RoomMember.room_id == room_id, RoomMember.user_id == user_id, RoomMember.removed_at.is_(None) ) ).first() if existing: return None # Create new member member = RoomMember( room_id=room_id, user_id=user_id, role=role, added_by=added_by, added_at=datetime.utcnow() ) db.add(member) # Update member count self._update_member_count(db, room_id) db.commit() db.refresh(member) return member def remove_member( self, db: Session, room_id: str, user_id: str ) -> bool: """Remove a member from a room (soft delete) Args: db: Database session room_id: Room ID user_id: User to remove Returns: True if removed, False if not found """ member = db.query(RoomMember).filter( and_( RoomMember.room_id == room_id, RoomMember.user_id == user_id, RoomMember.removed_at.is_(None) ) ).first() if not member: return False # Soft delete member.removed_at = datetime.utcnow() # Update member count self._update_member_count(db, room_id) db.commit() return True def can_change_member_role( self, db: Session, room_id: str, changer_id: str, target_id: str, current_role: MemberRole, new_role: MemberRole ) -> tuple[bool, str]: """Check if a user can change another member's role Permission rules: - OWNER can change any role - EDITOR can upgrade VIEWER → EDITOR only - EDITOR cannot downgrade, remove, or set OWNER role - VIEWER cannot change roles Args: db: Database session room_id: Room ID changer_id: User attempting the change target_id: Target user current_role: Target's current role new_role: Requested new role Returns: Tuple of (allowed, error_message) """ # System admin can do anything if self.is_system_admin(changer_id): return True, "" changer_role = self.get_user_role_in_room(db, room_id, changer_id) if not changer_role: return False, "Not a member of this room" # Owner can change any role if changer_role == MemberRole.OWNER: return True, "" # Editor permissions if changer_role == MemberRole.EDITOR: # Cannot set owner role if new_role == MemberRole.OWNER: return False, "Only owner can transfer ownership" # Can only upgrade viewer to editor if current_role == MemberRole.VIEWER and new_role == MemberRole.EDITOR: return True, "" # Cannot downgrade if current_role == MemberRole.EDITOR and new_role == MemberRole.VIEWER: return False, "Editors can only upgrade members" return False, "Editors can only upgrade viewers to editor" # Viewer cannot change roles return False, "Insufficient permissions" def can_remove_member( self, db: Session, room_id: str, remover_id: str, target_id: str ) -> tuple[bool, str]: """Check if a user can remove another member Permission rules: - OWNER can remove any member - EDITOR cannot remove members - VIEWER cannot remove members - Users can remove themselves (leave room) Args: db: Database session room_id: Room ID remover_id: User attempting the removal target_id: Target user Returns: Tuple of (allowed, error_message) """ # System admin can do anything if self.is_system_admin(remover_id): return True, "" remover_role = self.get_user_role_in_room(db, room_id, remover_id) if not remover_role: return False, "Not a member of this room" # User can leave the room (remove themselves) if remover_id == target_id: # But owner cannot leave if they're the only owner if remover_role == MemberRole.OWNER: members = self.get_room_members(db, room_id) owner_count = sum(1 for m in members if m.role == MemberRole.OWNER) if owner_count == 1: return False, "Cannot leave: you are the only owner" return True, "" # Owner can remove any member if remover_role == MemberRole.OWNER: return True, "" # Editor cannot remove members if remover_role == MemberRole.EDITOR: return False, "Only owner can remove members" # Viewer cannot remove members return False, "Insufficient permissions" def update_member_role( self, db: Session, room_id: str, user_id: str, new_role: MemberRole ) -> Optional[RoomMember]: """Update a member's role Args: db: Database session room_id: Room ID user_id: User ID new_role: New role Returns: Updated member or None if not found """ member = db.query(RoomMember).filter( and_( RoomMember.room_id == room_id, RoomMember.user_id == user_id, RoomMember.removed_at.is_(None) ) ).first() if not member: return None member.role = new_role db.commit() db.refresh(member) return member def transfer_ownership( self, db: Session, room_id: str, current_owner_id: str, new_owner_id: str ) -> bool: """Transfer room ownership to another member Args: db: Database session room_id: Room ID current_owner_id: Current owner's user ID new_owner_id: New owner's user ID Returns: True if successful, False otherwise """ # Verify new owner is a member new_owner = db.query(RoomMember).filter( and_( RoomMember.room_id == room_id, RoomMember.user_id == new_owner_id, RoomMember.removed_at.is_(None) ) ).first() if not new_owner: return False # Get current owner current_owner = db.query(RoomMember).filter( and_( RoomMember.room_id == room_id, RoomMember.user_id == current_owner_id, RoomMember.role == MemberRole.OWNER, RoomMember.removed_at.is_(None) ) ).first() if not current_owner: return False # Transfer ownership new_owner.role = MemberRole.OWNER current_owner.role = MemberRole.EDITOR # Update room ownership transfer tracking room = db.query(IncidentRoom).filter( IncidentRoom.room_id == room_id ).first() if room: room.ownership_transferred_at = datetime.utcnow() room.ownership_transferred_by = current_owner_id room.last_updated_at = datetime.utcnow() room.last_activity_at = datetime.utcnow() db.commit() return True def get_room_members( self, db: Session, room_id: str ) -> List[RoomMember]: """Get all active members of a room Args: db: Database session room_id: Room ID Returns: List of active members """ return db.query(RoomMember).filter( and_( RoomMember.room_id == room_id, RoomMember.removed_at.is_(None) ) ).all() def get_user_rooms( self, db: Session, user_id: str ) -> List[IncidentRoom]: """Get all rooms where user is a member Args: db: Database session user_id: User ID Returns: List of rooms """ return db.query(IncidentRoom).join(RoomMember).filter( and_( RoomMember.user_id == user_id, RoomMember.removed_at.is_(None) ) ).all() def get_user_role_in_room( self, db: Session, room_id: str, user_id: str ) -> Optional[MemberRole]: """Get user's role in a specific room Args: db: Database session room_id: Room ID user_id: User ID Returns: User's role or None if not a member """ member = db.query(RoomMember).filter( and_( RoomMember.room_id == room_id, RoomMember.user_id == user_id, RoomMember.removed_at.is_(None) ) ).first() return member.role if member else None def check_user_permission( self, db: Session, room_id: str, user_id: str, permission: str ) -> bool: """Check if user has specific permission in room Args: db: Database session room_id: Room ID user_id: User ID permission: Permission to check Returns: True if user has permission, False otherwise """ # Check if user is system admin if self.is_system_admin(user_id): return True # Get user role role = self.get_user_role_in_room(db, room_id, user_id) if not role: return False # Permission matrix permissions = { MemberRole.OWNER: [ "read", "write", "manage_members", "transfer_ownership", "update_status", "delete", "update_metadata" ], MemberRole.EDITOR: [ "read", "write", "add_viewer" ], MemberRole.VIEWER: [ "read" ] } return permission in permissions.get(role, []) def is_system_admin(self, user_email: str) -> bool: """Check if user is system administrator Args: user_email: User's email Returns: True if system admin, False otherwise """ return user_email == self.SYSTEM_ADMIN_EMAIL def _update_member_count(self, db: Session, room_id: str) -> None: """Update room's member count Args: db: Database session room_id: Room ID """ count = db.query(RoomMember).filter( and_( RoomMember.room_id == room_id, RoomMember.removed_at.is_(None) ) ).count() room = db.query(IncidentRoom).filter( IncidentRoom.room_id == room_id ).first() if room: room.member_count = count # Create singleton instance membership_service = MembershipService()