Files
Meeting_Assistant/backend/app/routers/ai.py
egg 01aee1fd0d feat: Extract hardcoded configs to environment variables
- Add environment variable configuration for backend and frontend
- Backend: DB_POOL_SIZE, JWT_EXPIRE_HOURS, timeout configs, directory paths
- Frontend: VITE_API_BASE_URL, VITE_UPLOAD_TIMEOUT, Whisper configs
- Create deployment script (scripts/deploy-backend.sh)
- Create 1Panel deployment guide (docs/1panel-deployment.md)
- Update DEPLOYMENT.md with env var documentation
- Create README.md with project overview
- Remove obsolete PRD.md, SDD.md, TDD.md (replaced by OpenSpec)
- Keep CORS allow_origins=["*"] for Electron EXE distribution

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-14 14:31:55 +08:00

524 lines
20 KiB
Python

from fastapi import APIRouter, HTTPException, Depends, UploadFile, File
from fastapi.responses import StreamingResponse
import httpx
import json
import os
import tempfile
import subprocess
import shutil
import asyncio
from typing import Optional, AsyncGenerator
from ..config import settings
from ..models import SummarizeRequest, SummarizeResponse, ActionItemCreate, TokenPayload
from .auth import get_current_user
router = APIRouter()
@router.post("/ai/summarize", response_model=SummarizeResponse)
async def summarize_transcript(
    request: SummarizeRequest, current_user: TokenPayload = Depends(get_current_user)
):
    """
    Send transcript to Dify for AI summarization.

    The transcript is posted to the Dify ``chat-messages`` endpoint in
    blocking mode; the ``answer`` field of the reply is passed to
    ``parse_dify_response`` and the result is returned as structured
    conclusions and action items.

    Raises:
        HTTPException: 503 when no Dify API key is configured or the request
            cannot be sent, 504 when the request times out, 502 when Dify
            answers 200 with a body that is not a JSON object, and Dify's own
            status code when it answers with anything other than 200.
    """
    if not settings.DIFY_API_KEY:
        raise HTTPException(status_code=503, detail="Dify API not configured")

    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(
                f"{settings.DIFY_API_URL}/chat-messages",
                headers={
                    "Authorization": f"Bearer {settings.DIFY_API_KEY}",
                    "Content-Type": "application/json",
                },
                json={
                    "inputs": {},
                    "query": request.transcript,
                    "response_mode": "blocking",
                    "user": current_user.email,
                },
                timeout=settings.llm_timeout_seconds,
            )

            if response.status_code != 200:
                raise HTTPException(
                    status_code=response.status_code,
                    detail=f"Dify API error: {response.text}",
                )

            # A 200 reply is not guaranteed to carry a JSON object; report
            # that as a bad gateway instead of an unhandled server error.
            try:
                data = response.json()
            except ValueError as e:
                raise HTTPException(
                    status_code=502, detail="Dify API returned a non-JSON response"
                ) from e
            if not isinstance(data, dict):
                raise HTTPException(
                    status_code=502, detail="Dify API returned an unexpected response"
                )

            # "answer" may be absent or null; the parser expects a string.
            answer = data.get("answer") or ""

            # Try to parse structured JSON from Dify response
            parsed = parse_dify_response(answer)

            return SummarizeResponse(
                conclusions=parsed["conclusions"],
                action_items=[
                    ActionItemCreate(
                        content=item.get("content", ""),
                        owner=item.get("owner", ""),
                        due_date=item.get("due_date"),
                    )
                    for item in parsed["action_items"]
                ],
            )
        except httpx.TimeoutException as e:
            raise HTTPException(
                status_code=504, detail="Dify API timeout - transcript may be too long"
            ) from e
        except httpx.RequestError as e:
            raise HTTPException(
                status_code=503, detail=f"Dify API unavailable: {str(e)}"
            ) from e
def parse_dify_response(answer: str) -> dict:
    """
    Parse Dify response to extract conclusions and action items.

    A fenced ```json block is tried first, then the span from the first "{"
    to the last "}". When no JSON object can be decoded, the raw answer is
    returned as a single conclusion (or no conclusions for an empty answer).

    The answer length and its first 500 characters are printed for debugging.

    Returns:
        A dict with two keys, both always lists:
        ``conclusions`` and ``action_items``.
    """
    print(f"[Dify Summarize] Raw answer length: {len(answer)} chars")
    print(f"[Dify Summarize] Raw answer preview: {answer[:500]}...")

    # Try to find JSON in the response
    try:
        if "```json" in answer:
            # Skip past the 7-character opening fence marker
            json_start = answer.index("```json") + 7
            json_end = answer.index("```", json_start)
            json_str = answer[json_start:json_end].strip()
        elif "{" in answer and "}" in answer:
            # Widest brace-delimited span in the text
            json_start = answer.index("{")
            json_end = answer.rindex("}") + 1
            json_str = answer[json_start:json_end]
        else:
            raise ValueError("No JSON found")

        data = json.loads(json_str)

        # Valid JSON that is not an object (list, string, number) has no
        # keys to read; treat it like unparseable output.
        if not isinstance(data, dict):
            raise ValueError(f"Expected a JSON object, got {type(data).__name__}")

        print(f"[Dify Summarize] Parsed JSON keys: {list(data.keys())}")

        # Fields may be missing, null, or the wrong type; always hand back lists.
        conclusions = data.get("conclusions")
        if isinstance(conclusions, str):
            # A lone string conclusion is kept rather than discarded
            conclusions = [conclusions] if conclusions else []
        elif not isinstance(conclusions, list):
            conclusions = []

        action_items = data.get("action_items")
        if not isinstance(action_items, list):
            action_items = []

        print(f"[Dify Summarize] conclusions count: {len(conclusions)}")
        print(f"[Dify Summarize] action_items count: {len(action_items)}")

        return {
            "conclusions": conclusions,
            "action_items": action_items,
        }
    except (ValueError, json.JSONDecodeError) as e:
        print(f"[Dify Summarize] JSON parse failed: {e}")
        # Fallback: return raw answer as single conclusion
        return {
            "conclusions": [answer] if answer else [],
            "action_items": [],
        }
@router.post("/ai/transcribe-audio")
async def transcribe_audio(
    file: UploadFile = File(...),
    current_user: TokenPayload = Depends(get_current_user)
):
    """
    Transcribe an uploaded audio file through the Dify STT service.

    The upload is written to a temporary directory, split into chunks by the
    sidecar, and each chunk is transcribed in turn. A chunk that cannot be
    transcribed is recorded as failed and skipped; the remaining texts are
    joined with single spaces. The temporary directory is always removed.
    """
    if not settings.DIFY_STT_API_KEY:
        raise HTTPException(status_code=503, detail="Dify STT API not configured")

    # Reject unsupported extensions before anything is written to disk
    extension = os.path.splitext(file.filename or "")[1].lower()
    if extension not in settings.supported_audio_formats_set:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported audio format. Supported: {settings.SUPPORTED_AUDIO_FORMATS}"
        )

    work_dir = tempfile.mkdtemp(prefix="transcribe_")
    upload_path = os.path.join(work_dir, f"upload{extension}")

    try:
        # Spool the upload to disk 1MB at a time, enforcing the size cap as we go
        bytes_written = 0
        with open(upload_path, "wb") as sink:
            while True:
                block = await file.read(1024 * 1024)
                if not block:
                    break
                bytes_written += len(block)
                if bytes_written > settings.MAX_FILE_SIZE:
                    raise HTTPException(
                        status_code=413,
                        detail=f"File too large. Maximum size: {settings.MAX_FILE_SIZE // (1024*1024)}MB"
                    )
                sink.write(block)

        print(f"[Transcribe] Saved uploaded file: {upload_path}, size: {bytes_written} bytes")

        # Ask the sidecar to split the audio into chunks
        sidecar_result = await segment_audio_with_sidecar(upload_path, work_dir)
        if "error" in sidecar_result:
            raise HTTPException(status_code=500, detail=sidecar_result["error"])

        chunks = sidecar_result.get("segments", [])
        chunk_count = len(chunks)
        print(f"[Transcribe] Segmentation complete: {chunk_count} chunks created")
        for entry in chunks:
            print(f" - Chunk {entry.get('index')}: {entry.get('path')} ({entry.get('duration', 0):.1f}s)")

        if chunk_count == 0:
            raise HTTPException(status_code=400, detail="No audio content detected")

        # Send every chunk to Dify STT, remembering which ones fail
        texts = []
        failed = []
        async with httpx.AsyncClient() as client:
            for position, entry in enumerate(chunks):
                path = entry.get("path")
                index = entry.get("index", position)
                print(f"[Transcribe] Processing chunk {index + 1}/{chunk_count}: {path}")

                if not path:
                    print(f"[Transcribe] ERROR: Chunk {index} has no path!")
                    failed.append(index)
                    continue

                if not os.path.exists(path):
                    print(f"[Transcribe] ERROR: Chunk file does not exist: {path}")
                    failed.append(index)
                    continue

                size_on_disk = os.path.getsize(path)
                print(f"[Transcribe] Chunk {index} exists, size: {size_on_disk} bytes")

                # The helper performs its own retries
                text = await transcribe_chunk_with_dify(
                    client, path, current_user.email
                )
                if not text:
                    print(f"[Transcribe] Chunk {index} transcription failed (no text returned)")
                    failed.append(index)
                    continue

                print(f"[Transcribe] Chunk {index} transcribed: {len(text)} chars")
                texts.append(text)

        # Stitch the successful chunks together
        joined = " ".join(texts)
        print(f"[Transcribe] Complete: {len(texts)}/{chunk_count} chunks transcribed")
        if failed:
            print(f"[Transcribe] Failed chunks: {failed}")

        return {
            "transcript": joined,
            "chunks_processed": len(texts),
            "chunks_total": chunk_count,
            "chunks_failed": len(failed),
            "total_duration_seconds": sidecar_result.get("total_duration", 0),
            "language": "zh"
        }
    finally:
        # Remove the upload and every chunk file
        shutil.rmtree(work_dir, ignore_errors=True)
@router.post("/ai/transcribe-audio-stream")
async def transcribe_audio_stream(
    file: UploadFile = File(...),
    current_user: TokenPayload = Depends(get_current_user)
):
    """
    Transcribe an uploaded audio file with real-time progress via SSE.

    The upload is validated and buffered before the response starts, so
    configuration, format and size problems are reported as ordinary HTTP
    errors (503, 400, 413). After that, progress is reported as Server-Sent
    Events whose ``event`` field is one of: start, segmenting, segments_ready,
    chunk_start, chunk_done, chunk_error, complete, error.
    """
    if not settings.DIFY_STT_API_KEY:
        raise HTTPException(status_code=503, detail="Dify STT API not configured")

    # Validate file extension
    file_ext = os.path.splitext(file.filename or "")[1].lower()
    if file_ext not in settings.supported_audio_formats_set:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported audio format. Supported: {settings.SUPPORTED_AUDIO_FORMATS}"
        )

    # Buffer the upload in 1MB reads so an oversized file is rejected as soon
    # as the limit is crossed, instead of after it is fully loaded in memory.
    upload_chunks = []
    received_bytes = 0
    while chunk := await file.read(1024 * 1024):
        received_bytes += len(chunk)
        if received_bytes > settings.MAX_FILE_SIZE:
            raise HTTPException(
                status_code=413,
                detail=f"File too large. Maximum size: {settings.MAX_FILE_SIZE // (1024*1024)}MB"
            )
        upload_chunks.append(chunk)

    def sse(payload: dict) -> str:
        """Format one payload as a Server-Sent Events data frame."""
        return f"data: {json.dumps(payload)}\n\n"

    async def generate_progress() -> AsyncGenerator[str, None]:
        temp_dir = tempfile.mkdtemp(prefix="transcribe_")
        temp_file_path = os.path.join(temp_dir, f"upload{file_ext}")

        try:
            # Save file
            with open(temp_file_path, "wb") as f:
                f.writelines(upload_chunks)

            yield sse({'event': 'start', 'message': '音訊檔案已接收,開始處理...'})

            # Segment audio
            yield sse({'event': 'segmenting', 'message': '正在分析音訊並分割片段...'})

            segments = await segment_audio_with_sidecar(temp_file_path, temp_dir)

            if "error" in segments:
                yield sse({'event': 'error', 'message': segments['error']})
                return

            segment_list = segments.get("segments", [])
            total_segments = len(segment_list)
            total_duration = segments.get("total_duration", 0)

            if total_segments == 0:
                yield sse({'event': 'error', 'message': '未檢測到音訊內容'})
                return

            yield sse({'event': 'segments_ready', 'total': total_segments, 'duration': total_duration, 'message': f'分割完成,共 {total_segments} 個片段'})

            # Transcribe each chunk; a failed chunk is reported and skipped
            transcriptions = []

            async with httpx.AsyncClient() as client:
                for i, segment in enumerate(segment_list):
                    chunk_path = segment.get("path")
                    chunk_index = segment.get("index", i)
                    chunk_duration = segment.get("duration", 0)

                    yield sse({'event': 'chunk_start', 'chunk': chunk_index + 1, 'total': total_segments, 'duration': chunk_duration, 'message': f'正在轉錄片段 {chunk_index + 1}/{total_segments}...'})

                    if not chunk_path or not os.path.exists(chunk_path):
                        yield sse({'event': 'chunk_error', 'chunk': chunk_index + 1, 'message': f'片段 {chunk_index + 1} 檔案不存在'})
                        continue

                    text = await transcribe_chunk_with_dify(
                        client, chunk_path, current_user.email
                    )

                    if text:
                        transcriptions.append(text)
                        yield sse({'event': 'chunk_done', 'chunk': chunk_index + 1, 'total': total_segments, 'text_length': len(text), 'message': f'片段 {chunk_index + 1} 完成'})
                    else:
                        yield sse({'event': 'chunk_error', 'chunk': chunk_index + 1, 'message': f'片段 {chunk_index + 1} 轉錄失敗'})

            # Final result
            final_transcript = " ".join(transcriptions)
            yield sse({'event': 'complete', 'transcript': final_transcript, 'chunks_processed': len(transcriptions), 'chunks_total': total_segments, 'duration': total_duration})

        finally:
            # Runs when the stream finishes or is closed early
            shutil.rmtree(temp_dir, ignore_errors=True)

    return StreamingResponse(
        generate_progress(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )
async def segment_audio_with_sidecar(audio_path: str, output_dir: str) -> dict:
    """
    Call sidecar to segment audio file using VAD.

    Runs ``transcriber.py`` from the ``sidecar`` directory three levels above
    this file as a subprocess, sends it a ``segment_audio`` command followed
    by a ``quit`` command on stdin, and scans its stdout for the first JSON
    object that carries a result or an error.

    Args:
        audio_path: Audio file to segment.
        output_dir: Directory passed to the sidecar as ``output_dir``.

    Returns:
        The sidecar's JSON object (success result or its own error object),
        or ``{"error": <message>}`` when the sidecar is missing, times out,
        produces no usable output, or fails to run. This function does not
        raise for those conditions.
    """
    # Find sidecar script
    sidecar_dir = os.path.join(os.path.dirname(__file__), "..", "..", "..", "sidecar")
    sidecar_script = os.path.join(sidecar_dir, "transcriber.py")
    venv_python = os.path.join(sidecar_dir, "venv", "bin", "python")

    # Use venv python if available, otherwise system python
    python_cmd = venv_python if os.path.exists(venv_python) else "python3"

    if not os.path.exists(sidecar_script):
        return {"error": "Sidecar not found"}

    try:
        # Prepare command
        cmd_input = json.dumps({
            "action": "segment_audio",
            "file_path": audio_path,
            "max_chunk_seconds": 180,  # 3 minutes (smaller chunks for reliable upload)
            "min_silence_ms": 500,
            "output_dir": output_dir
        })
        quit_command = json.dumps({"action": "quit"})

        # Run sidecar process
        process = await asyncio.create_subprocess_exec(
            python_cmd, sidecar_script,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=sidecar_dir
        )

        # Send command and wait for response
        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(input=f"{cmd_input}\n{quit_command}\n".encode()),
                timeout=settings.upload_timeout_seconds
            )
        except asyncio.TimeoutError:
            # wait_for only cancels communicate(); the child keeps running
            # unless it is killed and reaped here.
            try:
                process.kill()
            except ProcessLookupError:
                pass  # already exited
            await process.wait()
            return {"error": "Sidecar timeout during segmentation"}

        # Parse response (skip status messages, find the segment result)
        for line in stdout.decode().strip().split('\n'):
            if not line:
                continue
            try:
                data = json.loads(line)
            except json.JSONDecodeError:
                continue
            # Only JSON objects can carry a result; ignore bare values
            if not isinstance(data, dict):
                continue
            if data.get("status") == "success" or "segments" in data:
                return data
            if "error" in data:
                return data

        # Nothing usable on stdout: surface stderr in the log for diagnosis
        if stderr:
            print(f"[Sidecar] stderr: {stderr.decode(errors='replace')[-500:]}")
        return {"error": "No valid response from sidecar"}

    except Exception as e:
        return {"error": f"Sidecar error: {str(e)}"}
async def upload_file_to_dify(
    client: httpx.AsyncClient,
    file_path: str,
    user_email: str
) -> Optional[str]:
    """
    Upload a file to Dify and return the file ID.

    Posts the file as multipart form data to ``/files/upload`` using the STT
    API key. Returns the ``id`` field of the reply on HTTP 200 or 201, and
    None on any other status or on any exception (which is logged).
    """
    try:
        endpoint = f"{settings.DIFY_API_URL}/files/upload"

        size_bytes = os.path.getsize(file_path)
        print(f"[Upload] File: {file_path}, size: {size_bytes / (1024*1024):.1f} MB")

        # Scale the timeout with the file: 5 seconds per MB, never below 60 seconds
        timeout_seconds = max(60.0, size_bytes / (1024 * 1024) * 5)
        print(f"[Upload] Using timeout: {timeout_seconds:.0f}s")

        with open(file_path, "rb") as audio:
            response = await client.post(
                endpoint,
                headers={
                    "Authorization": f"Bearer {settings.DIFY_STT_API_KEY}",
                },
                files={"file": (os.path.basename(file_path), audio, "audio/wav")},
                data={"user": user_email},
                timeout=timeout_seconds,
            )

        status = response.status_code
        print(f"[Upload] Response: {status}")

        if status not in (200, 201):
            print(f"[Upload] Error: {status} - {response.text[:500]}")
            return None

        uploaded_id = response.json().get("id")
        print(f"[Upload] Success, file_id: {uploaded_id}")
        return uploaded_id

    except httpx.ReadError as e:
        print(f"[Upload] Network read error (connection reset): {e}")
        return None
    except httpx.TimeoutException as e:
        print(f"[Upload] Timeout: {e}")
        return None
    except Exception as e:
        import traceback
        print(f"[Upload] Error: {e}")
        print(traceback.format_exc())
        return None
async def transcribe_chunk_with_dify(
    client: httpx.AsyncClient,
    chunk_path: str,
    user_email: str,
    max_retries: int = 3
) -> Optional[str]:
    """
    Transcribe a single audio chunk via Dify chat API with file upload.

    Each attempt uploads the chunk (up to two tries) and then sends a
    blocking chat message that references the uploaded file. Upload failures,
    timeouts, HTTP 429 and HTTP 5xx are retried until ``max_retries`` attempts
    are used; any other outcome ends the call. Returns the ``answer`` text on
    HTTP 200, otherwise None.
    """
    final_attempt = max_retries - 1

    for attempt in range(max_retries):
        may_retry = attempt < final_attempt
        try:
            print(f"[Dify] Attempt {attempt + 1}/{max_retries} for chunk: {chunk_path}")

            # Step 1: upload the chunk, allowing two tries within this attempt
            file_id = None
            for upload_attempt in range(2):
                file_id = await upload_file_to_dify(client, chunk_path, user_email)
                if file_id:
                    break
                print(f"[Dify] Upload attempt {upload_attempt + 1} failed, retrying...")
                await asyncio.sleep(1)

            if not file_id:
                print(f"[Dify] Failed to upload file after retries: {chunk_path}")
                if not may_retry:
                    return None
                # Exponential backoff before the next full attempt
                await asyncio.sleep(2 ** attempt)
                continue

            print(f"[Dify] File uploaded, file_id: {file_id}")

            # Step 2: ask the chat endpoint to transcribe the uploaded file
            attachment = {
                "type": "audio",
                "transfer_method": "local_file",
                "upload_file_id": file_id
            }
            response = await client.post(
                f"{settings.DIFY_API_URL}/chat-messages",
                headers={
                    "Authorization": f"Bearer {settings.DIFY_STT_API_KEY}",
                    "Content-Type": "application/json",
                },
                json={
                    "inputs": {},
                    "query": "請將這段音檔轉錄成文字,只回傳轉錄的文字內容,不要加任何額外說明。",
                    "response_mode": "blocking",
                    "user": user_email,
                    "files": [attachment]
                },
                timeout=settings.dify_stt_timeout_seconds,
            )

            status = response.status_code
            print(f"[Dify] Chat response: {status}")

            if status == 200:
                answer = response.json().get("answer", "")
                print(f"[Dify] Transcription success, length: {len(answer)} chars")
                return answer

            # Server errors and rate limits are worth another attempt
            if status >= 500 or status == 429:
                print(f"[Dify] Server error {status}, will retry...")
                if may_retry:
                    # Rate limits get a fixed, longer pause than the backoff
                    await asyncio.sleep(10 if status == 429 else 2 ** attempt)
                    continue

            # Give up on this chunk without failing the whole transcription
            print(f"[Dify] Chat error for chunk: {status} - {response.text[:500]}")
            return None

        except httpx.TimeoutException:
            if may_retry:
                await asyncio.sleep(2 ** attempt)
                continue
            return None
        except Exception as e:
            print(f"Chunk transcription error: {e}")
            return None

    return None