"""AI routes: transcript summarization and audio transcription via Dify."""

import asyncio
import json
import os
import shutil
import subprocess
import tempfile
import traceback
from typing import AsyncGenerator, Optional

import httpx
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile
from fastapi.responses import StreamingResponse

from ..config import settings
from ..models import SummarizeRequest, SummarizeResponse, ActionItemCreate, TokenPayload
from .auth import get_current_user

# Supported audio formats
SUPPORTED_AUDIO_FORMATS = {".mp3", ".wav", ".m4a", ".webm", ".ogg", ".flac", ".aac"}
MAX_FILE_SIZE = 500 * 1024 * 1024  # 500MB

# Uploads are copied to disk in pieces of this size so that the size limit
# can be enforced without ever holding the whole file in memory.
_UPLOAD_CHUNK_BYTES = 1024 * 1024

router = APIRouter()


def _validated_audio_extension(filename: Optional[str]) -> str:
    """Return the lower-cased extension of *filename* if it is supported.

    Raises:
        HTTPException: 400 when the extension is not in
            ``SUPPORTED_AUDIO_FORMATS`` (including a missing filename).
    """
    file_ext = os.path.splitext(filename or "")[1].lower()
    if file_ext not in SUPPORTED_AUDIO_FORMATS:
        # Sorted so the error message is deterministic (set order is not).
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported audio format. Supported: {', '.join(sorted(SUPPORTED_AUDIO_FORMATS))}",
        )
    return file_ext


async def _save_upload_to_disk(file: UploadFile, dest_path: str) -> int:
    """Copy *file* to *dest_path* chunk by chunk and return the byte count.

    The size limit is checked while copying, so an oversized upload is
    rejected as soon as it crosses ``MAX_FILE_SIZE``.

    Raises:
        HTTPException: 413 when the upload exceeds ``MAX_FILE_SIZE``.
    """
    written = 0
    with open(dest_path, "wb") as out:
        while chunk := await file.read(_UPLOAD_CHUNK_BYTES):
            written += len(chunk)
            if written > MAX_FILE_SIZE:
                raise HTTPException(
                    status_code=413,
                    detail=f"File too large. Maximum size: {MAX_FILE_SIZE // (1024*1024)}MB",
                )
            out.write(chunk)
    return written


def _sse(payload: dict) -> str:
    """Format *payload* as a single Server-Sent Events ``data:`` frame."""
    return f"data: {json.dumps(payload)}\n\n"


@router.post("/ai/summarize", response_model=SummarizeResponse)
async def summarize_transcript(
    request: SummarizeRequest,
    current_user: TokenPayload = Depends(get_current_user),
):
    """
    Send transcript to Dify for AI summarization.
    Returns structured conclusions and action items.

    Raises:
        HTTPException: 503 when the Dify key is not configured or the request
            cannot be sent, 504 on timeout, 502 when Dify answers 200 with a
            body that is not JSON, and Dify's own status code when it
            answers with anything other than 200.
    """
    if not settings.DIFY_API_KEY:
        raise HTTPException(status_code=503, detail="Dify API not configured")

    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(
                f"{settings.DIFY_API_URL}/chat-messages",
                headers={
                    "Authorization": f"Bearer {settings.DIFY_API_KEY}",
                    "Content-Type": "application/json",
                },
                json={
                    "inputs": {},
                    "query": request.transcript,
                    "response_mode": "blocking",
                    "user": current_user.email,
                },
                timeout=120.0,  # Long timeout for LLM processing
            )
        except httpx.TimeoutException as e:
            # Must precede RequestError: TimeoutException is a subclass of it.
            raise HTTPException(
                status_code=504, detail="Dify API timeout - transcript may be too long"
            ) from e
        except httpx.RequestError as e:
            raise HTTPException(
                status_code=503, detail=f"Dify API unavailable: {str(e)}"
            ) from e

    if response.status_code != 200:
        raise HTTPException(
            status_code=response.status_code,
            detail=f"Dify API error: {response.text}",
        )

    try:
        data = response.json()
    except ValueError as e:
        # A 200 with an undecodable body is an upstream fault, not ours.
        raise HTTPException(
            status_code=502, detail="Dify API returned a non-JSON response"
        ) from e

    # Treat a missing, null or non-string answer as an empty answer.
    answer = data.get("answer") if isinstance(data, dict) else None
    if not isinstance(answer, str):
        answer = ""

    # Try to parse structured JSON from Dify response
    parsed = parse_dify_response(answer)

    return SummarizeResponse(
        conclusions=parsed["conclusions"],
        action_items=[
            ActionItemCreate(
                content=item.get("content", ""),
                owner=item.get("owner", ""),
                due_date=item.get("due_date"),
            )
            for item in parsed["action_items"]
        ],
    )


def parse_dify_response(answer: str) -> dict:
    """
    Parse Dify response to extract conclusions and action items.
    Attempts JSON parsing first, then falls back to text parsing.

    Returns:
        A dict with two keys: ``"conclusions"`` (a list) and
        ``"action_items"`` (a list of dicts). When no JSON object can be
        extracted, the raw answer becomes the single conclusion and
        ``"action_items"`` is empty.
    """
    print(f"[Dify Summarize] Raw answer length: {len(answer)} chars")
    print(f"[Dify Summarize] Raw answer preview: {answer[:500]}...")

    # Try to find JSON in the response
    try:
        if "```json" in answer:
            # Fenced block: take the text between ```json and the next fence.
            # A missing closing fence makes index() raise ValueError.
            json_start = answer.index("```json") + 7
            json_end = answer.index("```", json_start)
            json_str = answer[json_start:json_end].strip()
        elif "{" in answer and "}" in answer:
            # Bare object: take everything from the first "{" to the last "}".
            json_start = answer.index("{")
            json_end = answer.rindex("}") + 1
            json_str = answer[json_start:json_end]
        else:
            raise ValueError("No JSON found")

        # json.JSONDecodeError is a ValueError subclass, handled below.
        data = json.loads(json_str)
        if not isinstance(data, dict):
            # e.g. a JSON array or scalar: there are no keys to read.
            raise ValueError("JSON payload is not an object")

        conclusions = data.get("conclusions")
        if isinstance(conclusions, str):
            # A lone string is kept as a one-element list (dropped if empty).
            conclusions = [conclusions] if conclusions else []
        elif not isinstance(conclusions, list):
            conclusions = []

        action_items = data.get("action_items")
        if not isinstance(action_items, list):
            action_items = []
        # The caller reads each item with .get(), so only dicts are usable.
        action_items = [item for item in action_items if isinstance(item, dict)]

        print(f"[Dify Summarize] Parsed JSON keys: {list(data.keys())}")
        print(f"[Dify Summarize] conclusions count: {len(conclusions)}")
        print(f"[Dify Summarize] action_items count: {len(action_items)}")

        return {
            "conclusions": conclusions,
            "action_items": action_items,
        }
    except ValueError as e:
        print(f"[Dify Summarize] JSON parse failed: {e}")
        # Fallback: return raw answer as single conclusion
        return {
            "conclusions": [answer] if answer else [],
            "action_items": [],
        }


@router.post("/ai/transcribe-audio")
async def transcribe_audio(
    file: UploadFile = File(...),
    current_user: TokenPayload = Depends(get_current_user),
):
    """
    Transcribe an uploaded audio file using Dify STT service.
    Large files are automatically chunked using VAD segmentation.

    Chunks that cannot be transcribed are counted in ``chunks_failed`` and
    skipped; they do not fail the whole request.

    Raises:
        HTTPException: 503 when the STT key is not configured, 400 for an
            unsupported extension or when segmentation yields no chunks,
            413 when the upload exceeds ``MAX_FILE_SIZE``, 500 when the
            sidecar reports an error.
    """
    if not settings.DIFY_STT_API_KEY:
        raise HTTPException(status_code=503, detail="Dify STT API not configured")

    # Validate file extension
    file_ext = _validated_audio_extension(file.filename)

    # Create temp directory for processing
    temp_dir = tempfile.mkdtemp(prefix="transcribe_")
    temp_file_path = os.path.join(temp_dir, f"upload{file_ext}")

    try:
        # Save uploaded file
        file_size = await _save_upload_to_disk(file, temp_file_path)
        print(f"[Transcribe] Saved uploaded file: {temp_file_path}, size: {file_size} bytes")

        # Call sidecar to segment audio
        segments = await segment_audio_with_sidecar(temp_file_path, temp_dir)
        if "error" in segments:
            raise HTTPException(status_code=500, detail=segments["error"])

        segment_list = segments.get("segments", [])
        total_segments = len(segment_list)
        print(f"[Transcribe] Segmentation complete: {total_segments} chunks created")
        for seg in segment_list:
            print(f" - Chunk {seg.get('index')}: {seg.get('path')} ({seg.get('duration', 0):.1f}s)")

        if total_segments == 0:
            raise HTTPException(status_code=400, detail="No audio content detected")

        # Transcribe each chunk via Dify STT
        transcriptions = []
        failed_chunks = []

        async with httpx.AsyncClient() as client:
            for i, segment in enumerate(segment_list):
                chunk_path = segment.get("path")
                # Fall back to the loop position when the entry has no index.
                chunk_index = segment.get("index", i)
                print(f"[Transcribe] Processing chunk {chunk_index + 1}/{total_segments}: {chunk_path}")

                if not chunk_path:
                    print(f"[Transcribe] ERROR: Chunk {chunk_index} has no path!")
                    failed_chunks.append(chunk_index)
                    continue

                if not os.path.exists(chunk_path):
                    print(f"[Transcribe] ERROR: Chunk file does not exist: {chunk_path}")
                    failed_chunks.append(chunk_index)
                    continue

                chunk_size = os.path.getsize(chunk_path)
                print(f"[Transcribe] Chunk {chunk_index} exists, size: {chunk_size} bytes")

                # Call Dify STT API with retry
                text = await transcribe_chunk_with_dify(
                    client, chunk_path, current_user.email
                )
                if text:
                    print(f"[Transcribe] Chunk {chunk_index} transcribed: {len(text)} chars")
                    transcriptions.append(text)
                else:
                    print(f"[Transcribe] Chunk {chunk_index} transcription failed (no text returned)")
                    failed_chunks.append(chunk_index)

        # Concatenate all transcriptions
        final_transcript = " ".join(transcriptions)
        print(f"[Transcribe] Complete: {len(transcriptions)}/{total_segments} chunks transcribed")
        if failed_chunks:
            print(f"[Transcribe] Failed chunks: {failed_chunks}")

        return {
            "transcript": final_transcript,
            "chunks_processed": len(transcriptions),
            "chunks_total": total_segments,
            "chunks_failed": len(failed_chunks),
            "total_duration_seconds": segments.get("total_duration", 0),
            "language": "zh",
        }
    finally:
        # Clean up temp files
        shutil.rmtree(temp_dir, ignore_errors=True)


@router.post("/ai/transcribe-audio-stream")
async def transcribe_audio_stream(
    file: UploadFile = File(...),
    current_user: TokenPayload = Depends(get_current_user),
):
    """
    Transcribe an uploaded audio file with real-time progress via SSE.
    Returns Server-Sent Events for progress updates.

    The upload is saved to a temporary directory before the response starts;
    the event generator removes that directory when it finishes. Failures
    after streaming has begun are reported as ``error`` / ``chunk_error``
    events rather than HTTP errors.

    Raises:
        HTTPException: 503 when the STT key is not configured, 400 for an
            unsupported extension, 413 when the upload exceeds
            ``MAX_FILE_SIZE``.
    """
    if not settings.DIFY_STT_API_KEY:
        raise HTTPException(status_code=503, detail="Dify STT API not configured")

    # Validate file extension
    file_ext = _validated_audio_extension(file.filename)

    # Save the upload to disk now, in chunks, instead of reading it all into
    # memory: the size limit is then enforced while reading, and the
    # generator below only needs the path.
    temp_dir = tempfile.mkdtemp(prefix="transcribe_")
    temp_file_path = os.path.join(temp_dir, f"upload{file_ext}")
    try:
        await _save_upload_to_disk(file, temp_file_path)
    except BaseException:
        # Nothing will stream, so the generator's cleanup will never run.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise

    async def generate_progress() -> AsyncGenerator[str, None]:
        try:
            yield _sse({'event': 'start', 'message': '音訊檔案已接收,開始處理...'})

            # Segment audio
            yield _sse({'event': 'segmenting', 'message': '正在分析音訊並分割片段...'})

            segments = await segment_audio_with_sidecar(temp_file_path, temp_dir)
            if "error" in segments:
                yield _sse({'event': 'error', 'message': segments['error']})
                return

            segment_list = segments.get("segments", [])
            total_segments = len(segment_list)
            total_duration = segments.get("total_duration", 0)

            if total_segments == 0:
                yield _sse({'event': 'error', 'message': '未檢測到音訊內容'})
                return

            yield _sse({
                'event': 'segments_ready',
                'total': total_segments,
                'duration': total_duration,
                'message': f'分割完成,共 {total_segments} 個片段',
            })

            # Transcribe each chunk
            transcriptions = []

            async with httpx.AsyncClient() as client:
                for i, segment in enumerate(segment_list):
                    chunk_path = segment.get("path")
                    # Fall back to the loop position when there is no index.
                    chunk_index = segment.get("index", i)
                    chunk_duration = segment.get("duration", 0)
                    chunk_number = chunk_index + 1  # 1-based for display

                    yield _sse({
                        'event': 'chunk_start',
                        'chunk': chunk_number,
                        'total': total_segments,
                        'duration': chunk_duration,
                        'message': f'正在轉錄片段 {chunk_number}/{total_segments}...',
                    })

                    if not chunk_path or not os.path.exists(chunk_path):
                        yield _sse({
                            'event': 'chunk_error',
                            'chunk': chunk_number,
                            'message': f'片段 {chunk_number} 檔案不存在',
                        })
                        continue

                    text = await transcribe_chunk_with_dify(
                        client, chunk_path, current_user.email
                    )

                    if text:
                        transcriptions.append(text)
                        yield _sse({
                            'event': 'chunk_done',
                            'chunk': chunk_number,
                            'total': total_segments,
                            'text_length': len(text),
                            'message': f'片段 {chunk_number} 完成',
                        })
                    else:
                        yield _sse({
                            'event': 'chunk_error',
                            'chunk': chunk_number,
                            'message': f'片段 {chunk_number} 轉錄失敗',
                        })

            # Final result
            final_transcript = " ".join(transcriptions)
            yield _sse({
                'event': 'complete',
                'transcript': final_transcript,
                'chunks_processed': len(transcriptions),
                'chunks_total': total_segments,
                'duration': total_duration,
            })
        finally:
            shutil.rmtree(temp_dir, ignore_errors=True)

    return StreamingResponse(
        generate_progress(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )


async def segment_audio_with_sidecar(audio_path: str, output_dir: str) -> dict:
    """Call sidecar to segment audio file using VAD.

    Runs the sidecar's ``transcriber.py`` as a subprocess, sends it a
    ``segment_audio`` command followed by ``quit`` on stdin, and scans its
    stdout for the first JSON object that is either a result or an error.

    Returns:
        The sidecar's result dict, or ``{"error": <message>}`` when the
        sidecar is missing, times out, fails, or prints no usable response.
        This function reports failures through that dict rather than by
        raising.
    """
    # Find sidecar script
    sidecar_dir = os.path.join(os.path.dirname(__file__), "..", "..", "..", "sidecar")
    sidecar_script = os.path.join(sidecar_dir, "transcriber.py")
    venv_python = os.path.join(sidecar_dir, "venv", "bin", "python")

    # Use venv python if available, otherwise system python
    python_cmd = venv_python if os.path.exists(venv_python) else "python3"

    if not os.path.exists(sidecar_script):
        return {"error": "Sidecar not found"}

    process = None
    try:
        # Prepare command
        cmd_input = json.dumps({
            "action": "segment_audio",
            "file_path": audio_path,
            "max_chunk_seconds": 180,  # 3 minutes (smaller chunks for reliable upload)
            "min_silence_ms": 500,
            "output_dir": output_dir,
        })
        stdin_payload = f'{cmd_input}\n{{"action": "quit"}}\n'.encode()

        # Run sidecar process
        process = await asyncio.create_subprocess_exec(
            python_cmd,
            sidecar_script,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=sidecar_dir,
        )

        # Send command and wait for response
        stdout, _stderr = await asyncio.wait_for(
            process.communicate(input=stdin_payload),
            timeout=600,  # 10 minutes timeout for large files
        )

        # Parse response (skip status messages, find the segment result)
        for line in stdout.decode().strip().split("\n"):
            if not line:
                continue
            try:
                data = json.loads(line)
            except json.JSONDecodeError:
                continue
            if not isinstance(data, dict):
                # Valid JSON but not an object (number, list, ...): not a reply.
                continue
            if data.get("status") == "success" or "segments" in data:
                return data
            if "error" in data:
                return data

        return {"error": "No valid response from sidecar"}

    except asyncio.TimeoutError:
        return {"error": "Sidecar timeout during segmentation"}
    except Exception as e:
        return {"error": f"Sidecar error: {str(e)}"}
    finally:
        # wait_for() only cancels communicate(); the child keeps running
        # unless it is killed explicitly, so reap it on every early exit.
        if process is not None and process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                pass  # exited between the returncode check and kill()
            await process.wait()


async def upload_file_to_dify(
    client: httpx.AsyncClient,
    file_path: str,
    user_email: str,
) -> Optional[str]:
    """Upload a file to Dify and return the file ID.

    Args:
        client: HTTP client used for the request.
        file_path: Path of the file to upload.
        user_email: Sent to Dify as the ``user`` form field.

    Returns:
        The ``id`` field of Dify's JSON response on HTTP 200/201, or ``None``
        on any other status or on any error (errors are logged, not raised).
    """
    try:
        upload_url = f"{settings.DIFY_API_URL}/files/upload"
        file_size = os.path.getsize(file_path)
        print(f"[Upload] File: {file_path}, size: {file_size / (1024*1024):.1f} MB")

        # Adjust timeout based on file size (minimum 60s, ~1MB per 5 seconds)
        timeout_seconds = max(60.0, file_size / (1024 * 1024) * 5)
        print(f"[Upload] Using timeout: {timeout_seconds:.0f}s")

        with open(file_path, "rb") as f:
            files = {"file": (os.path.basename(file_path), f, "audio/wav")}
            response = await client.post(
                upload_url,
                headers={
                    "Authorization": f"Bearer {settings.DIFY_STT_API_KEY}",
                },
                files=files,
                data={"user": user_email},
                timeout=timeout_seconds,
            )

        print(f"[Upload] Response: {response.status_code}")

        if response.status_code in (200, 201):
            data = response.json()
            file_id = data.get("id")
            print(f"[Upload] Success, file_id: {file_id}")
            return file_id

        print(f"[Upload] Error: {response.status_code} - {response.text[:500]}")
        return None
    except httpx.ReadError as e:
        print(f"[Upload] Network read error (connection reset): {e}")
        return None
    except httpx.TimeoutException as e:
        print(f"[Upload] Timeout: {e}")
        return None
    except Exception as e:
        # Best-effort by design: the caller treats None as "retry or skip".
        print(f"[Upload] Error: {e}")
        print(traceback.format_exc())
        return None


async def transcribe_chunk_with_dify(
    client: httpx.AsyncClient,
    chunk_path: str,
    user_email: str,
    max_retries: int = 3,
) -> Optional[str]:
    """Transcribe a single audio chunk via Dify chat API with file upload.

    Each attempt uploads the chunk (up to two upload tries) and then sends a
    blocking chat message that references the uploaded file.

    Args:
        client: HTTP client shared across chunks.
        chunk_path: Path of the audio chunk to transcribe.
        user_email: Sent to Dify as the ``user`` field.
        max_retries: Maximum number of upload-and-chat attempts.

    Returns:
        The ``answer`` text from Dify on HTTP 200 (may be an empty string),
        or ``None`` when every attempt fails. Upload failures, timeouts,
        HTTP 5xx and HTTP 429 are retried; any other status or exception
        returns ``None`` immediately.
    """
    for attempt in range(max_retries):
        try:
            print(f"[Dify] Attempt {attempt + 1}/{max_retries} for chunk: {chunk_path}")

            # Step 1: Upload file to Dify (with retry inside this attempt)
            file_id = None
            for upload_attempt in range(2):  # 2 upload attempts per main attempt
                file_id = await upload_file_to_dify(client, chunk_path, user_email)
                if file_id:
                    break
                print(f"[Dify] Upload attempt {upload_attempt + 1} failed, retrying...")
                await asyncio.sleep(1)

            if not file_id:
                print(f"[Dify] Failed to upload file after retries: {chunk_path}")
                if attempt < max_retries - 1:
                    await asyncio.sleep(2 ** attempt)  # exponential backoff
                    continue
                return None

            print(f"[Dify] File uploaded, file_id: {file_id}")

            # Step 2: Send chat message with file to request transcription
            response = await client.post(
                f"{settings.DIFY_API_URL}/chat-messages",
                headers={
                    "Authorization": f"Bearer {settings.DIFY_STT_API_KEY}",
                    "Content-Type": "application/json",
                },
                json={
                    "inputs": {},
                    "query": "請將這段音檔轉錄成文字,只回傳轉錄的文字內容,不要加任何額外說明。",
                    "response_mode": "blocking",
                    "user": user_email,
                    "files": [
                        {
                            "type": "audio",
                            "transfer_method": "local_file",
                            "upload_file_id": file_id,
                        }
                    ],
                },
                timeout=300.0,  # 5 minutes per chunk (increased for longer segments)
            )

            print(f"[Dify] Chat response: {response.status_code}")

            if response.status_code == 200:
                data = response.json()
                answer = data.get("answer", "")
                print(f"[Dify] Transcription success, length: {len(answer)} chars")
                return answer

            # Retry on server errors or rate limits
            if response.status_code >= 500 or response.status_code == 429:
                print(f"[Dify] Server error {response.status_code}, will retry...")
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt
                    if response.status_code == 429:
                        wait_time = 10  # Wait longer for rate limits
                    await asyncio.sleep(wait_time)
                    continue

            # Log error but don't fail entire transcription
            print(f"[Dify] Chat error for chunk: {response.status_code} - {response.text[:500]}")
            return None

        except httpx.TimeoutException:
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)
                continue
            return None
        except Exception as e:
            print(f"Chunk transcription error: {e}")
            return None

    return None