Files
Task_Reporter/app/modules/chat_room/services/membership_service.py
egg 1d5d4d447d feat: Add mobile responsive layout, open room access, and admin room management
Mobile Responsive Layout:
- Add useMediaQuery, useIsMobile, useIsTablet, useIsDesktop hooks for device detection
- Create MobileHeader component with hamburger menu and action drawer
- Create BottomToolbar for mobile navigation (Files, Members)
- Create SlidePanel component for full-screen mobile sidebars
- Update RoomDetail.tsx with mobile/desktop conditional rendering
- Update RoomList.tsx with single-column grid and touch-friendly buttons
- Add CSS custom properties for safe areas and touch targets (min 44px)
- Add mobile viewport meta tags for notched devices

Open Room Access:
- All authenticated users can view all rooms (not just their own)
- Users can join active rooms they're not members of
- Add is_member field to room responses
- Update room list API to return all rooms by default

Admin Room Management:
- Add permanent delete functionality for system admins
- Add delete confirmation dialog with room title verification
- Broadcast room deletion via WebSocket to connected users
- Add users search API for adding members

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-05 09:12:10 +08:00

526 lines
14 KiB
Python

"""Membership service for managing room members
Handles business logic for room membership operations
"""
from sqlalchemy.orm import Session
from sqlalchemy import and_
from typing import List, Optional
from datetime import datetime
from app.modules.chat_room.models import RoomMember, IncidentRoom, MemberRole, RoomStatus
class MembershipService:
    """Service for room membership operations.

    Encapsulates the business rules for joining, adding, removing and
    re-roling members of an incident room, plus the permission checks that
    guard those operations. Every method receives the SQLAlchemy session to
    work with; the service itself holds no per-request state.

    NOTE(review): the system-admin checks compare the caller's user ID
    against ``SYSTEM_ADMIN_EMAIL``, which presumably means user IDs are
    email addresses in this project -- confirm against the auth layer.
    """

    # System admin email (hardcoded as per requirement)
    SYSTEM_ADMIN_EMAIL = "ymirliu@panjit.com.tw"

    def self_join_room(
        self,
        db: Session,
        room_id: str,
        user_id: str
    ) -> tuple[Optional[RoomMember], str, Optional[RoomMember]]:
        """Self-join a room as a viewer

        Allows any authenticated user to join a room without invitation.
        User joins as VIEWER role.

        Args:
            db: Database session
            room_id: Room ID
            user_id: User joining

        Returns:
            Tuple of (member, error_code, existing_member)
            - On success: (member, "", None)
            - If already member: (None, "already_member", existing_member)
            - If room archived: (None, "room_archived", None)
            - If room not found: (None, "room_not_found", None)
        """
        # Check if room exists
        room = db.query(IncidentRoom).filter(
            IncidentRoom.room_id == room_id
        ).first()
        if not room:
            return None, "room_not_found", None

        # Archived rooms cannot be joined
        if room.status == RoomStatus.ARCHIVED:
            return None, "room_archived", None

        # Check if already an active member
        existing = self._get_active_member(db, room_id, user_id)
        if existing:
            return None, "already_member", existing

        # One timestamp for the whole operation so the membership row and the
        # room activity marker agree.
        now = datetime.utcnow()

        # Create membership as viewer
        member = RoomMember(
            room_id=room_id,
            user_id=user_id,
            role=MemberRole.VIEWER,
            added_by=user_id,  # Self-added
            added_at=now
        )
        db.add(member)

        # Update member count
        self._update_member_count(db, room_id)

        # Update room activity
        room.last_activity_at = now

        db.commit()
        db.refresh(member)
        return member, "", None

    def add_member(
        self,
        db: Session,
        room_id: str,
        user_id: str,
        role: MemberRole,
        added_by: str
    ) -> Optional[RoomMember]:
        """Add a member to a room

        Args:
            db: Database session
            room_id: Room ID
            user_id: User to add
            role: Role to assign
            added_by: User adding the member

        Returns:
            Created member or None if the user is already an active member
        """
        # Check if member already exists (active)
        if self._get_active_member(db, room_id, user_id):
            return None

        # Create new member
        member = RoomMember(
            room_id=room_id,
            user_id=user_id,
            role=role,
            added_by=added_by,
            added_at=datetime.utcnow()
        )
        db.add(member)

        # Update member count
        self._update_member_count(db, room_id)

        db.commit()
        db.refresh(member)
        return member

    def remove_member(
        self,
        db: Session,
        room_id: str,
        user_id: str
    ) -> bool:
        """Remove a member from a room (soft delete)

        The membership row is kept; only its ``removed_at`` timestamp is set.

        Args:
            db: Database session
            room_id: Room ID
            user_id: User to remove

        Returns:
            True if removed, False if no active membership was found
        """
        member = self._get_active_member(db, room_id, user_id)
        if not member:
            return False

        # Soft delete
        member.removed_at = datetime.utcnow()

        # Update member count
        self._update_member_count(db, room_id)

        db.commit()
        return True

    def can_change_member_role(
        self,
        db: Session,
        room_id: str,
        changer_id: str,
        target_id: str,
        current_role: MemberRole,
        new_role: MemberRole
    ) -> tuple[bool, str]:
        """Check if a user can change another member's role

        Permission rules:
        - System admin can change any role
        - OWNER can change any role
        - EDITOR can upgrade VIEWER -> EDITOR only
        - EDITOR cannot downgrade, remove, or set OWNER role
        - VIEWER cannot change roles

        Args:
            db: Database session
            room_id: Room ID
            changer_id: User attempting the change
            target_id: Target user (accepted for interface symmetry; the
                decision depends only on the roles involved)
            current_role: Target's current role
            new_role: Requested new role

        Returns:
            Tuple of (allowed, error_message); error_message is "" when allowed
        """
        # System admin can do anything
        if self.is_system_admin(changer_id):
            return True, ""

        changer_role = self.get_user_role_in_room(db, room_id, changer_id)
        if not changer_role:
            return False, "Not a member of this room"

        # Owner can change any role
        if changer_role == MemberRole.OWNER:
            return True, ""

        # Editor permissions
        if changer_role == MemberRole.EDITOR:
            # Cannot set owner role
            if new_role == MemberRole.OWNER:
                return False, "Only owner can transfer ownership"
            # Can only upgrade viewer to editor
            if current_role == MemberRole.VIEWER and new_role == MemberRole.EDITOR:
                return True, ""
            # Cannot downgrade
            if current_role == MemberRole.EDITOR and new_role == MemberRole.VIEWER:
                return False, "Editors can only upgrade members"
            return False, "Editors can only upgrade viewers to editor"

        # Viewer cannot change roles
        return False, "Insufficient permissions"

    def can_remove_member(
        self,
        db: Session,
        room_id: str,
        remover_id: str,
        target_id: str
    ) -> tuple[bool, str]:
        """Check if a user can remove another member

        Permission rules:
        - System admin can remove any member
        - OWNER can remove any member
        - EDITOR cannot remove members
        - VIEWER cannot remove members
        - Users can remove themselves (leave room), except the sole owner

        Args:
            db: Database session
            room_id: Room ID
            remover_id: User attempting the removal
            target_id: Target user

        Returns:
            Tuple of (allowed, error_message); error_message is "" when allowed
        """
        # System admin can do anything
        if self.is_system_admin(remover_id):
            return True, ""

        remover_role = self.get_user_role_in_room(db, room_id, remover_id)
        if not remover_role:
            return False, "Not a member of this room"

        # User can leave the room (remove themselves)
        if remover_id == target_id:
            # But owner cannot leave if they're the only owner
            if remover_role == MemberRole.OWNER:
                members = self.get_room_members(db, room_id)
                owner_count = sum(1 for m in members if m.role == MemberRole.OWNER)
                if owner_count == 1:
                    return False, "Cannot leave: you are the only owner"
            return True, ""

        # Owner can remove any member
        if remover_role == MemberRole.OWNER:
            return True, ""

        # Editor cannot remove members
        if remover_role == MemberRole.EDITOR:
            return False, "Only owner can remove members"

        # Viewer cannot remove members
        return False, "Insufficient permissions"

    def update_member_role(
        self,
        db: Session,
        room_id: str,
        user_id: str,
        new_role: MemberRole
    ) -> Optional[RoomMember]:
        """Update a member's role

        No permission check happens here; callers are expected to have
        consulted ``can_change_member_role`` first.

        Args:
            db: Database session
            room_id: Room ID
            user_id: User ID
            new_role: New role

        Returns:
            Updated member or None if no active membership was found
        """
        member = self._get_active_member(db, room_id, user_id)
        if not member:
            return None

        member.role = new_role
        db.commit()
        db.refresh(member)
        return member

    def transfer_ownership(
        self,
        db: Session,
        room_id: str,
        current_owner_id: str,
        new_owner_id: str
    ) -> bool:
        """Transfer room ownership to another member

        The new owner becomes OWNER and the current owner is demoted to
        EDITOR; the room's ownership-transfer tracking fields are updated.

        Args:
            db: Database session
            room_id: Room ID
            current_owner_id: Current owner's user ID
            new_owner_id: New owner's user ID

        Returns:
            True if successful. False if both IDs are the same user, if the
            new owner is not an active member, or if ``current_owner_id`` is
            not an active OWNER of the room.
        """
        # A self-transfer would resolve both lookups to the same row, which
        # would then be promoted and immediately demoted to EDITOR, leaving
        # the room without an owner. Reject it before touching any state.
        if current_owner_id == new_owner_id:
            return False

        # Verify new owner is a member
        new_owner = self._get_active_member(db, room_id, new_owner_id)
        if not new_owner:
            return False

        # Get current owner (must currently hold the OWNER role)
        current_owner = db.query(RoomMember).filter(
            and_(
                RoomMember.room_id == room_id,
                RoomMember.user_id == current_owner_id,
                RoomMember.role == MemberRole.OWNER,
                RoomMember.removed_at.is_(None)
            )
        ).first()
        if not current_owner:
            return False

        # Transfer ownership
        new_owner.role = MemberRole.OWNER
        current_owner.role = MemberRole.EDITOR

        # Update room ownership transfer tracking
        room = db.query(IncidentRoom).filter(
            IncidentRoom.room_id == room_id
        ).first()
        if room:
            # Single timestamp so all tracking columns agree.
            now = datetime.utcnow()
            room.ownership_transferred_at = now
            room.ownership_transferred_by = current_owner_id
            room.last_updated_at = now
            room.last_activity_at = now

        db.commit()
        return True

    def get_room_members(
        self,
        db: Session,
        room_id: str
    ) -> List[RoomMember]:
        """Get all active members of a room

        Args:
            db: Database session
            room_id: Room ID

        Returns:
            List of active (not soft-deleted) members
        """
        return db.query(RoomMember).filter(
            and_(
                RoomMember.room_id == room_id,
                RoomMember.removed_at.is_(None)
            )
        ).all()

    def get_user_rooms(
        self,
        db: Session,
        user_id: str
    ) -> List[IncidentRoom]:
        """Get all rooms where user is a member

        Args:
            db: Database session
            user_id: User ID

        Returns:
            List of rooms in which the user has an active membership
        """
        return db.query(IncidentRoom).join(RoomMember).filter(
            and_(
                RoomMember.user_id == user_id,
                RoomMember.removed_at.is_(None)
            )
        ).all()

    def get_user_role_in_room(
        self,
        db: Session,
        room_id: str,
        user_id: str
    ) -> Optional[MemberRole]:
        """Get user's role in a specific room

        Args:
            db: Database session
            room_id: Room ID
            user_id: User ID

        Returns:
            User's role or None if not an active member
        """
        member = self._get_active_member(db, room_id, user_id)
        return member.role if member else None

    def check_user_permission(
        self,
        db: Session,
        room_id: str,
        user_id: str,
        permission: str
    ) -> bool:
        """Check if user has specific permission in room

        Args:
            db: Database session
            room_id: Room ID
            user_id: User ID
            permission: Permission to check (e.g. "read", "write",
                "manage_members")

        Returns:
            True if user has permission, False otherwise. The system admin
            is granted every permission; non-members are granted none.
        """
        # Check if user is system admin
        if self.is_system_admin(user_id):
            return True

        # Get user role
        role = self.get_user_role_in_room(db, room_id, user_id)
        if not role:
            return False

        # Permission matrix
        permissions = {
            MemberRole.OWNER: [
                "read", "write", "manage_members", "transfer_ownership",
                "update_status", "delete", "update_metadata"
            ],
            MemberRole.EDITOR: [
                "read", "write", "add_viewer"
            ],
            MemberRole.VIEWER: [
                "read"
            ]
        }
        return permission in permissions.get(role, [])

    def is_system_admin(self, user_email: str) -> bool:
        """Check if user is system administrator

        Args:
            user_email: User's email

        Returns:
            True if the email equals ``SYSTEM_ADMIN_EMAIL`` exactly
            (case-sensitive), False otherwise
        """
        return user_email == self.SYSTEM_ADMIN_EMAIL

    def _get_active_member(
        self,
        db: Session,
        room_id: str,
        user_id: str
    ) -> Optional[RoomMember]:
        """Fetch a user's active (not soft-deleted) membership row in a room

        Args:
            db: Database session
            room_id: Room ID
            user_id: User ID

        Returns:
            The first matching membership, or None if there is none
        """
        return db.query(RoomMember).filter(
            and_(
                RoomMember.room_id == room_id,
                RoomMember.user_id == user_id,
                RoomMember.removed_at.is_(None)
            )
        ).first()

    def _update_member_count(self, db: Session, room_id: str) -> None:
        """Update room's member count

        Recounts the active memberships and stores the result on the room.
        Does not commit; the caller owns the transaction.

        Args:
            db: Database session
            room_id: Room ID
        """
        # Callers invoke this right after db.add(...) or after setting
        # removed_at. Flush explicitly so the COUNT below sees that pending
        # change even when the session was created with autoflush disabled.
        db.flush()

        count = db.query(RoomMember).filter(
            and_(
                RoomMember.room_id == room_id,
                RoomMember.removed_at.is_(None)
            )
        ).count()

        room = db.query(IncidentRoom).filter(
            IncidentRoom.room_id == room_id
        ).first()
        if room:
            room.member_count = count
# Shared module-level instance, created once at import time, so callers can
# import `membership_service` instead of constructing MembershipService
# themselves.
membership_service = MembershipService()