Files
egg 01aee1fd0d feat: Extract hardcoded configs to environment variables
- Add environment variable configuration for backend and frontend
- Backend: DB_POOL_SIZE, JWT_EXPIRE_HOURS, timeout configs, directory paths
- Frontend: VITE_API_BASE_URL, VITE_UPLOAD_TIMEOUT, Whisper configs
- Create deployment script (scripts/deploy-backend.sh)
- Create 1Panel deployment guide (docs/1panel-deployment.md)
- Update DEPLOYMENT.md with env var documentation
- Create README.md with project overview
- Remove obsolete PRD.md, SDD.md, TDD.md (replaced by OpenSpec)
- Keep CORS allow_origins=["*"] for Electron EXE distribution

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-14 14:31:55 +08:00

110 lines
3.7 KiB
Python

from fastapi import APIRouter, HTTPException, Depends, Header
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import httpx
from jose import jwt, JWTError
from datetime import datetime, timedelta
from typing import Optional
from ..config import settings
from ..models import LoginRequest, LoginResponse, TokenPayload
# Router on which this module's /login and /me endpoints are registered.
router = APIRouter()
# Bearer-token scheme; used as the dependency that supplies the
# Authorization credentials to get_current_user.
security = HTTPBearer()
def create_token(email: str, role: str) -> str:
    """Create a signed JWT carrying the user's email and role.

    Args:
        email: Value stored in the ``email`` claim.
        role: Value stored in the ``role`` claim.

    Returns:
        The token, signed with ``settings.JWT_SECRET`` using HS256. Its
        ``exp`` claim is set ``settings.JWT_EXPIRE_HOURS`` hours from now.
    """
    # Local import keeps this function self-contained with respect to the
    # module-level import block.
    from datetime import timezone

    # datetime.utcnow() is deprecated (Python 3.12+) and yields a naive
    # value; an aware UTC timestamp expresses the same instant unambiguously.
    expires_at = datetime.now(timezone.utc) + timedelta(
        hours=settings.JWT_EXPIRE_HOURS
    )
    payload = {
        "email": email,
        "role": role,
        "exp": expires_at,
    }
    return jwt.encode(payload, settings.JWT_SECRET, algorithm="HS256")
def decode_token(token: str) -> TokenPayload:
    """Decode and validate a JWT token.

    Args:
        token: The encoded JWT string.

    Returns:
        A ``TokenPayload`` built from the token's claims.

    Raises:
        HTTPException: 401 with detail ``"Invalid token"`` when the token
            fails verification (signature, expiry, format) or when its claims
            cannot be turned into a ``TokenPayload``.
    """
    try:
        claims = jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"])
    except JWTError as exc:
        raise HTTPException(status_code=401, detail="Invalid token") from exc

    # A correctly signed token can still carry missing or malformed claims.
    # NOTE(review): presumably TokenPayload is a pydantic model, whose
    # validation error derives from ValueError; TypeError covers a plain
    # class/dataclass constructor. Confirm against ..models.
    try:
        return TokenPayload(**claims)
    except (TypeError, ValueError) as exc:
        raise HTTPException(status_code=401, detail="Invalid token") from exc
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> TokenPayload:
    """Dependency that resolves the bearer token to the current user.

    Args:
        credentials: Bearer credentials supplied by the ``security`` scheme.

    Returns:
        A ``TokenPayload`` built from the verified token's claims.

    Raises:
        HTTPException: 401 with ``error == "token_expired"`` when the token
            has expired, or ``error == "invalid_token"`` when it fails
            verification or its claims cannot form a ``TokenPayload``.
    """
    token = credentials.credentials
    try:
        claims = jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"])
    except jwt.ExpiredSignatureError as exc:
        # Must be handled before JWTError so that expiry gets its own
        # error code.
        raise HTTPException(
            status_code=401,
            detail={"error": "token_expired", "message": "Token has expired"},
        ) from exc
    except JWTError as exc:
        raise HTTPException(
            status_code=401,
            detail={"error": "invalid_token", "message": "Invalid token"},
        ) from exc

    # A correctly signed token can still carry missing or malformed claims;
    # report that as an invalid token instead of an unhandled server error.
    # NOTE(review): presumably TokenPayload is a pydantic model, whose
    # validation error derives from ValueError; TypeError covers a plain
    # class/dataclass constructor. Confirm against ..models.
    try:
        return TokenPayload(**claims)
    except (TypeError, ValueError) as exc:
        raise HTTPException(
            status_code=401,
            detail={"error": "invalid_token", "message": "Invalid token"},
        ) from exc
def is_admin(user: TokenPayload) -> bool:
    """Tell whether the given token payload carries the ``"admin"`` role."""
    role = user.role
    return role == "admin"
@router.post("/login", response_model=LoginResponse)
async def login(request: LoginRequest):
    """
    Proxy login to the company Auth API and issue a local JWT.

    The submitted email and password are posted to ``settings.AUTH_API_URL``.
    When the Auth API reports success, a token is created locally with role
    ``"admin"`` if the email equals ``settings.ADMIN_EMAIL`` and ``"user"``
    otherwise.

    Raises:
        HTTPException:
            401 - the Auth API answered 401 or reported ``success`` as falsy.
            502 - the Auth API answered 200 with a body that is not a JSON
                  object.
            503 - the Auth API could not be reached.
            504 - the Auth API request timed out.
            Any other non-200 upstream status is passed through with detail
            ``"Authentication service error"``.
    """
    async with httpx.AsyncClient() as client:
        # Only the network call is guarded, so the httpx handlers cannot
        # mask errors from the response handling below.
        try:
            response = await client.post(
                settings.AUTH_API_URL,
                json={"username": request.email, "password": request.password},
                timeout=settings.auth_timeout_seconds,
            )
        except httpx.TimeoutException as exc:
            # TimeoutException is a RequestError subclass; keep it first.
            raise HTTPException(
                status_code=504, detail="Authentication service timeout"
            ) from exc
        except httpx.RequestError as exc:
            raise HTTPException(
                status_code=503, detail="Authentication service unavailable"
            ) from exc

        if response.status_code == 401:
            raise HTTPException(status_code=401, detail="Invalid credentials")
        if response.status_code != 200:
            raise HTTPException(
                status_code=response.status_code,
                detail="Authentication service error",
            )

        # Parse response from external Auth API; a body that is not valid
        # JSON is an upstream fault, not an internal server error.
        try:
            auth_data = response.json()
        except ValueError as exc:
            raise HTTPException(
                status_code=502,
                detail="Authentication service returned an invalid response",
            ) from exc
        if not isinstance(auth_data, dict):
            raise HTTPException(
                status_code=502,
                detail="Authentication service returned an invalid response",
            )

        # Check if authentication was successful
        if not auth_data.get("success"):
            error_msg = auth_data.get("error", "Authentication failed")
            raise HTTPException(status_code=401, detail=error_msg)

        # Determine role
        role = "admin" if request.email == settings.ADMIN_EMAIL else "user"

        # Create our own token with role info
        token = create_token(request.email, role)
        return LoginResponse(token=token, email=request.email, role=role)
@router.get("/me")
async def get_me(current_user: TokenPayload = Depends(get_current_user)):
    """Return the authenticated caller's email and role as a JSON object."""
    profile = {"email": current_user.email}
    profile["role"] = current_user.role
    return profile