feat: Add Dify audio transcription with VAD chunking and SSE progress
- Add audio file upload transcription via Dify STT API
- Implement VAD-based audio segmentation in sidecar (3-min chunks)
- Add SSE endpoint for real-time transcription progress updates
- Fix chunk size enforcement for reliable uploads
- Add retry logic with exponential backoff for API calls
- Support Python 3.13+ with audioop-lts package
- Update frontend with Chinese progress messages and chunk display
- Improve start.sh health check with retry loop

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@@ -16,6 +16,7 @@ class Settings:
    )
    DIFY_API_URL: str = os.getenv("DIFY_API_URL", "https://dify.theaken.com/v1")
    DIFY_API_KEY: str = os.getenv("DIFY_API_KEY", "")
    DIFY_STT_API_KEY: str = os.getenv("DIFY_STT_API_KEY", "")

    ADMIN_EMAIL: str = os.getenv("ADMIN_EMAIL", "ymirliu@panjit.com.tw")
    JWT_SECRET: str = os.getenv("JWT_SECRET", "meeting-assistant-secret")
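For reference, a sketch of the environment values the new settings expect. The variable names come from the diff; the values are placeholders, and how the environment is populated (.env file, shell export, container config) is up to the deployment.

# example environment (placeholder values, not real credentials)
DIFY_API_URL=https://dify.theaken.com/v1
DIFY_API_KEY=app-xxxxxxxxxxxxxxxx
# new in this commit: separate key for the STT app
DIFY_STT_API_KEY=app-yyyyyyyyyyyyyyyy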
@@ -1,11 +1,22 @@
from fastapi import APIRouter, HTTPException, Depends
from fastapi import APIRouter, HTTPException, Depends, UploadFile, File
from fastapi.responses import StreamingResponse
import httpx
import json
import os
import tempfile
import subprocess
import shutil
import asyncio
from typing import Optional, AsyncGenerator

from ..config import settings
from ..models import SummarizeRequest, SummarizeResponse, ActionItemCreate, TokenPayload
from .auth import get_current_user

# Supported audio formats
SUPPORTED_AUDIO_FORMATS = {".mp3", ".wav", ".m4a", ".webm", ".ogg", ".flac", ".aac"}
MAX_FILE_SIZE = 500 * 1024 * 1024  # 500MB

router = APIRouter()
@@ -74,6 +85,9 @@ def parse_dify_response(answer: str) -> dict:
    Parse Dify response to extract conclusions and action items.
    Attempts JSON parsing first, then falls back to text parsing.
    """
    print(f"[Dify Summarize] Raw answer length: {len(answer)} chars")
    print(f"[Dify Summarize] Raw answer preview: {answer[:500]}...")

    # Try to find JSON in the response
    try:
        # Look for JSON block
@@ -90,13 +104,424 @@ def parse_dify_response(answer: str) -> dict:
        raise ValueError("No JSON found")

        data = json.loads(json_str)
        print(f"[Dify Summarize] Parsed JSON keys: {list(data.keys())}")
        print(f"[Dify Summarize] conclusions count: {len(data.get('conclusions', []))}")
        print(f"[Dify Summarize] action_items count: {len(data.get('action_items', []))}")

        return {
            "conclusions": data.get("conclusions", []),
            "action_items": data.get("action_items", []),
        }
    except (ValueError, json.JSONDecodeError):
    except (ValueError, json.JSONDecodeError) as e:
        print(f"[Dify Summarize] JSON parse failed: {e}")
        # Fallback: return raw answer as single conclusion
        return {
            "conclusions": [answer] if answer else [],
            "action_items": [],
        }
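To illustrate the contract of parse_dify_response: a well-formed answer yields the two lists, and anything unparseable falls back to a single conclusion. The sample payload below is made up; only the conclusions/action_items keys come from the code.

# illustration only - a hypothetical well-formed Dify answer
sample_answer = '{"conclusions": ["Approve the Q3 budget"], "action_items": ["Send minutes to all attendees"]}'
# parse_dify_response(sample_answer) -> {"conclusions": ["Approve the Q3 budget"],
#                                        "action_items": ["Send minutes to all attendees"]}
# parse_dify_response("free-form text") -> {"conclusions": ["free-form text"], "action_items": []}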

@router.post("/ai/transcribe-audio")
async def transcribe_audio(
    file: UploadFile = File(...),
    current_user: TokenPayload = Depends(get_current_user)
):
    """
    Transcribe an uploaded audio file using Dify STT service.
    Large files are automatically chunked using VAD segmentation.
    """
    if not settings.DIFY_STT_API_KEY:
        raise HTTPException(status_code=503, detail="Dify STT API not configured")

    # Validate file extension
    file_ext = os.path.splitext(file.filename or "")[1].lower()
    if file_ext not in SUPPORTED_AUDIO_FORMATS:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported audio format. Supported: {', '.join(SUPPORTED_AUDIO_FORMATS)}"
        )

    # Create temp directory for processing
    temp_dir = tempfile.mkdtemp(prefix="transcribe_")
    temp_file_path = os.path.join(temp_dir, f"upload{file_ext}")

    try:
        # Save uploaded file
        file_size = 0
        with open(temp_file_path, "wb") as f:
            while chunk := await file.read(1024 * 1024):  # 1MB chunks
                file_size += len(chunk)
                if file_size > MAX_FILE_SIZE:
                    raise HTTPException(
                        status_code=413,
                        detail=f"File too large. Maximum size: {MAX_FILE_SIZE // (1024*1024)}MB"
                    )
                f.write(chunk)

        print(f"[Transcribe] Saved uploaded file: {temp_file_path}, size: {file_size} bytes")

        # Call sidecar to segment audio
        segments = await segment_audio_with_sidecar(temp_file_path, temp_dir)

        if "error" in segments:
            raise HTTPException(status_code=500, detail=segments["error"])

        segment_list = segments.get("segments", [])
        total_segments = len(segment_list)

        print(f"[Transcribe] Segmentation complete: {total_segments} chunks created")
        for seg in segment_list:
            print(f" - Chunk {seg.get('index')}: {seg.get('path')} ({seg.get('duration', 0):.1f}s)")

        if total_segments == 0:
            raise HTTPException(status_code=400, detail="No audio content detected")

        # Transcribe each chunk via Dify STT
        transcriptions = []
        failed_chunks = []
        async with httpx.AsyncClient() as client:
            for i, segment in enumerate(segment_list):
                chunk_path = segment.get("path")
                chunk_index = segment.get("index", i)

                print(f"[Transcribe] Processing chunk {chunk_index + 1}/{total_segments}: {chunk_path}")

                if not chunk_path:
                    print(f"[Transcribe] ERROR: Chunk {chunk_index} has no path!")
                    failed_chunks.append(chunk_index)
                    continue

                if not os.path.exists(chunk_path):
                    print(f"[Transcribe] ERROR: Chunk file does not exist: {chunk_path}")
                    failed_chunks.append(chunk_index)
                    continue

                chunk_size = os.path.getsize(chunk_path)
                print(f"[Transcribe] Chunk {chunk_index} exists, size: {chunk_size} bytes")

                # Call Dify STT API with retry
                text = await transcribe_chunk_with_dify(
                    client, chunk_path, current_user.email
                )
                if text:
                    print(f"[Transcribe] Chunk {chunk_index} transcribed: {len(text)} chars")
                    transcriptions.append(text)
                else:
                    print(f"[Transcribe] Chunk {chunk_index} transcription failed (no text returned)")
                    failed_chunks.append(chunk_index)

        # Concatenate all transcriptions
        final_transcript = " ".join(transcriptions)

        print(f"[Transcribe] Complete: {len(transcriptions)}/{total_segments} chunks transcribed")
        if failed_chunks:
            print(f"[Transcribe] Failed chunks: {failed_chunks}")

        return {
            "transcript": final_transcript,
            "chunks_processed": len(transcriptions),
            "chunks_total": total_segments,
            "chunks_failed": len(failed_chunks),
            "total_duration_seconds": segments.get("total_duration", 0),
            "language": "zh"
        }

    finally:
        # Clean up temp files
        shutil.rmtree(temp_dir, ignore_errors=True)
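A minimal client sketch for this endpoint, assuming the router is mounted under /api and that get_current_user accepts a Bearer JWT; the base URL, token, and file name are placeholders, not part of this diff.

# sketch: upload a recording and print the transcript (URL prefix and auth are assumptions)
import httpx

API_BASE = "http://localhost:8000/api"   # assumed mount point
TOKEN = "<jwt-from-login>"               # placeholder

with open("meeting.m4a", "rb") as f:
    resp = httpx.post(
        f"{API_BASE}/ai/transcribe-audio",
        headers={"Authorization": f"Bearer {TOKEN}"},
        files={"file": ("meeting.m4a", f, "audio/mp4")},
        timeout=None,  # segmentation plus per-chunk STT can take minutes
    )
resp.raise_for_status()
result = resp.json()
print(f"{result['chunks_processed']}/{result['chunks_total']} chunks ->", result["transcript"][:200])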

@router.post("/ai/transcribe-audio-stream")
async def transcribe_audio_stream(
    file: UploadFile = File(...),
    current_user: TokenPayload = Depends(get_current_user)
):
    """
    Transcribe an uploaded audio file with real-time progress via SSE.
    Returns Server-Sent Events for progress updates.
    """
    if not settings.DIFY_STT_API_KEY:
        raise HTTPException(status_code=503, detail="Dify STT API not configured")

    # Validate file extension
    file_ext = os.path.splitext(file.filename or "")[1].lower()
    if file_ext not in SUPPORTED_AUDIO_FORMATS:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported audio format. Supported: {', '.join(SUPPORTED_AUDIO_FORMATS)}"
        )

    # Read file into memory for streaming
    file_content = await file.read()
    if len(file_content) > MAX_FILE_SIZE:
        raise HTTPException(
            status_code=413,
            detail=f"File too large. Maximum size: {MAX_FILE_SIZE // (1024*1024)}MB"
        )

    async def generate_progress() -> AsyncGenerator[str, None]:
        temp_dir = tempfile.mkdtemp(prefix="transcribe_")
        temp_file_path = os.path.join(temp_dir, f"upload{file_ext}")

        try:
            # Save file
            with open(temp_file_path, "wb") as f:
                f.write(file_content)

            yield f"data: {json.dumps({'event': 'start', 'message': '音訊檔案已接收,開始處理...'})}\n\n"

            # Segment audio
            yield f"data: {json.dumps({'event': 'segmenting', 'message': '正在分析音訊並分割片段...'})}\n\n"

            segments = await segment_audio_with_sidecar(temp_file_path, temp_dir)

            if "error" in segments:
                yield f"data: {json.dumps({'event': 'error', 'message': segments['error']})}\n\n"
                return

            segment_list = segments.get("segments", [])
            total_segments = len(segment_list)
            total_duration = segments.get("total_duration", 0)

            if total_segments == 0:
                yield f"data: {json.dumps({'event': 'error', 'message': '未檢測到音訊內容'})}\n\n"
                return

            yield f"data: {json.dumps({'event': 'segments_ready', 'total': total_segments, 'duration': total_duration, 'message': f'分割完成,共 {total_segments} 個片段'})}\n\n"

            # Transcribe each chunk
            transcriptions = []
            async with httpx.AsyncClient() as client:
                for i, segment in enumerate(segment_list):
                    chunk_path = segment.get("path")
                    chunk_index = segment.get("index", i)
                    chunk_duration = segment.get("duration", 0)

                    yield f"data: {json.dumps({'event': 'chunk_start', 'chunk': chunk_index + 1, 'total': total_segments, 'duration': chunk_duration, 'message': f'正在轉錄片段 {chunk_index + 1}/{total_segments}...'})}\n\n"

                    if not chunk_path or not os.path.exists(chunk_path):
                        yield f"data: {json.dumps({'event': 'chunk_error', 'chunk': chunk_index + 1, 'message': f'片段 {chunk_index + 1} 檔案不存在'})}\n\n"
                        continue

                    text = await transcribe_chunk_with_dify(
                        client, chunk_path, current_user.email
                    )

                    if text:
                        transcriptions.append(text)
                        yield f"data: {json.dumps({'event': 'chunk_done', 'chunk': chunk_index + 1, 'total': total_segments, 'text_length': len(text), 'message': f'片段 {chunk_index + 1} 完成'})}\n\n"
                    else:
                        yield f"data: {json.dumps({'event': 'chunk_error', 'chunk': chunk_index + 1, 'message': f'片段 {chunk_index + 1} 轉錄失敗'})}\n\n"

            # Final result
            final_transcript = " ".join(transcriptions)
            yield f"data: {json.dumps({'event': 'complete', 'transcript': final_transcript, 'chunks_processed': len(transcriptions), 'chunks_total': total_segments, 'duration': total_duration})}\n\n"

        finally:
            shutil.rmtree(temp_dir, ignore_errors=True)

    return StreamingResponse(
        generate_progress(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )
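And a sketch of consuming the SSE variant from Python, under the same mount-point and auth assumptions as the previous example; it prints each progress event and returns the transcript when the 'complete' event arrives.

# sketch: stream progress events from the SSE endpoint (URL prefix and auth are assumptions)
import httpx, json

def stream_transcription(path: str, token: str, api_base: str = "http://localhost:8000/api") -> str | None:
    with open(path, "rb") as f, httpx.stream(
        "POST",
        f"{api_base}/ai/transcribe-audio-stream",
        headers={"Authorization": f"Bearer {token}"},
        files={"file": (path, f, "audio/wav")},
        timeout=None,
    ) as resp:
        for line in resp.iter_lines():
            if not line.startswith("data: "):
                continue  # skip blank separators between events
            event = json.loads(line[len("data: "):])
            print(event.get("event"), event.get("message", ""))
            if event.get("event") == "complete":
                return event["transcript"]
            if event.get("event") == "error":
                return None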

async def segment_audio_with_sidecar(audio_path: str, output_dir: str) -> dict:
    """Call sidecar to segment audio file using VAD."""
    # Find sidecar script
    sidecar_dir = os.path.join(os.path.dirname(__file__), "..", "..", "..", "sidecar")
    sidecar_script = os.path.join(sidecar_dir, "transcriber.py")
    venv_python = os.path.join(sidecar_dir, "venv", "bin", "python")

    # Use venv python if available, otherwise system python
    python_cmd = venv_python if os.path.exists(venv_python) else "python3"

    if not os.path.exists(sidecar_script):
        return {"error": "Sidecar not found"}

    try:
        # Prepare command
        cmd_input = json.dumps({
            "action": "segment_audio",
            "file_path": audio_path,
            "max_chunk_seconds": 180,  # 3 minutes (smaller chunks for reliable upload)
            "min_silence_ms": 500,
            "output_dir": output_dir
        })

        # Run sidecar process
        process = await asyncio.create_subprocess_exec(
            python_cmd, sidecar_script,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=sidecar_dir
        )

        # Send command and wait for response
        stdout, stderr = await asyncio.wait_for(
            process.communicate(input=f"{cmd_input}\n{{\"action\": \"quit\"}}\n".encode()),
            timeout=600  # 10 minutes timeout for large files
        )

        # Parse response (skip status messages, find the segment result)
        for line in stdout.decode().strip().split('\n'):
            if line:
                try:
                    data = json.loads(line)
                    if data.get("status") == "success" or "segments" in data:
                        return data
                    if "error" in data:
                        return data
                except json.JSONDecodeError:
                    continue

        return {"error": "No valid response from sidecar"}

    except asyncio.TimeoutError:
        return {"error": "Sidecar timeout during segmentation"}
    except Exception as e:
        return {"error": f"Sidecar error: {str(e)}"}
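For context, the stdin/stdout protocol this helper assumes is line-delimited JSON. The response shape below is inferred from how the callers read it ('segments' with index/path/duration, plus 'total_duration'); file names and numbers are illustrative only.

# sketch of the sidecar exchange (illustrative values)
#
# written to stdin, one JSON object per line, followed by a quit command:
#   {"action": "segment_audio", "file_path": "/tmp/transcribe_x/upload.m4a",
#    "max_chunk_seconds": 180, "min_silence_ms": 500, "output_dir": "/tmp/transcribe_x"}
#   {"action": "quit"}
#
# expected on stdout (any other JSON lines, e.g. status messages, are skipped):
#   {"status": "success", "total_duration": 512.4,
#    "segments": [{"index": 0, "path": "/tmp/transcribe_x/chunk_000.wav", "duration": 178.9},
#                 {"index": 1, "path": "/tmp/transcribe_x/chunk_001.wav", "duration": 176.2}]}
#   or: {"error": "reason"}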

async def upload_file_to_dify(
    client: httpx.AsyncClient,
    file_path: str,
    user_email: str
) -> Optional[str]:
    """Upload a file to Dify and return the file ID."""
    try:
        upload_url = f"{settings.DIFY_API_URL}/files/upload"

        file_size = os.path.getsize(file_path)
        print(f"[Upload] File: {file_path}, size: {file_size / (1024*1024):.1f} MB")

        # Adjust timeout based on file size (minimum 60s, ~5 seconds per MB)
        timeout_seconds = max(60.0, file_size / (1024 * 1024) * 5)
        print(f"[Upload] Using timeout: {timeout_seconds:.0f}s")

        with open(file_path, "rb") as f:
            files = {"file": (os.path.basename(file_path), f, "audio/wav")}
            response = await client.post(
                upload_url,
                headers={
                    "Authorization": f"Bearer {settings.DIFY_STT_API_KEY}",
                },
                files=files,
                data={"user": user_email},
                timeout=timeout_seconds,
            )

        print(f"[Upload] Response: {response.status_code}")

        if response.status_code == 201 or response.status_code == 200:
            data = response.json()
            file_id = data.get("id")
            print(f"[Upload] Success, file_id: {file_id}")
            return file_id

        print(f"[Upload] Error: {response.status_code} - {response.text[:500]}")
        return None

    except httpx.ReadError as e:
        print(f"[Upload] Network read error (connection reset): {e}")
        return None
    except httpx.TimeoutException as e:
        print(f"[Upload] Timeout: {e}")
        return None
    except Exception as e:
        import traceback
        print(f"[Upload] Error: {e}")
        print(traceback.format_exc())
        return None
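A quick check of the timeout heuristic above (60 s floor, then 5 s per MB):

# illustration of the upload timeout formula
for mb in (5, 12, 60):
    print(f"{mb} MB -> {max(60.0, mb * 5):.0f}s")  # prints 60s, 60s, 300s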

async def transcribe_chunk_with_dify(
    client: httpx.AsyncClient,
    chunk_path: str,
    user_email: str,
    max_retries: int = 3
) -> Optional[str]:
    """Transcribe a single audio chunk via Dify chat API with file upload."""
    for attempt in range(max_retries):
        try:
            print(f"[Dify] Attempt {attempt + 1}/{max_retries} for chunk: {chunk_path}")

            # Step 1: Upload file to Dify (with retry inside this attempt)
            file_id = None
            for upload_attempt in range(2):  # 2 upload attempts per main attempt
                file_id = await upload_file_to_dify(client, chunk_path, user_email)
                if file_id:
                    break
                print(f"[Dify] Upload attempt {upload_attempt + 1} failed, retrying...")
                await asyncio.sleep(1)

            if not file_id:
                print(f"[Dify] Failed to upload file after retries: {chunk_path}")
                if attempt < max_retries - 1:
                    await asyncio.sleep(2 ** attempt)
                    continue
                return None

            print(f"[Dify] File uploaded, file_id: {file_id}")

            # Step 2: Send chat message with file to request transcription
            response = await client.post(
                f"{settings.DIFY_API_URL}/chat-messages",
                headers={
                    "Authorization": f"Bearer {settings.DIFY_STT_API_KEY}",
                    "Content-Type": "application/json",
                },
                json={
                    "inputs": {},
                    "query": "請將這段音檔轉錄成文字,只回傳轉錄的文字內容,不要加任何額外說明。",
                    "response_mode": "blocking",
                    "user": user_email,
                    "files": [
                        {
                            "type": "audio",
                            "transfer_method": "local_file",
                            "upload_file_id": file_id
                        }
                    ]
                },
                timeout=300.0,  # 5 minutes per chunk (increased for longer segments)
            )

            print(f"[Dify] Chat response: {response.status_code}")

            if response.status_code == 200:
                data = response.json()
                answer = data.get("answer", "")
                print(f"[Dify] Transcription success, length: {len(answer)} chars")
                return answer

            # Retry on server errors or rate limits
            if response.status_code >= 500 or response.status_code == 429:
                print(f"[Dify] Server error {response.status_code}, will retry...")
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt
                    if response.status_code == 429:
                        wait_time = 10  # Wait longer for rate limits
                    await asyncio.sleep(wait_time)
                    continue

            # Log error but don't fail entire transcription
            print(f"[Dify] Chat error for chunk: {response.status_code} - {response.text[:500]}")
            return None

        except httpx.TimeoutException:
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)
                continue
            return None
        except Exception as e:
            print(f"Chunk transcription error: {e}")
            return None

    return None
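For the record, the worst-case retry schedule this gives a single chunk with the default max_retries=3:

# attempt 1 fails -> sleep 2**0 = 1s (a flat 10s instead if the failure was HTTP 429)
# attempt 2 fails -> sleep 2**1 = 2s (10s instead on HTTP 429)
# attempt 3 fails -> give up and return None; the caller records the chunk as failed
# each attempt also allows up to two Dify uploads with a 1s pause between them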