""" 認證 API 端點 """ from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from sqlalchemy.orm import Session from app.db.session import get_db from app.core.security import ( verify_password, get_password_hash, create_access_token, decode_access_token, verify_ldap_credentials ) from app.models import User, Role from app.schemas.user import LoginRequest, LoginResponse, UserResponse router = APIRouter() security = HTTPBearer() def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db) ) -> User: """取得當前登入用戶(依賴注入)""" token = credentials.credentials payload = decode_access_token(token) if not payload: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="無效的認證憑證", headers={"WWW-Authenticate": "Bearer"} ) user_id = payload.get("user_id") user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="用戶不存在" ) if not user.is_active: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="用戶已停用" ) return user def require_roles(*roles: str): """角色權限檢查裝飾器""" def role_checker(current_user: User = Depends(get_current_user)) -> User: if current_user.role.code not in roles: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="權限不足" ) return current_user return role_checker @router.post("/login", response_model=LoginResponse) def login(request: LoginRequest, db: Session = Depends(get_db)): """用戶登入""" user = db.query(User).filter(User.username == request.username).first() if request.auth_type == "ad": # AD/LDAP 認證 ldap_result = verify_ldap_credentials(request.username, request.password) if not ldap_result: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="AD 認證失敗" ) # 如果用戶不存在,自動建立(首次 AD 登入) if not user: # 取得預設讀者角色 reader_role = db.query(Role).filter(Role.code == "reader").first() if not reader_role: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="系統角色未初始化" ) user = User( username=request.username, display_name=ldap_result.get("display_name", request.username), email=ldap_result.get("email"), auth_type="ad", role_id=reader_role.id ) db.add(user) db.commit() db.refresh(user) else: # 本地認證 if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="帳號或密碼錯誤" ) if user.auth_type.value != "local": raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="此帳號請使用 AD 登入" ) if not verify_password(request.password, user.password_hash): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="帳號或密碼錯誤" ) # 更新最後登入時間 user.last_login_at = datetime.utcnow() db.commit() # 產生 Token token = create_access_token({ "user_id": user.id, "username": user.username, "role": user.role.code }) return LoginResponse( token=token, user=UserResponse.model_validate(user) ) @router.post("/logout") def logout(current_user: User = Depends(get_current_user)): """用戶登出""" # JWT 為無狀態,登出僅做記錄 return {"message": "登出成功"} @router.get("/me", response_model=UserResponse) def get_current_user_info(current_user: User = Depends(get_current_user)): """取得當前用戶資訊""" return current_user