"""Authentication API endpoints 提供: - POST /api/auth/login - 使用者登入 - POST /api/auth/logout - 使用者登出 """ from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.orm import Session from app.core.database import get_db from app.modules.auth.schemas import LoginRequest, LoginResponse, LogoutResponse, ErrorResponse from app.modules.auth.services.ad_client import ad_auth_service from app.modules.auth.services.encryption import encryption_service from app.modules.auth.services.session_service import session_service from fastapi import Header from typing import Optional router = APIRouter(prefix="/api/auth", tags=["Authentication"]) @router.post( "/login", response_model=LoginResponse, responses={ 401: {"model": ErrorResponse, "description": "Invalid credentials"}, 503: {"model": ErrorResponse, "description": "Authentication service unavailable"}, }, ) async def login(request: LoginRequest, db: Session = Depends(get_db)): """使用者登入 流程: 1. 呼叫 AD API 驗證憑證 2. 加密密碼(用於自動刷新) 3. 生成 internal token (UUID) 4. 儲存 session 到資料庫 5. 回傳 internal token 和 display_name """ try: # Step 1: Authenticate with AD API ad_result = await ad_auth_service.authenticate(request.username, request.password) except ValueError as e: # Invalid credentials raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials" ) except ConnectionError as e: # AD API unavailable raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Authentication service unavailable", ) # Step 2: Encrypt password for future auto-refresh encrypted_password = encryption_service.encrypt_password(request.password) # Step 3 & 4: Generate internal token and create session user_session = session_service.create_session( db=db, username=request.username, display_name=ad_result["username"], ad_token=ad_result["token"], encrypted_password=encrypted_password, ad_token_expires_at=ad_result["expires_at"], ) # Step 5: Return internal token to client return LoginResponse(token=user_session.internal_token, display_name=user_session.display_name) @router.post( "/logout", response_model=LogoutResponse, responses={401: {"model": ErrorResponse, "description": "No authentication token provided"}}, ) async def logout(authorization: Optional[str] = Header(None), db: Session = Depends(get_db)): """使用者登出 刪除 session 記錄,使 token 失效 """ if not authorization or not authorization.startswith("Bearer "): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="No authentication token provided" ) # Extract token internal_token = authorization.replace("Bearer ", "") # Find and delete session user_session = session_service.get_session_by_token(db, internal_token) if user_session: session_service.delete_session(db, user_session.id) return LogoutResponse(message="Logout successful")