feat: Meeting Assistant MVP - Complete implementation

Enterprise Meeting Knowledge Management System with:

Backend (FastAPI):
- Authentication proxy with JWT (pj-auth-api integration)
- MySQL database with 4 tables (users, meetings, conclusions, actions)
- Meeting CRUD with system code generation (C-YYYYMMDD-XX, A-YYYYMMDD-XX)
- Dify LLM integration for AI summarization
- Excel export with openpyxl
- 20 unit tests (all passing)

Client (Electron):
- Login page with company auth
- Meeting list with create/delete
- Meeting detail with real-time transcription
- Editable transcript textarea (single block, easy editing)
- AI summarization with conclusions/action items
- 5-second segment recording (efficient for long meetings)

Sidecar (Python):
- faster-whisper medium model with int8 quantization
- ONNX Runtime VAD (lightweight, ~20MB vs PyTorch ~2GB)
- Chinese punctuation processing
- OpenCC for Traditional Chinese conversion
- Anti-hallucination parameters
- Auto-cleanup of temp audio files

OpenSpec:
- add-meeting-assistant-mvp (47 tasks, archived)
- add-realtime-transcription (29 tasks, archived)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
egg
2025-12-10 20:17:44 +08:00
commit 8b6184ecc5
65 changed files with 10510 additions and 0 deletions

15
backend/.env.example Normal file
View File

@@ -0,0 +1,15 @@
# Database Configuration
DB_HOST=mysql.theaken.com
DB_PORT=33306
DB_USER=A060
DB_PASS=your_password_here
DB_NAME=db_A060
# External APIs
AUTH_API_URL=https://pj-auth-api.vercel.app/api/auth/login
DIFY_API_URL=https://dify.theaken.com/v1
DIFY_API_KEY=app-xxxxxxxxxxx
# Application Settings
ADMIN_EMAIL=ymirliu@panjit.com.tw
JWT_SECRET=your_jwt_secret_here

1
backend/app/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Meeting Assistant Backend

24
backend/app/config.py Normal file
View File

@@ -0,0 +1,24 @@
import os
from dotenv import load_dotenv
load_dotenv()
class Settings:
DB_HOST: str = os.getenv("DB_HOST", "mysql.theaken.com")
DB_PORT: int = int(os.getenv("DB_PORT", "33306"))
DB_USER: str = os.getenv("DB_USER", "A060")
DB_PASS: str = os.getenv("DB_PASS", "")
DB_NAME: str = os.getenv("DB_NAME", "db_A060")
AUTH_API_URL: str = os.getenv(
"AUTH_API_URL", "https://pj-auth-api.vercel.app/api/auth/login"
)
DIFY_API_URL: str = os.getenv("DIFY_API_URL", "https://dify.theaken.com/v1")
DIFY_API_KEY: str = os.getenv("DIFY_API_KEY", "")
ADMIN_EMAIL: str = os.getenv("ADMIN_EMAIL", "ymirliu@panjit.com.tw")
JWT_SECRET: str = os.getenv("JWT_SECRET", "meeting-assistant-secret")
settings = Settings()

96
backend/app/database.py Normal file
View File

@@ -0,0 +1,96 @@
import mysql.connector
from mysql.connector import pooling
from contextlib import contextmanager
from .config import settings
connection_pool = None
def init_db_pool():
global connection_pool
connection_pool = pooling.MySQLConnectionPool(
pool_name="meeting_pool",
pool_size=5,
host=settings.DB_HOST,
port=settings.DB_PORT,
user=settings.DB_USER,
password=settings.DB_PASS,
database=settings.DB_NAME,
)
return connection_pool
@contextmanager
def get_db_connection():
conn = connection_pool.get_connection()
try:
yield conn
finally:
conn.close()
@contextmanager
def get_db_cursor(commit=False):
with get_db_connection() as conn:
cursor = conn.cursor(dictionary=True)
try:
yield cursor
if commit:
conn.commit()
finally:
cursor.close()
def init_tables():
"""Create all required tables if they don't exist."""
create_statements = [
"""
CREATE TABLE IF NOT EXISTS meeting_users (
user_id INT PRIMARY KEY AUTO_INCREMENT,
email VARCHAR(100) UNIQUE NOT NULL,
display_name VARCHAR(50),
role ENUM('admin', 'user') DEFAULT 'user',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""",
"""
CREATE TABLE IF NOT EXISTS meeting_records (
meeting_id INT PRIMARY KEY AUTO_INCREMENT,
uuid VARCHAR(64) UNIQUE,
subject VARCHAR(200) NOT NULL,
meeting_time DATETIME NOT NULL,
location VARCHAR(100),
chairperson VARCHAR(50),
recorder VARCHAR(50),
attendees TEXT,
transcript_blob LONGTEXT,
created_by VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""",
"""
CREATE TABLE IF NOT EXISTS meeting_conclusions (
conclusion_id INT PRIMARY KEY AUTO_INCREMENT,
meeting_id INT,
content TEXT,
system_code VARCHAR(20),
FOREIGN KEY (meeting_id) REFERENCES meeting_records(meeting_id) ON DELETE CASCADE
)
""",
"""
CREATE TABLE IF NOT EXISTS meeting_action_items (
action_id INT PRIMARY KEY AUTO_INCREMENT,
meeting_id INT,
content TEXT,
owner VARCHAR(50),
due_date DATE,
status ENUM('Open', 'In Progress', 'Done', 'Delayed') DEFAULT 'Open',
system_code VARCHAR(20),
FOREIGN KEY (meeting_id) REFERENCES meeting_records(meeting_id) ON DELETE CASCADE
)
""",
]
with get_db_cursor(commit=True) as cursor:
for statement in create_statements:
cursor.execute(statement)

44
backend/app/main.py Normal file
View File

@@ -0,0 +1,44 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from .database import init_db_pool, init_tables
from .routers import auth, meetings, ai, export
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
init_db_pool()
init_tables()
yield
# Shutdown (cleanup if needed)
app = FastAPI(
title="Meeting Assistant API",
description="Enterprise meeting knowledge management API",
version="1.0.0",
lifespan=lifespan,
)
# CORS configuration for Electron client
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
app.include_router(auth.router, prefix="/api", tags=["Authentication"])
app.include_router(meetings.router, prefix="/api", tags=["Meetings"])
app.include_router(ai.router, prefix="/api", tags=["AI"])
app.include_router(export.router, prefix="/api", tags=["Export"])
@app.get("/api/health")
async def health_check():
"""Health check endpoint."""
return {"status": "healthy", "service": "meeting-assistant"}

View File

@@ -0,0 +1,37 @@
from .schemas import (
LoginRequest,
LoginResponse,
TokenPayload,
MeetingCreate,
MeetingUpdate,
MeetingResponse,
MeetingListResponse,
ConclusionCreate,
ConclusionResponse,
ActionItemCreate,
ActionItemUpdate,
ActionItemResponse,
SummarizeRequest,
SummarizeResponse,
ActionItemStatus,
UserRole,
)
__all__ = [
"LoginRequest",
"LoginResponse",
"TokenPayload",
"MeetingCreate",
"MeetingUpdate",
"MeetingResponse",
"MeetingListResponse",
"ConclusionCreate",
"ConclusionResponse",
"ActionItemCreate",
"ActionItemUpdate",
"ActionItemResponse",
"SummarizeRequest",
"SummarizeResponse",
"ActionItemStatus",
"UserRole",
]

View File

@@ -0,0 +1,128 @@
from pydantic import BaseModel
from typing import Optional, List
from datetime import datetime, date
from enum import Enum
class ActionItemStatus(str, Enum):
OPEN = "Open"
IN_PROGRESS = "In Progress"
DONE = "Done"
DELAYED = "Delayed"
class UserRole(str, Enum):
ADMIN = "admin"
USER = "user"
# Auth schemas
class LoginRequest(BaseModel):
email: str
password: str
class LoginResponse(BaseModel):
token: str
email: str
role: str
class TokenPayload(BaseModel):
email: str
role: str
exp: Optional[int] = None
# Meeting schemas
class ConclusionCreate(BaseModel):
content: str
class ConclusionResponse(BaseModel):
conclusion_id: int
meeting_id: int
content: str
system_code: Optional[str] = None
class ActionItemCreate(BaseModel):
content: str
owner: Optional[str] = ""
due_date: Optional[date] = None
class ActionItemUpdate(BaseModel):
content: Optional[str] = None
owner: Optional[str] = None
due_date: Optional[date] = None
status: Optional[ActionItemStatus] = None
class ActionItemResponse(BaseModel):
action_id: int
meeting_id: int
content: str
owner: Optional[str] = None
due_date: Optional[date] = None
status: ActionItemStatus
system_code: Optional[str] = None
class MeetingCreate(BaseModel):
subject: str
meeting_time: datetime
location: Optional[str] = ""
chairperson: Optional[str] = ""
recorder: Optional[str] = ""
attendees: Optional[str] = ""
transcript_blob: Optional[str] = ""
conclusions: Optional[List[ConclusionCreate]] = []
actions: Optional[List[ActionItemCreate]] = []
class MeetingUpdate(BaseModel):
subject: Optional[str] = None
meeting_time: Optional[datetime] = None
location: Optional[str] = None
chairperson: Optional[str] = None
recorder: Optional[str] = None
attendees: Optional[str] = None
transcript_blob: Optional[str] = None
conclusions: Optional[List[ConclusionCreate]] = None
actions: Optional[List[ActionItemCreate]] = None
class MeetingResponse(BaseModel):
meeting_id: int
uuid: str
subject: str
meeting_time: datetime
location: Optional[str] = None
chairperson: Optional[str] = None
recorder: Optional[str] = None
attendees: Optional[str] = None
transcript_blob: Optional[str] = None
created_by: Optional[str] = None
created_at: datetime
conclusions: List[ConclusionResponse] = []
actions: List[ActionItemResponse] = []
class MeetingListResponse(BaseModel):
meeting_id: int
uuid: str
subject: str
meeting_time: datetime
chairperson: Optional[str] = None
created_at: datetime
# AI schemas
class SummarizeRequest(BaseModel):
transcript: str
class SummarizeResponse(BaseModel):
conclusions: List[str]
action_items: List[ActionItemCreate]

View File

@@ -0,0 +1 @@
# Router modules

102
backend/app/routers/ai.py Normal file
View File

@@ -0,0 +1,102 @@
from fastapi import APIRouter, HTTPException, Depends
import httpx
import json
from ..config import settings
from ..models import SummarizeRequest, SummarizeResponse, ActionItemCreate, TokenPayload
from .auth import get_current_user
router = APIRouter()
@router.post("/ai/summarize", response_model=SummarizeResponse)
async def summarize_transcript(
request: SummarizeRequest, current_user: TokenPayload = Depends(get_current_user)
):
"""
Send transcript to Dify for AI summarization.
Returns structured conclusions and action items.
"""
if not settings.DIFY_API_KEY:
raise HTTPException(status_code=503, detail="Dify API not configured")
async with httpx.AsyncClient() as client:
try:
response = await client.post(
f"{settings.DIFY_API_URL}/chat-messages",
headers={
"Authorization": f"Bearer {settings.DIFY_API_KEY}",
"Content-Type": "application/json",
},
json={
"inputs": {},
"query": request.transcript,
"response_mode": "blocking",
"user": current_user.email,
},
timeout=120.0, # Long timeout for LLM processing
)
if response.status_code != 200:
raise HTTPException(
status_code=response.status_code,
detail=f"Dify API error: {response.text}",
)
data = response.json()
answer = data.get("answer", "")
# Try to parse structured JSON from Dify response
parsed = parse_dify_response(answer)
return SummarizeResponse(
conclusions=parsed["conclusions"],
action_items=[
ActionItemCreate(
content=item.get("content", ""),
owner=item.get("owner", ""),
due_date=item.get("due_date"),
)
for item in parsed["action_items"]
],
)
except httpx.TimeoutException:
raise HTTPException(
status_code=504, detail="Dify API timeout - transcript may be too long"
)
except httpx.RequestError as e:
raise HTTPException(status_code=503, detail=f"Dify API unavailable: {str(e)}")
def parse_dify_response(answer: str) -> dict:
"""
Parse Dify response to extract conclusions and action items.
Attempts JSON parsing first, then falls back to text parsing.
"""
# Try to find JSON in the response
try:
# Look for JSON block
if "```json" in answer:
json_start = answer.index("```json") + 7
json_end = answer.index("```", json_start)
json_str = answer[json_start:json_end].strip()
elif "{" in answer and "}" in answer:
# Try to find JSON object
json_start = answer.index("{")
json_end = answer.rindex("}") + 1
json_str = answer[json_start:json_end]
else:
raise ValueError("No JSON found")
data = json.loads(json_str)
return {
"conclusions": data.get("conclusions", []),
"action_items": data.get("action_items", []),
}
except (ValueError, json.JSONDecodeError):
# Fallback: return raw answer as single conclusion
return {
"conclusions": [answer] if answer else [],
"action_items": [],
}

109
backend/app/routers/auth.py Normal file
View File

@@ -0,0 +1,109 @@
from fastapi import APIRouter, HTTPException, Depends, Header
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import httpx
from jose import jwt, JWTError
from datetime import datetime, timedelta
from typing import Optional
from ..config import settings
from ..models import LoginRequest, LoginResponse, TokenPayload
router = APIRouter()
security = HTTPBearer()
def create_token(email: str, role: str) -> str:
"""Create a JWT token with email and role."""
payload = {
"email": email,
"role": role,
"exp": datetime.utcnow() + timedelta(hours=24),
}
return jwt.encode(payload, settings.JWT_SECRET, algorithm="HS256")
def decode_token(token: str) -> TokenPayload:
"""Decode and validate a JWT token."""
try:
payload = jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"])
return TokenPayload(**payload)
except JWTError:
raise HTTPException(status_code=401, detail="Invalid token")
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
) -> TokenPayload:
"""Dependency to get current authenticated user."""
token = credentials.credentials
try:
payload = jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"])
return TokenPayload(**payload)
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=401,
detail={"error": "token_expired", "message": "Token has expired"},
)
except JWTError:
raise HTTPException(
status_code=401,
detail={"error": "invalid_token", "message": "Invalid token"},
)
def is_admin(user: TokenPayload) -> bool:
"""Check if user has admin role."""
return user.role == "admin"
@router.post("/login", response_model=LoginResponse)
async def login(request: LoginRequest):
"""
Proxy login to company Auth API.
Adds admin role for ymirliu@panjit.com.tw.
"""
async with httpx.AsyncClient() as client:
try:
response = await client.post(
settings.AUTH_API_URL,
json={"username": request.email, "password": request.password},
timeout=30.0,
)
if response.status_code == 401:
raise HTTPException(status_code=401, detail="Invalid credentials")
if response.status_code != 200:
raise HTTPException(
status_code=response.status_code,
detail="Authentication service error",
)
# Parse response from external Auth API
auth_data = response.json()
# Check if authentication was successful
if not auth_data.get("success"):
error_msg = auth_data.get("error", "Authentication failed")
raise HTTPException(status_code=401, detail=error_msg)
# Determine role
role = "admin" if request.email == settings.ADMIN_EMAIL else "user"
# Create our own token with role info
token = create_token(request.email, role)
return LoginResponse(token=token, email=request.email, role=role)
except httpx.TimeoutException:
raise HTTPException(status_code=504, detail="Authentication service timeout")
except httpx.RequestError:
raise HTTPException(
status_code=503, detail="Authentication service unavailable"
)
@router.get("/me")
async def get_me(current_user: TokenPayload = Depends(get_current_user)):
"""Get current user information."""
return {"email": current_user.email, "role": current_user.role}

View File

@@ -0,0 +1,177 @@
from fastapi import APIRouter, HTTPException, Depends
from fastapi.responses import StreamingResponse
from openpyxl import Workbook, load_workbook
from openpyxl.styles import Font, Alignment, Border, Side
import io
import os
from ..database import get_db_cursor
from ..models import TokenPayload
from .auth import get_current_user, is_admin
router = APIRouter()
TEMPLATE_DIR = os.path.join(os.path.dirname(__file__), "..", "templates")
def create_default_workbook(meeting: dict, conclusions: list, actions: list) -> Workbook:
"""Create Excel workbook with meeting data."""
wb = Workbook()
ws = wb.active
ws.title = "Meeting Record"
# Styles
header_font = Font(bold=True, size=14)
label_font = Font(bold=True)
thin_border = Border(
left=Side(style="thin"),
right=Side(style="thin"),
top=Side(style="thin"),
bottom=Side(style="thin"),
)
# Title
ws.merge_cells("A1:F1")
ws["A1"] = "Meeting Record"
ws["A1"].font = Font(bold=True, size=16)
ws["A1"].alignment = Alignment(horizontal="center")
# Metadata section
row = 3
metadata = [
("Subject", meeting.get("subject", "")),
("Date/Time", str(meeting.get("meeting_time", ""))),
("Location", meeting.get("location", "")),
("Chairperson", meeting.get("chairperson", "")),
("Recorder", meeting.get("recorder", "")),
("Attendees", meeting.get("attendees", "")),
]
for label, value in metadata:
ws[f"A{row}"] = label
ws[f"A{row}"].font = label_font
ws.merge_cells(f"B{row}:F{row}")
ws[f"B{row}"] = value
row += 1
# Conclusions section
row += 1
ws.merge_cells(f"A{row}:F{row}")
ws[f"A{row}"] = "Conclusions"
ws[f"A{row}"].font = header_font
row += 1
ws[f"A{row}"] = "Code"
ws[f"B{row}"] = "Content"
ws[f"A{row}"].font = label_font
ws[f"B{row}"].font = label_font
row += 1
for c in conclusions:
ws[f"A{row}"] = c.get("system_code", "")
ws.merge_cells(f"B{row}:F{row}")
ws[f"B{row}"] = c.get("content", "")
row += 1
# Action Items section
row += 1
ws.merge_cells(f"A{row}:F{row}")
ws[f"A{row}"] = "Action Items"
ws[f"A{row}"].font = header_font
row += 1
headers = ["Code", "Content", "Owner", "Due Date", "Status"]
for col, header in enumerate(headers, 1):
cell = ws.cell(row=row, column=col, value=header)
cell.font = label_font
cell.border = thin_border
row += 1
for a in actions:
ws.cell(row=row, column=1, value=a.get("system_code", "")).border = thin_border
ws.cell(row=row, column=2, value=a.get("content", "")).border = thin_border
ws.cell(row=row, column=3, value=a.get("owner", "")).border = thin_border
ws.cell(row=row, column=4, value=str(a.get("due_date", "") or "")).border = thin_border
ws.cell(row=row, column=5, value=a.get("status", "")).border = thin_border
row += 1
# Adjust column widths
ws.column_dimensions["A"].width = 18
ws.column_dimensions["B"].width = 40
ws.column_dimensions["C"].width = 15
ws.column_dimensions["D"].width = 12
ws.column_dimensions["E"].width = 12
ws.column_dimensions["F"].width = 12
return wb
@router.get("/meetings/{meeting_id}/export")
async def export_meeting(
meeting_id: int, current_user: TokenPayload = Depends(get_current_user)
):
"""Export meeting to Excel file."""
with get_db_cursor() as cursor:
cursor.execute(
"SELECT * FROM meeting_records WHERE meeting_id = %s", (meeting_id,)
)
meeting = cursor.fetchone()
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
# Check access
if not is_admin(current_user):
if (
meeting["created_by"] != current_user.email
and meeting["recorder"] != current_user.email
and current_user.email not in (meeting["attendees"] or "")
):
raise HTTPException(status_code=403, detail="Access denied")
# Get conclusions
cursor.execute(
"SELECT * FROM meeting_conclusions WHERE meeting_id = %s", (meeting_id,)
)
conclusions = cursor.fetchall()
# Get action items
cursor.execute(
"SELECT * FROM meeting_action_items WHERE meeting_id = %s", (meeting_id,)
)
actions = cursor.fetchall()
# Check for custom template
template_path = os.path.join(TEMPLATE_DIR, "template.xlsx")
if os.path.exists(template_path):
wb = load_workbook(template_path)
ws = wb.active
# Replace placeholders
for row in ws.iter_rows():
for cell in row:
if cell.value and isinstance(cell.value, str):
cell.value = (
cell.value.replace("{{subject}}", meeting.get("subject", ""))
.replace("{{time}}", str(meeting.get("meeting_time", "")))
.replace("{{location}}", meeting.get("location", ""))
.replace("{{chair}}", meeting.get("chairperson", ""))
.replace("{{recorder}}", meeting.get("recorder", ""))
.replace("{{attendees}}", meeting.get("attendees", ""))
)
else:
# Use default template
wb = create_default_workbook(meeting, conclusions, actions)
# Save to bytes buffer
buffer = io.BytesIO()
wb.save(buffer)
buffer.seek(0)
filename = f"meeting_{meeting.get('uuid', meeting_id)}.xlsx"
return StreamingResponse(
buffer,
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)

View File

@@ -0,0 +1,372 @@
from fastapi import APIRouter, HTTPException, Depends
from typing import List
import uuid
from datetime import date
from ..database import get_db_cursor
from ..models import (
MeetingCreate,
MeetingUpdate,
MeetingResponse,
MeetingListResponse,
ConclusionResponse,
ActionItemResponse,
ActionItemUpdate,
TokenPayload,
)
from .auth import get_current_user, is_admin
router = APIRouter()
def generate_system_code(prefix: str, meeting_date: date, sequence: int) -> str:
"""Generate system code like C-20251210-01 or A-20251210-01."""
date_str = meeting_date.strftime("%Y%m%d")
return f"{prefix}-{date_str}-{sequence:02d}"
def get_next_sequence(cursor, prefix: str, date_str: str) -> int:
"""Get next sequence number for a given prefix and date."""
pattern = f"{prefix}-{date_str}-%"
cursor.execute(
"""
SELECT system_code FROM meeting_conclusions WHERE system_code LIKE %s
UNION
SELECT system_code FROM meeting_action_items WHERE system_code LIKE %s
ORDER BY system_code DESC LIMIT 1
""",
(pattern, pattern),
)
result = cursor.fetchone()
if result:
last_code = result["system_code"]
last_seq = int(last_code.split("-")[-1])
return last_seq + 1
return 1
@router.post("/meetings", response_model=MeetingResponse)
async def create_meeting(
meeting: MeetingCreate, current_user: TokenPayload = Depends(get_current_user)
):
"""Create a new meeting with optional conclusions and action items."""
meeting_uuid = str(uuid.uuid4())
recorder = meeting.recorder or current_user.email
meeting_date = meeting.meeting_time.date()
date_str = meeting_date.strftime("%Y%m%d")
with get_db_cursor(commit=True) as cursor:
# Insert meeting record
cursor.execute(
"""
INSERT INTO meeting_records
(uuid, subject, meeting_time, location, chairperson, recorder, attendees, transcript_blob, created_by)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
""",
(
meeting_uuid,
meeting.subject,
meeting.meeting_time,
meeting.location,
meeting.chairperson,
recorder,
meeting.attendees,
meeting.transcript_blob,
current_user.email,
),
)
meeting_id = cursor.lastrowid
# Insert conclusions
conclusions = []
seq = get_next_sequence(cursor, "C", date_str)
for conclusion in meeting.conclusions or []:
system_code = generate_system_code("C", meeting_date, seq)
cursor.execute(
"""
INSERT INTO meeting_conclusions (meeting_id, content, system_code)
VALUES (%s, %s, %s)
""",
(meeting_id, conclusion.content, system_code),
)
conclusions.append(
ConclusionResponse(
conclusion_id=cursor.lastrowid,
meeting_id=meeting_id,
content=conclusion.content,
system_code=system_code,
)
)
seq += 1
# Insert action items
actions = []
seq = get_next_sequence(cursor, "A", date_str)
for action in meeting.actions or []:
system_code = generate_system_code("A", meeting_date, seq)
cursor.execute(
"""
INSERT INTO meeting_action_items (meeting_id, content, owner, due_date, system_code)
VALUES (%s, %s, %s, %s, %s)
""",
(meeting_id, action.content, action.owner, action.due_date, system_code),
)
actions.append(
ActionItemResponse(
action_id=cursor.lastrowid,
meeting_id=meeting_id,
content=action.content,
owner=action.owner,
due_date=action.due_date,
status="Open",
system_code=system_code,
)
)
seq += 1
# Fetch created meeting
cursor.execute(
"SELECT * FROM meeting_records WHERE meeting_id = %s", (meeting_id,)
)
record = cursor.fetchone()
return MeetingResponse(
meeting_id=record["meeting_id"],
uuid=record["uuid"],
subject=record["subject"],
meeting_time=record["meeting_time"],
location=record["location"],
chairperson=record["chairperson"],
recorder=record["recorder"],
attendees=record["attendees"],
transcript_blob=record["transcript_blob"],
created_by=record["created_by"],
created_at=record["created_at"],
conclusions=conclusions,
actions=actions,
)
@router.get("/meetings", response_model=List[MeetingListResponse])
async def list_meetings(current_user: TokenPayload = Depends(get_current_user)):
"""List meetings. Admin sees all, users see only their own."""
with get_db_cursor() as cursor:
if is_admin(current_user):
cursor.execute(
"""
SELECT meeting_id, uuid, subject, meeting_time, chairperson, created_at
FROM meeting_records ORDER BY meeting_time DESC
"""
)
else:
cursor.execute(
"""
SELECT meeting_id, uuid, subject, meeting_time, chairperson, created_at
FROM meeting_records
WHERE created_by = %s OR recorder = %s OR attendees LIKE %s
ORDER BY meeting_time DESC
""",
(
current_user.email,
current_user.email,
f"%{current_user.email}%",
),
)
records = cursor.fetchall()
return [MeetingListResponse(**record) for record in records]
@router.get("/meetings/{meeting_id}", response_model=MeetingResponse)
async def get_meeting(
meeting_id: int, current_user: TokenPayload = Depends(get_current_user)
):
"""Get meeting details with conclusions and action items."""
with get_db_cursor() as cursor:
cursor.execute(
"SELECT * FROM meeting_records WHERE meeting_id = %s", (meeting_id,)
)
record = cursor.fetchone()
if not record:
raise HTTPException(status_code=404, detail="Meeting not found")
# Check access
if not is_admin(current_user):
if (
record["created_by"] != current_user.email
and record["recorder"] != current_user.email
and current_user.email not in (record["attendees"] or "")
):
raise HTTPException(status_code=403, detail="Access denied")
# Get conclusions
cursor.execute(
"SELECT * FROM meeting_conclusions WHERE meeting_id = %s", (meeting_id,)
)
conclusions = [ConclusionResponse(**c) for c in cursor.fetchall()]
# Get action items
cursor.execute(
"SELECT * FROM meeting_action_items WHERE meeting_id = %s", (meeting_id,)
)
actions = [ActionItemResponse(**a) for a in cursor.fetchall()]
return MeetingResponse(
meeting_id=record["meeting_id"],
uuid=record["uuid"],
subject=record["subject"],
meeting_time=record["meeting_time"],
location=record["location"],
chairperson=record["chairperson"],
recorder=record["recorder"],
attendees=record["attendees"],
transcript_blob=record["transcript_blob"],
created_by=record["created_by"],
created_at=record["created_at"],
conclusions=conclusions,
actions=actions,
)
@router.put("/meetings/{meeting_id}", response_model=MeetingResponse)
async def update_meeting(
meeting_id: int,
meeting: MeetingUpdate,
current_user: TokenPayload = Depends(get_current_user),
):
"""Update meeting details."""
with get_db_cursor(commit=True) as cursor:
cursor.execute(
"SELECT * FROM meeting_records WHERE meeting_id = %s", (meeting_id,)
)
record = cursor.fetchone()
if not record:
raise HTTPException(status_code=404, detail="Meeting not found")
# Check access
if not is_admin(current_user) and record["created_by"] != current_user.email:
raise HTTPException(status_code=403, detail="Access denied")
# Build update query dynamically
updates = []
values = []
for field in ["subject", "meeting_time", "location", "chairperson", "recorder", "attendees", "transcript_blob"]:
value = getattr(meeting, field)
if value is not None:
updates.append(f"{field} = %s")
values.append(value)
if updates:
values.append(meeting_id)
cursor.execute(
f"UPDATE meeting_records SET {', '.join(updates)} WHERE meeting_id = %s",
values,
)
# Update conclusions if provided
if meeting.conclusions is not None:
cursor.execute(
"DELETE FROM meeting_conclusions WHERE meeting_id = %s", (meeting_id,)
)
meeting_date = (meeting.meeting_time or record["meeting_time"]).date() if hasattr(meeting.meeting_time or record["meeting_time"], 'date') else date.today()
date_str = meeting_date.strftime("%Y%m%d")
seq = get_next_sequence(cursor, "C", date_str)
for conclusion in meeting.conclusions:
system_code = generate_system_code("C", meeting_date, seq)
cursor.execute(
"""
INSERT INTO meeting_conclusions (meeting_id, content, system_code)
VALUES (%s, %s, %s)
""",
(meeting_id, conclusion.content, system_code),
)
seq += 1
# Update action items if provided
if meeting.actions is not None:
cursor.execute(
"DELETE FROM meeting_action_items WHERE meeting_id = %s", (meeting_id,)
)
meeting_date = (meeting.meeting_time or record["meeting_time"]).date() if hasattr(meeting.meeting_time or record["meeting_time"], 'date') else date.today()
date_str = meeting_date.strftime("%Y%m%d")
seq = get_next_sequence(cursor, "A", date_str)
for action in meeting.actions:
system_code = generate_system_code("A", meeting_date, seq)
cursor.execute(
"""
INSERT INTO meeting_action_items (meeting_id, content, owner, due_date, system_code)
VALUES (%s, %s, %s, %s, %s)
""",
(meeting_id, action.content, action.owner, action.due_date, system_code),
)
seq += 1
# Return updated meeting
return await get_meeting(meeting_id, current_user)
@router.delete("/meetings/{meeting_id}")
async def delete_meeting(
meeting_id: int, current_user: TokenPayload = Depends(get_current_user)
):
"""Delete meeting and all related data (cascade)."""
with get_db_cursor(commit=True) as cursor:
cursor.execute(
"SELECT * FROM meeting_records WHERE meeting_id = %s", (meeting_id,)
)
record = cursor.fetchone()
if not record:
raise HTTPException(status_code=404, detail="Meeting not found")
# Check access - admin or creator can delete
if not is_admin(current_user) and record["created_by"] != current_user.email:
raise HTTPException(status_code=403, detail="Access denied")
# Delete (cascade will handle conclusions and action items)
cursor.execute("DELETE FROM meeting_records WHERE meeting_id = %s", (meeting_id,))
return {"message": "Meeting deleted successfully"}
@router.put("/meetings/{meeting_id}/actions/{action_id}")
async def update_action_item(
meeting_id: int,
action_id: int,
action: ActionItemUpdate,
current_user: TokenPayload = Depends(get_current_user),
):
"""Update a specific action item's status, owner, or due date."""
with get_db_cursor(commit=True) as cursor:
cursor.execute(
"SELECT * FROM meeting_action_items WHERE action_id = %s AND meeting_id = %s",
(action_id, meeting_id),
)
record = cursor.fetchone()
if not record:
raise HTTPException(status_code=404, detail="Action item not found")
updates = []
values = []
for field in ["content", "owner", "due_date", "status"]:
value = getattr(action, field)
if value is not None:
updates.append(f"{field} = %s")
values.append(value.value if hasattr(value, "value") else value)
if updates:
values.append(action_id)
cursor.execute(
f"UPDATE meeting_action_items SET {', '.join(updates)} WHERE action_id = %s",
values,
)
cursor.execute(
"SELECT * FROM meeting_action_items WHERE action_id = %s", (action_id,)
)
updated = cursor.fetchone()
return ActionItemResponse(**updated)

3
backend/pytest.ini Normal file
View File

@@ -0,0 +1,3 @@
[pytest]
asyncio_mode = auto
asyncio_default_fixture_loop_scope = function

10
backend/requirements.txt Normal file
View File

@@ -0,0 +1,10 @@
fastapi>=0.115.0
uvicorn[standard]>=0.32.0
python-dotenv>=1.0.0
mysql-connector-python>=9.0.0
pydantic>=2.10.0
httpx>=0.27.0
python-jose[cryptography]>=3.3.0
openpyxl>=3.1.2
pytest>=8.0.0
pytest-asyncio>=0.24.0

View File

@@ -0,0 +1 @@
# Tests package

48
backend/tests/conftest.py Normal file
View File

@@ -0,0 +1,48 @@
"""
Pytest configuration and fixtures.
"""
import pytest
import sys
import os
# Add the backend directory to the path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
@pytest.fixture(autouse=True)
def mock_env(monkeypatch):
"""Set up mock environment variables for all tests."""
monkeypatch.setenv("DB_HOST", "localhost")
monkeypatch.setenv("DB_PORT", "3306")
monkeypatch.setenv("DB_USER", "test")
monkeypatch.setenv("DB_PASS", "test")
monkeypatch.setenv("DB_NAME", "test_db")
monkeypatch.setenv("AUTH_API_URL", "https://auth.test.com/login")
monkeypatch.setenv("DIFY_API_URL", "https://dify.test.com/v1")
monkeypatch.setenv("DIFY_API_KEY", "test-api-key")
monkeypatch.setenv("ADMIN_EMAIL", "admin@test.com")
monkeypatch.setenv("JWT_SECRET", "test-jwt-secret")
@pytest.fixture
def sample_meeting():
"""Sample meeting data for tests."""
return {
"subject": "Test Meeting",
"meeting_time": "2025-01-15T10:00:00",
"location": "Conference Room A",
"chairperson": "John Doe",
"recorder": "Jane Smith",
"attendees": "alice@test.com, bob@test.com",
}
@pytest.fixture
def sample_transcript():
"""Sample transcript for AI tests."""
return """
今天的會議主要討論了Q1預算和新員工招聘計劃。
決定將行銷預算增加10%
小明負責在下週五前提交最終報告。
"""

191
backend/tests/test_ai.py Normal file
View File

@@ -0,0 +1,191 @@
"""
Unit tests for AI summarization with mock Dify responses.
"""
import pytest
from unittest.mock import patch, MagicMock, AsyncMock
import json
pytestmark = pytest.mark.asyncio
class TestDifyResponseParsing:
"""Tests for parsing Dify LLM responses."""
def test_parse_json_response(self):
"""Test parsing valid JSON response from Dify."""
from app.routers.ai import parse_dify_response
response = '''Here is the summary:
```json
{
"conclusions": ["Agreed on Q1 budget", "New hire approved"],
"action_items": [
{"content": "Submit budget report", "owner": "John", "due_date": "2025-01-15"},
{"content": "Post job listing", "owner": "", "due_date": null}
]
}
```
'''
result = parse_dify_response(response)
assert len(result["conclusions"]) == 2
assert "Q1 budget" in result["conclusions"][0]
assert len(result["action_items"]) == 2
assert result["action_items"][0]["owner"] == "John"
def test_parse_inline_json_response(self):
"""Test parsing inline JSON without code blocks."""
from app.routers.ai import parse_dify_response
response = '{"conclusions": ["Budget approved"], "action_items": []}'
result = parse_dify_response(response)
assert len(result["conclusions"]) == 1
assert result["conclusions"][0] == "Budget approved"
def test_parse_non_json_response(self):
"""Test fallback when response is not JSON."""
from app.routers.ai import parse_dify_response
response = "The meeting discussed Q1 budget and hiring plans."
result = parse_dify_response(response)
# Should return the raw response as a single conclusion
assert len(result["conclusions"]) == 1
assert "Q1 budget" in result["conclusions"][0]
assert len(result["action_items"]) == 0
def test_parse_empty_response(self):
"""Test handling empty response."""
from app.routers.ai import parse_dify_response
result = parse_dify_response("")
assert result["conclusions"] == []
assert result["action_items"] == []
class TestSummarizeEndpoint:
"""Tests for the AI summarization endpoint."""
@patch("app.routers.ai.httpx.AsyncClient")
@patch("app.routers.ai.settings")
async def test_summarize_success(self, mock_settings, mock_client_class):
"""Test successful summarization."""
mock_settings.DIFY_API_URL = "https://dify.test.com/v1"
mock_settings.DIFY_API_KEY = "test-key"
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"answer": json.dumps({
"conclusions": ["Decision made"],
"action_items": [{"content": "Follow up", "owner": "Alice", "due_date": "2025-01-20"}]
})
}
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
mock_client.__aenter__.return_value = mock_client
mock_client.__aexit__.return_value = None
mock_client_class.return_value = mock_client
from app.routers.ai import summarize_transcript
from app.models import SummarizeRequest, TokenPayload
mock_user = TokenPayload(email="test@test.com", role="user")
result = await summarize_transcript(
SummarizeRequest(transcript="Test meeting transcript"),
current_user=mock_user
)
assert len(result.conclusions) == 1
assert len(result.action_items) == 1
assert result.action_items[0].owner == "Alice"
@patch("app.routers.ai.httpx.AsyncClient")
@patch("app.routers.ai.settings")
async def test_summarize_handles_timeout(self, mock_settings, mock_client_class):
"""Test handling Dify timeout."""
import httpx
from fastapi import HTTPException
mock_settings.DIFY_API_URL = "https://dify.test.com/v1"
mock_settings.DIFY_API_KEY = "test-key"
mock_client = AsyncMock()
mock_client.post.side_effect = httpx.TimeoutException("Timeout")
mock_client.__aenter__.return_value = mock_client
mock_client.__aexit__.return_value = None
mock_client_class.return_value = mock_client
from app.routers.ai import summarize_transcript
from app.models import SummarizeRequest, TokenPayload
mock_user = TokenPayload(email="test@test.com", role="user")
with pytest.raises(HTTPException) as exc_info:
await summarize_transcript(
SummarizeRequest(transcript="Test"),
current_user=mock_user
)
assert exc_info.value.status_code == 504
@patch("app.routers.ai.settings")
async def test_summarize_no_api_key(self, mock_settings):
"""Test error when Dify API key is not configured."""
from fastapi import HTTPException
mock_settings.DIFY_API_KEY = ""
from app.routers.ai import summarize_transcript
from app.models import SummarizeRequest, TokenPayload
mock_user = TokenPayload(email="test@test.com", role="user")
with pytest.raises(HTTPException) as exc_info:
await summarize_transcript(
SummarizeRequest(transcript="Test"),
current_user=mock_user
)
assert exc_info.value.status_code == 503
class TestPartialDataHandling:
"""Tests for handling partial data from AI."""
def test_action_item_with_empty_owner(self):
"""Test action items with empty owner are handled."""
from app.routers.ai import parse_dify_response
response = json.dumps({
"conclusions": [],
"action_items": [
{"content": "Task 1", "owner": "", "due_date": None},
{"content": "Task 2", "owner": "Bob", "due_date": "2025-02-01"}
]
})
result = parse_dify_response(response)
assert result["action_items"][0]["owner"] == ""
assert result["action_items"][1]["owner"] == "Bob"
def test_action_item_with_missing_fields(self):
"""Test action items with missing fields."""
from app.routers.ai import parse_dify_response
response = json.dumps({
"conclusions": ["Done"],
"action_items": [
{"content": "Task only"}
]
})
result = parse_dify_response(response)
# Should have content but other fields may be missing
assert result["action_items"][0]["content"] == "Task only"

138
backend/tests/test_auth.py Normal file
View File

@@ -0,0 +1,138 @@
"""
Unit tests for authentication functionality.
"""
import pytest
from unittest.mock import patch, MagicMock, AsyncMock
from fastapi.testclient import TestClient
from jose import jwt
pytestmark = pytest.mark.asyncio
class TestAdminRoleDetection:
"""Tests for admin role detection."""
def test_admin_email_gets_admin_role(self):
"""Test that admin email is correctly identified."""
from app.config import settings
admin_email = settings.ADMIN_EMAIL
test_email = "regular@example.com"
# Admin email should be set (either from env or default)
assert admin_email is not None
assert len(admin_email) > 0
assert test_email != admin_email
@patch("app.routers.auth.settings")
def test_create_token_includes_role(self, mock_settings):
"""Test that created tokens include the role."""
mock_settings.JWT_SECRET = "test-secret"
mock_settings.ADMIN_EMAIL = "admin@test.com"
from app.routers.auth import create_token
# Test admin token
admin_token = create_token("admin@test.com", "admin")
admin_payload = jwt.decode(admin_token, "test-secret", algorithms=["HS256"])
assert admin_payload["role"] == "admin"
# Test user token
user_token = create_token("user@test.com", "user")
user_payload = jwt.decode(user_token, "test-secret", algorithms=["HS256"])
assert user_payload["role"] == "user"
class TestTokenValidation:
"""Tests for JWT token validation."""
@patch("app.routers.auth.settings")
def test_decode_valid_token(self, mock_settings):
"""Test decoding a valid token."""
mock_settings.JWT_SECRET = "test-secret"
from app.routers.auth import create_token, decode_token
token = create_token("test@example.com", "user")
payload = decode_token(token)
assert payload.email == "test@example.com"
assert payload.role == "user"
@patch("app.routers.auth.settings")
def test_decode_invalid_token_raises_error(self, mock_settings):
"""Test that invalid tokens raise an error."""
mock_settings.JWT_SECRET = "test-secret"
from app.routers.auth import decode_token
from fastapi import HTTPException
with pytest.raises(HTTPException) as exc_info:
decode_token("invalid-token")
assert exc_info.value.status_code == 401
class TestLoginEndpoint:
"""Tests for the login endpoint."""
@pytest.fixture
def client(self):
"""Create test client."""
from app.main import app
# Skip lifespan for tests
app.router.lifespan_context = None
return TestClient(app, raise_server_exceptions=False)
@patch("app.routers.auth.httpx.AsyncClient")
@patch("app.routers.auth.settings")
async def test_login_success(self, mock_settings, mock_client_class):
"""Test successful login."""
mock_settings.AUTH_API_URL = "https://auth.test.com/login"
mock_settings.ADMIN_EMAIL = "admin@test.com"
mock_settings.JWT_SECRET = "test-secret"
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"token": "external-token"}
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
mock_client.__aenter__.return_value = mock_client
mock_client.__aexit__.return_value = None
mock_client_class.return_value = mock_client
from app.routers.auth import login
from app.models import LoginRequest
result = await login(LoginRequest(email="user@test.com", password="password"))
assert result.email == "user@test.com"
assert result.role == "user"
assert result.token is not None
@patch("app.routers.auth.httpx.AsyncClient")
@patch("app.routers.auth.settings")
async def test_login_admin_gets_admin_role(self, mock_settings, mock_client_class):
"""Test that admin email gets admin role."""
mock_settings.AUTH_API_URL = "https://auth.test.com/login"
mock_settings.ADMIN_EMAIL = "admin@test.com"
mock_settings.JWT_SECRET = "test-secret"
mock_response = MagicMock()
mock_response.status_code = 200
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
mock_client.__aenter__.return_value = mock_client
mock_client.__aexit__.return_value = None
mock_client_class.return_value = mock_client
from app.routers.auth import login
from app.models import LoginRequest
result = await login(LoginRequest(email="admin@test.com", password="password"))
assert result.role == "admin"

View File

@@ -0,0 +1,95 @@
"""
Unit tests for database connection and table initialization.
"""
import pytest
from unittest.mock import patch, MagicMock
class TestDatabaseConnection:
"""Tests for database connectivity."""
@patch("mysql.connector.pooling.MySQLConnectionPool")
def test_init_db_pool_success(self, mock_pool):
"""Test successful database pool initialization."""
mock_pool.return_value = MagicMock()
from app.database import init_db_pool
pool = init_db_pool()
assert pool is not None
mock_pool.assert_called_once()
@patch("mysql.connector.pooling.MySQLConnectionPool")
def test_init_db_pool_with_correct_config(self, mock_pool):
"""Test database pool is created with correct configuration."""
from app.database import init_db_pool
from app.config import settings
init_db_pool()
call_args = mock_pool.call_args
assert call_args.kwargs["host"] == settings.DB_HOST
assert call_args.kwargs["port"] == settings.DB_PORT
assert call_args.kwargs["user"] == settings.DB_USER
assert call_args.kwargs["database"] == settings.DB_NAME
class TestTableInitialization:
"""Tests for table creation."""
@patch("app.database.get_db_cursor")
def test_init_tables_creates_required_tables(self, mock_cursor_context):
"""Test that all required tables are created."""
mock_cursor = MagicMock()
mock_cursor_context.return_value.__enter__ = MagicMock(return_value=mock_cursor)
mock_cursor_context.return_value.__exit__ = MagicMock(return_value=False)
from app.database import init_tables
init_tables()
# Verify execute was called for each table
assert mock_cursor.execute.call_count == 4
# Check table names in SQL
calls = mock_cursor.execute.call_args_list
sql_statements = [call[0][0] for call in calls]
assert any("meeting_users" in sql for sql in sql_statements)
assert any("meeting_records" in sql for sql in sql_statements)
assert any("meeting_conclusions" in sql for sql in sql_statements)
assert any("meeting_action_items" in sql for sql in sql_statements)
class TestDatabaseHelpers:
"""Tests for database helper functions."""
@patch("app.database.connection_pool")
def test_get_db_connection_returns_connection(self, mock_pool):
"""Test that get_db_connection returns a valid connection."""
mock_conn = MagicMock()
mock_pool.get_connection.return_value = mock_conn
from app.database import get_db_connection
with get_db_connection() as conn:
assert conn == mock_conn
mock_conn.close.assert_called_once()
@patch("app.database.connection_pool")
def test_get_db_cursor_with_commit(self, mock_pool):
"""Test that get_db_cursor commits when specified."""
mock_conn = MagicMock()
mock_cursor = MagicMock()
mock_pool.get_connection.return_value = mock_conn
mock_conn.cursor.return_value = mock_cursor
from app.database import get_db_cursor
with get_db_cursor(commit=True) as cursor:
cursor.execute("SELECT 1")
mock_conn.commit.assert_called_once()
mock_cursor.close.assert_called_once()