"""Membership service for managing room members Handles business logic for room membership operations """ from sqlalchemy.orm import Session from sqlalchemy import and_ from typing import List, Optional from datetime import datetime from app.modules.chat_room.models import RoomMember, IncidentRoom, MemberRole class MembershipService: """Service for room membership operations""" # System admin email (hardcoded as per requirement) SYSTEM_ADMIN_EMAIL = "ymirliu@panjit.com.tw" def add_member( self, db: Session, room_id: str, user_id: str, role: MemberRole, added_by: str ) -> Optional[RoomMember]: """Add a member to a room Args: db: Database session room_id: Room ID user_id: User to add role: Role to assign added_by: User adding the member Returns: Created member or None if already exists """ # Check if member already exists (active) existing = db.query(RoomMember).filter( and_( RoomMember.room_id == room_id, RoomMember.user_id == user_id, RoomMember.removed_at.is_(None) ) ).first() if existing: return None # Create new member member = RoomMember( room_id=room_id, user_id=user_id, role=role, added_by=added_by, added_at=datetime.utcnow() ) db.add(member) # Update member count self._update_member_count(db, room_id) db.commit() db.refresh(member) return member def remove_member( self, db: Session, room_id: str, user_id: str ) -> bool: """Remove a member from a room (soft delete) Args: db: Database session room_id: Room ID user_id: User to remove Returns: True if removed, False if not found """ member = db.query(RoomMember).filter( and_( RoomMember.room_id == room_id, RoomMember.user_id == user_id, RoomMember.removed_at.is_(None) ) ).first() if not member: return False # Soft delete member.removed_at = datetime.utcnow() # Update member count self._update_member_count(db, room_id) db.commit() return True def update_member_role( self, db: Session, room_id: str, user_id: str, new_role: MemberRole ) -> Optional[RoomMember]: """Update a member's role Args: db: Database session room_id: Room ID user_id: User ID new_role: New role Returns: Updated member or None if not found """ member = db.query(RoomMember).filter( and_( RoomMember.room_id == room_id, RoomMember.user_id == user_id, RoomMember.removed_at.is_(None) ) ).first() if not member: return None member.role = new_role db.commit() db.refresh(member) return member def transfer_ownership( self, db: Session, room_id: str, current_owner_id: str, new_owner_id: str ) -> bool: """Transfer room ownership to another member Args: db: Database session room_id: Room ID current_owner_id: Current owner's user ID new_owner_id: New owner's user ID Returns: True if successful, False otherwise """ # Verify new owner is a member new_owner = db.query(RoomMember).filter( and_( RoomMember.room_id == room_id, RoomMember.user_id == new_owner_id, RoomMember.removed_at.is_(None) ) ).first() if not new_owner: return False # Get current owner current_owner = db.query(RoomMember).filter( and_( RoomMember.room_id == room_id, RoomMember.user_id == current_owner_id, RoomMember.role == MemberRole.OWNER, RoomMember.removed_at.is_(None) ) ).first() if not current_owner: return False # Transfer ownership new_owner.role = MemberRole.OWNER current_owner.role = MemberRole.EDITOR # Update room ownership transfer tracking room = db.query(IncidentRoom).filter( IncidentRoom.room_id == room_id ).first() if room: room.ownership_transferred_at = datetime.utcnow() room.ownership_transferred_by = current_owner_id room.last_updated_at = datetime.utcnow() room.last_activity_at = datetime.utcnow() db.commit() return True def get_room_members( self, db: Session, room_id: str ) -> List[RoomMember]: """Get all active members of a room Args: db: Database session room_id: Room ID Returns: List of active members """ return db.query(RoomMember).filter( and_( RoomMember.room_id == room_id, RoomMember.removed_at.is_(None) ) ).all() def get_user_rooms( self, db: Session, user_id: str ) -> List[IncidentRoom]: """Get all rooms where user is a member Args: db: Database session user_id: User ID Returns: List of rooms """ return db.query(IncidentRoom).join(RoomMember).filter( and_( RoomMember.user_id == user_id, RoomMember.removed_at.is_(None) ) ).all() def get_user_role_in_room( self, db: Session, room_id: str, user_id: str ) -> Optional[MemberRole]: """Get user's role in a specific room Args: db: Database session room_id: Room ID user_id: User ID Returns: User's role or None if not a member """ member = db.query(RoomMember).filter( and_( RoomMember.room_id == room_id, RoomMember.user_id == user_id, RoomMember.removed_at.is_(None) ) ).first() return member.role if member else None def check_user_permission( self, db: Session, room_id: str, user_id: str, permission: str ) -> bool: """Check if user has specific permission in room Args: db: Database session room_id: Room ID user_id: User ID permission: Permission to check Returns: True if user has permission, False otherwise """ # Check if user is system admin if self.is_system_admin(user_id): return True # Get user role role = self.get_user_role_in_room(db, room_id, user_id) if not role: return False # Permission matrix permissions = { MemberRole.OWNER: [ "read", "write", "manage_members", "transfer_ownership", "update_status", "delete", "update_metadata" ], MemberRole.EDITOR: [ "read", "write", "add_viewer" ], MemberRole.VIEWER: [ "read" ] } return permission in permissions.get(role, []) def is_system_admin(self, user_email: str) -> bool: """Check if user is system administrator Args: user_email: User's email Returns: True if system admin, False otherwise """ return user_email == self.SYSTEM_ADMIN_EMAIL def _update_member_count(self, db: Session, room_id: str) -> None: """Update room's member count Args: db: Database session room_id: Room ID """ count = db.query(RoomMember).filter( and_( RoomMember.room_id == room_id, RoomMember.removed_at.is_(None) ) ).count() room = db.query(IncidentRoom).filter( IncidentRoom.room_id == room_id ).first() if room: room.member_count = count # Create singleton instance membership_service = MembershipService()