Enterprise Meeting Knowledge Management System with: Backend (FastAPI): - Authentication proxy with JWT (pj-auth-api integration) - MySQL database with 4 tables (users, meetings, conclusions, actions) - Meeting CRUD with system code generation (C-YYYYMMDD-XX, A-YYYYMMDD-XX) - Dify LLM integration for AI summarization - Excel export with openpyxl - 20 unit tests (all passing) Client (Electron): - Login page with company auth - Meeting list with create/delete - Meeting detail with real-time transcription - Editable transcript textarea (single block, easy editing) - AI summarization with conclusions/action items - 5-second segment recording (efficient for long meetings) Sidecar (Python): - faster-whisper medium model with int8 quantization - ONNX Runtime VAD (lightweight, ~20MB vs PyTorch ~2GB) - Chinese punctuation processing - OpenCC for Traditional Chinese conversion - Anti-hallucination parameters - Auto-cleanup of temp audio files OpenSpec: - add-meeting-assistant-mvp (47 tasks, archived) - add-realtime-transcription (29 tasks, archived) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
110 lines
3.6 KiB
Python
110 lines
3.6 KiB
Python
from fastapi import APIRouter, HTTPException, Depends, Header
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
import httpx
|
|
from jose import jwt, JWTError
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional
|
|
|
|
from ..config import settings
|
|
from ..models import LoginRequest, LoginResponse, TokenPayload
|
|
|
|
# Router that the endpoints in this module (/login, /me) are registered on.
router = APIRouter()
|
|
# Bearer-credentials scheme; injected via Depends() to obtain the caller's token.
security = HTTPBearer()
|
|
|
|
|
|
def create_token(
    email: str,
    role: str,
    *,
    expires_in: Optional[timedelta] = None,
) -> str:
    """Create a signed JWT carrying the user's email and role.

    Args:
        email: Value stored in the ``email`` claim.
        role: Value stored in the ``role`` claim.
        expires_in: Lifetime of the token, counted from now. Defaults to
            24 hours when omitted.

    Returns:
        The encoded token, signed with ``settings.JWT_SECRET`` using HS256.
    """
    # Imported locally because the module header only brings in
    # ``datetime`` and ``timedelta``.
    from datetime import timezone

    lifetime = timedelta(hours=24) if expires_in is None else expires_in
    payload = {
        "email": email,
        "role": role,
        # Use an aware UTC timestamp: ``datetime.utcnow()`` is deprecated and
        # returns a naive value that is easy to misinterpret as local time.
        "exp": datetime.now(timezone.utc) + lifetime,
    }
    return jwt.encode(payload, settings.JWT_SECRET, algorithm="HS256")
|
|
|
|
|
|
def decode_token(token: str) -> TokenPayload:
    """Decode and validate a JWT token.

    Args:
        token: Encoded JWT, expected to be signed with
            ``settings.JWT_SECRET`` using HS256.

    Returns:
        The decoded claims wrapped in a ``TokenPayload``.

    Raises:
        HTTPException: 401 with detail ``"Invalid token"`` when the token
            cannot be decoded/verified, or when its claims cannot be used
            to build a ``TokenPayload``.
    """
    try:
        claims = jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"])
    except JWTError as exc:
        raise HTTPException(status_code=401, detail="Invalid token") from exc

    try:
        return TokenPayload(**claims)
    except (TypeError, ValueError) as exc:
        # A correctly signed token can still carry claims that do not fit
        # TokenPayload (missing or mistyped fields). Presumably TokenPayload
        # is a pydantic model, whose validation errors subclass ValueError;
        # treat that as a bad token rather than letting it surface as a 500.
        raise HTTPException(status_code=401, detail="Invalid token") from exc
|
|
|
|
|
|
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> TokenPayload:
    """Dependency to get current authenticated user.

    Args:
        credentials: Bearer credentials supplied by the ``security`` scheme.

    Returns:
        The decoded token claims as a ``TokenPayload``.

    Raises:
        HTTPException: 401 with ``error`` set to ``"token_expired"`` when the
            token's expiry has passed, or ``"invalid_token"`` when the token
            cannot be verified or its claims do not fit ``TokenPayload``.
    """
    token = credentials.credentials
    try:
        claims = jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"])
    except jwt.ExpiredSignatureError as exc:
        # Must be handled before JWTError, which is its base class.
        raise HTTPException(
            status_code=401,
            detail={"error": "token_expired", "message": "Token has expired"},
        ) from exc
    except JWTError as exc:
        raise HTTPException(
            status_code=401,
            detail={"error": "invalid_token", "message": "Invalid token"},
        ) from exc

    try:
        return TokenPayload(**claims)
    except (TypeError, ValueError) as exc:
        # Signed-but-malformed claims (missing or mistyped fields) are a
        # client problem, not a server error. Presumably TokenPayload is a
        # pydantic model whose validation errors subclass ValueError.
        raise HTTPException(
            status_code=401,
            detail={"error": "invalid_token", "message": "Invalid token"},
        ) from exc
|
|
|
|
|
|
def is_admin(user: TokenPayload) -> bool:
    """Tell whether the given user's role is exactly ``"admin"``."""
    admin_role = "admin"
    return user.role == admin_role
|
|
|
|
|
|
@router.post("/login", response_model=LoginResponse)
async def login(request: LoginRequest):
    """
    Proxy login to company Auth API.

    Forwards the submitted email/password to ``settings.AUTH_API_URL`` and,
    when the upstream service reports success, issues a locally signed token.
    The role is ``"admin"`` when the email equals ``settings.ADMIN_EMAIL``,
    otherwise ``"user"``.

    Raises:
        HTTPException:
            401 - upstream rejected the credentials or reported failure.
            502 - upstream answered 200 with a body that is not a JSON object.
            503 - upstream could not be reached.
            504 - upstream did not answer within the timeout.
            other - any non-200/401 upstream status is passed through.
    """
    # Keep the try block limited to the network call so only transport
    # failures are translated into 503/504.
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                settings.AUTH_API_URL,
                json={"username": request.email, "password": request.password},
                timeout=30.0,
            )
    except httpx.TimeoutException as exc:
        # Checked first: timeouts are a more specific kind of request error.
        raise HTTPException(
            status_code=504, detail="Authentication service timeout"
        ) from exc
    except httpx.RequestError as exc:
        raise HTTPException(
            status_code=503, detail="Authentication service unavailable"
        ) from exc

    if response.status_code == 401:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    if response.status_code != 200:
        raise HTTPException(
            status_code=response.status_code,
            detail="Authentication service error",
        )

    # Parse response from external Auth API. A 200 with a non-JSON body or a
    # non-object JSON value is an upstream fault, reported as 502 instead of
    # letting the parsing error surface as an unhandled 500.
    try:
        auth_data = response.json()
    except ValueError as exc:
        raise HTTPException(
            status_code=502, detail="Authentication service error"
        ) from exc
    if not isinstance(auth_data, dict):
        raise HTTPException(status_code=502, detail="Authentication service error")

    # Check if authentication was successful
    if not auth_data.get("success"):
        error_msg = auth_data.get("error", "Authentication failed")
        raise HTTPException(status_code=401, detail=error_msg)

    # Determine role.
    # NOTE(review): this comparison is case-sensitive; confirm whether the
    # upstream service treats emails case-insensitively.
    role = "admin" if request.email == settings.ADMIN_EMAIL else "user"

    # Create our own token with role info
    token = create_token(request.email, role)

    return LoginResponse(token=token, email=request.email, role=role)
|
|
|
|
|
|
@router.get("/me")
async def get_me(current_user: TokenPayload = Depends(get_current_user)):
    """Return the email and role of the authenticated caller."""
    profile = dict(email=current_user.email, role=current_user.role)
    return profile
|