Complete implementation of the production line incident response system (生產線異常即時反應系統), including:

Backend (FastAPI):
- User authentication with AD integration and session management
- Chat room management (create, list, update, members, roles)
- Real-time messaging via WebSocket (typing indicators, reactions)
- File storage with MinIO (upload, download, image preview)

Frontend (React + Vite):
- Authentication flow with token management
- Room list with filtering, search, and pagination
- Real-time chat interface with WebSocket
- File upload with drag-and-drop and image preview
- Member management and room settings
- Breadcrumb navigation
- 53 unit tests (Vitest)

Specifications:
- authentication: AD auth, sessions, JWT tokens
- chat-room: rooms, members, templates
- realtime-messaging: WebSocket, messages, reactions
- file-storage: MinIO integration, file management
- frontend-core: React SPA structure

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
164 lines
4.6 KiB
Python
164 lines
4.6 KiB
Python
"""Dependencies for chat room management

FastAPI dependency injection functions for authentication and authorization
"""
|
|
from fastapi import Depends, HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
from typing import Optional
|
|
|
|
from app.core.database import get_db
|
|
from app.modules.auth import get_current_user
|
|
from app.modules.chat_room.models import IncidentRoom, MemberRole
|
|
from app.modules.chat_room.services.membership_service import membership_service
|
|
from app.modules.chat_room.services.room_service import room_service
|
|
|
|
|
|
def get_current_room(
    room_id: str,
    db: Session = Depends(get_db),
    current_user: dict = Depends(get_current_user)
) -> IncidentRoom:
    """Resolve the requested room, rejecting callers who may not access it

    Args:
        room_id: Room ID from path parameter
        db: Database session
        current_user: Current authenticated user (its "username" entry is
            used as the user identifier)

    Returns:
        The room returned by ``room_service.get_room``

    Raises:
        HTTPException: 404 if no room with this ID exists, 403 if the room
            exists but the service lookup returned nothing for this user
    """
    username = current_user["username"]
    admin_flag = membership_service.is_system_admin(username)

    found = room_service.get_room(db, room_id, username, admin_flag)
    if found:
        return found

    # The service lookup came back empty; query the table directly to tell
    # "room does not exist" apart from "room exists but was not returned".
    existing = (
        db.query(IncidentRoom)
        .filter(IncidentRoom.room_id == room_id)
        .first()
    )
    if existing:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not a member of this room"
        )

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Room not found"
    )
|
|
|
|
|
|
def require_room_permission(permission: str):
    """Build a dependency that enforces one permission inside a room

    Args:
        permission: Permission name passed to the membership service check

    Returns:
        Dependency function that raises when the check fails
    """
    def permission_checker(
        room_id: str,
        db: Session = Depends(get_db),
        current_user: dict = Depends(get_current_user)
    ):
        """Reject the request unless the caller holds the required permission

        Args:
            room_id: Room ID from path parameter
            db: Database session
            current_user: Current authenticated user

        Raises:
            HTTPException: 403 if the membership service reports the
                permission is not granted
        """
        granted = membership_service.check_user_permission(
            db, room_id, current_user["username"], permission
        )
        if granted:
            return

        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Insufficient permissions: {permission} required"
        )

    return permission_checker
|
|
|
|
|
|
def validate_room_owner(
    room_id: str,
    db: Session = Depends(get_db),
    current_user: dict = Depends(get_current_user)
):
    """Ensure the caller is the room owner or a system admin

    Args:
        room_id: Room ID from path parameter
        db: Database session
        current_user: Current authenticated user

    Raises:
        HTTPException: 403 if the caller is neither a system admin nor
            holds the OWNER role in the room
    """
    caller = current_user["username"]

    # System admins pass without a room role lookup.
    if not membership_service.is_system_admin(caller):
        room_role = membership_service.get_user_role_in_room(db, room_id, caller)
        if room_role != MemberRole.OWNER:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Only room owner can perform this operation"
            )
|
|
|
|
|
|
def require_admin(current_user: dict = Depends(get_current_user)):
    """Allow the request only for system administrators

    Args:
        current_user: Current authenticated user

    Raises:
        HTTPException: 403 if the membership service does not report the
            caller as a system admin
    """
    if membership_service.is_system_admin(current_user["username"]):
        return

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Administrator privileges required"
    )
|
|
|
|
|
|
def get_user_effective_role(
    room_id: str,
    db: Session = Depends(get_db),
    current_user: dict = Depends(get_current_user)
) -> Optional[MemberRole]:
    """Return the role the caller effectively holds in a room

    Args:
        room_id: Room ID from path parameter
        db: Database session
        current_user: Current authenticated user

    Returns:
        MemberRole.OWNER for system admins; otherwise whatever
        ``membership_service.get_user_role_in_room`` returns for this user
    """
    caller = current_user["username"]

    if not membership_service.is_system_admin(caller):
        return membership_service.get_user_role_in_room(db, room_id, caller)

    # System admins are treated as owners without a room role lookup.
    return MemberRole.OWNER