Files
donald db0f0bbfe7 Initial commit: Daily News App
企業內部新聞彙整與分析系統
- 自動新聞抓取 (Digitimes, 經濟日報, 工商時報)
- AI 智慧摘要 (OpenAI/Claude/Ollama)
- 群組管理與訂閱通知
- 已清理 Python 快取檔案

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-03 23:53:24 +08:00

152 lines
4.5 KiB
Python

"""
認證 API 端點
"""
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.core.security import (
verify_password,
get_password_hash,
create_access_token,
decode_access_token,
verify_ldap_credentials
)
from app.models import User, Role
from app.schemas.user import LoginRequest, LoginResponse, UserResponse
# Router on which the endpoints defined in this module are registered.
router = APIRouter()
# HTTP Bearer scheme; get_current_user depends on it to obtain the request's credentials.
security = HTTPBearer()
def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db)
) -> User:
    """Resolve the request's bearer token to an active ``User`` (dependency).

    Args:
        credentials: Bearer credentials produced by the ``security`` scheme.
        db: Database session supplied by ``get_db``.

    Returns:
        The ``User`` row whose id equals the token payload's ``user_id``.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``user_id`` value, or refers to a user that does not exist;
            403 when the user exists but ``is_active`` is falsy.
    """
    # Every 401 carries the Bearer challenge so clients see a consistent
    # authentication prompt regardless of which check failed.
    bearer_challenge = {"WWW-Authenticate": "Bearer"}

    payload = decode_access_token(credentials.credentials)
    user_id = payload.get("user_id") if payload else None
    if user_id is None:
        # Undecodable token, or a payload without a user id: reject before
        # touching the database.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="無效的認證憑證",
            headers=bearer_challenge
        )

    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用戶不存在",
            headers=bearer_challenge
        )

    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="用戶已停用"
        )

    return user
def require_roles(*roles: str):
    """Build a dependency that admits only users holding one of ``roles``.

    Args:
        *roles: Role codes that are allowed through.

    Returns:
        A callable that depends on ``get_current_user`` and returns that user
        when ``current_user.role.code`` is one of ``roles``; otherwise it
        raises ``HTTPException`` with status 403.
    """
    def ensure_role(current_user: User = Depends(get_current_user)) -> User:
        # Allowed role: hand the authenticated user back to the caller.
        if current_user.role.code in roles:
            return current_user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="權限不足"
        )

    return ensure_role
@router.post("/login", response_model=LoginResponse)
def login(request: LoginRequest, db: Session = Depends(get_db)):
    """Authenticate a user through AD/LDAP or a local password and issue a token.

    Args:
        request: Login payload; ``auth_type == "ad"`` selects AD/LDAP
            authentication, any other value selects local authentication.
        db: Database session supplied by ``get_db``.

    Returns:
        A ``LoginResponse`` holding the access token and the user data.

    Raises:
        HTTPException: 401 when AD verification fails or the local
            username/password is wrong; 400 when a non-local account attempts
            a local login; 500 when the ``reader`` role is missing during
            first-time AD provisioning; 403 when the authenticated account is
            not active.
    """
    user = db.query(User).filter(User.username == request.username).first()

    if request.auth_type == "ad":
        # AD/LDAP authentication
        ldap_result = verify_ldap_credentials(request.username, request.password)
        if not ldap_result:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="AD 認證失敗"
            )

        # NOTE(review): an existing row with this username is reused here
        # whatever its auth_type is — confirm that an AD identity is meant to
        # be able to sign in as a same-named local account.
        # First AD login: create the user record automatically.
        if not user:
            # New AD users get the default "reader" role.
            reader_role = db.query(Role).filter(Role.code == "reader").first()
            if not reader_role:
                raise HTTPException(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail="系統角色未初始化"
                )

            user = User(
                username=request.username,
                display_name=ldap_result.get("display_name", request.username),
                email=ldap_result.get("email"),
                auth_type="ad",
                role_id=reader_role.id
            )
            db.add(user)
            db.commit()
            db.refresh(user)
    else:
        # Local authentication
        if not user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="帳號或密碼錯誤"
            )

        if user.auth_type.value != "local":
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="此帳號請使用 AD 登入"
            )

        if not verify_password(request.password, user.password_hash):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="帳號或密碼錯誤"
            )

    # A deactivated account must not receive a token or have its last-login
    # timestamp refreshed; get_current_user rejects such accounts with the
    # same 403. Checked only after the credentials were verified, so the
    # account status is not disclosed to callers who failed authentication.
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="用戶已停用"
        )

    # Record the time of this successful login.
    user.last_login_at = datetime.utcnow()
    db.commit()

    # Issue the access token.
    token = create_access_token({
        "user_id": user.id,
        "username": user.username,
        "role": user.role.code
    })

    return LoginResponse(
        token=token,
        user=UserResponse.model_validate(user)
    )
@router.post("/logout")
def logout(current_user: User = Depends(get_current_user)):
    """Acknowledge a logout request from an authenticated user."""
    # No server-side state is changed here (the original note: JWT is
    # stateless); the handler only returns a confirmation payload.
    confirmation = {"message": "登出成功"}
    return confirmation
@router.get("/me", response_model=UserResponse)
def get_current_user_info(current_user: User = Depends(get_current_user)):
    """Return the user resolved by ``get_current_user`` for this request.

    The response is serialized through the route's ``UserResponse`` model.
    """
    return current_user