""" 用戶管理 API 端點 """ from typing import Optional from fastapi import APIRouter, Depends, HTTPException, status, Query from sqlalchemy.orm import Session from sqlalchemy import func from app.db.session import get_db from app.core.security import get_password_hash from app.models import User, Role from app.schemas.user import ( UserCreate, UserUpdate, UserResponse, UserListResponse, PaginationResponse ) from app.api.v1.endpoints.auth import get_current_user, require_roles router = APIRouter() @router.get("", response_model=UserListResponse) def list_users( page: int = Query(1, ge=1), limit: int = Query(20, ge=1, le=100), role: Optional[str] = None, auth_type: Optional[str] = None, search: Optional[str] = None, current_user: User = Depends(require_roles("admin")) ): """取得用戶列表(僅管理員)""" db = next(get_db()) query = db.query(User) # 篩選條件 if role: query = query.join(Role).filter(Role.code == role) if auth_type: query = query.filter(User.auth_type == auth_type) if search: # 清理輸入,移除特殊字元,防止注入 safe_search = search.strip()[:100] # 限制長度 # SQLAlchemy 的 ilike 已經使用參數化查詢,相對安全 # 但為了額外安全,轉義 SQL 萬用字元 safe_search = safe_search.replace('%', '\\%').replace('_', '\\_') query = query.filter( (User.username.ilike(f"%{safe_search}%")) | (User.display_name.ilike(f"%{safe_search}%")) ) # 總數 total = query.count() # 分頁 users = query.offset((page - 1) * limit).limit(limit).all() return UserListResponse( data=[UserResponse.model_validate(u) for u in users], pagination=PaginationResponse( page=page, limit=limit, total=total, total_pages=(total + limit - 1) // limit ) ) @router.post("", response_model=UserResponse, status_code=status.HTTP_201_CREATED) def create_user( user_in: UserCreate, db: Session = Depends(get_db), current_user: User = Depends(require_roles("admin")) ): """新增用戶(僅管理員)""" # 檢查帳號是否重複 existing = db.query(User).filter(User.username == user_in.username).first() if existing: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="帳號已存在" ) # 檢查角色 role = 
db.query(Role).filter(Role.id == user_in.role_id).first() if not role: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="角色不存在" ) # 本地帳號必須有密碼 if user_in.auth_type == "local" and not user_in.password: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="本地帳號必須設定密碼" ) user = User( username=user_in.username, display_name=user_in.display_name, email=user_in.email, auth_type=user_in.auth_type, role_id=user_in.role_id, password_hash=get_password_hash(user_in.password) if user_in.password else None ) db.add(user) db.commit() db.refresh(user) return user @router.get("/{user_id}", response_model=UserResponse) def get_user( user_id: int, db: Session = Depends(get_db), current_user: User = Depends(require_roles("admin")) ): """取得單一用戶(僅管理員)""" user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="用戶不存在" ) return user @router.put("/{user_id}", response_model=UserResponse) def update_user( user_id: int, user_in: UserUpdate, db: Session = Depends(get_db), current_user: User = Depends(require_roles("admin")) ): """更新用戶(僅管理員)""" user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="用戶不存在" ) # 更新欄位 if user_in.display_name is not None: user.display_name = user_in.display_name if user_in.email is not None: user.email = user_in.email if user_in.role_id is not None: role = db.query(Role).filter(Role.id == user_in.role_id).first() if not role: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="角色不存在" ) user.role_id = user_in.role_id if user_in.is_active is not None: user.is_active = user_in.is_active if user_in.password is not None: if user.auth_type.value != "local": raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="AD 帳號無法修改密碼" ) user.password_hash = get_password_hash(user_in.password) db.commit() db.refresh(user) return user 
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_roles("admin"))
):
    """Delete a user by id (admin only).

    Responds with 204 and no body on success.

    Raises:
        HTTPException: 404 if no user has this id; 400 if the caller tries
            to delete their own account.
    """
    lookup = db.query(User).filter(User.id == user_id)
    target = lookup.first()

    # Unknown id
    if not target:
        not_found = HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="用戶不存在",
        )
        raise not_found

    # Callers may not delete their own account
    deleting_self = target.id == current_user.id
    if deleting_self:
        self_delete = HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="無法刪除自己的帳號",
        )
        raise self_delete

    db.delete(target)
    db.commit()