import io
import json
import logging
import math
import os
import re
import uuid
from datetime import timedelta

import requests
from celery import Task
from moviepy import VideoFileClip
from pydub import AudioSegment
from pydub.silence import split_on_silence

from celery_app import celery
from models import db, Meeting

logger = logging.getLogger(__name__)


class ProgressTask(Task):
    """Celery task base class that adds a progress-reporting helper."""

    def update_progress(self, current, total, status_msg, extra_info=None):
        """Publish a 'PROGRESS' state for this task.

        Args:
            current: Current progress value.
            total: Value of ``current`` that represents completion.
            status_msg: Human-readable status message.
            extra_info: Optional dict merged into the state metadata;
                ignored when it is not a dict.
        """
        meta = {'current': current, 'total': total, 'status_msg': status_msg}
        if extra_info and isinstance(extra_info, dict):
            meta.update(extra_info)
        self.update_state(state='PROGRESS', meta=meta)


# --- Dify Helper Functions ---
def ask_dify(api_key: str, query: str, **kwargs):
    """Send a query to the Dify ``chat-messages`` endpoint.

    Args:
        api_key: Bearer token for the Dify application.
        query: Text sent as the ``query`` field.
        **kwargs: Optional ``inputs``, ``user_id``, ``response_mode``,
            ``conversation_id`` and ``timeout_seconds`` overrides.

    Returns:
        The decoded JSON response on success. On a configuration or request
        failure, a dict whose ``answer`` holds the error text and whose
        ``error`` key is ``True`` so callers can tell it apart from a real
        answer.
    """
    from flask import current_app

    base_url = current_app.config.get("DIFY_API_BASE_URL")
    if not api_key or not base_url:
        return {
            "answer": "Error: DIFY API settings not configured.",
            "error": True,
        }

    url = f"{base_url}/chat-messages"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    payload = {
        "inputs": kwargs.get("inputs", {}),
        "query": query,
        "user": kwargs.get("user_id", "default-tk-user"),
        "response_mode": kwargs.get("response_mode", "blocking"),
        "conversation_id": kwargs.get("conversation_id"),
    }
    try:
        response = requests.post(
            url,
            headers=headers,
            json=payload,
            timeout=kwargs.get("timeout_seconds", 1200),
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        return {"answer": f"Dify API request error: {e}", "error": True}


def upload_chunk_to_dify(blob: bytes, filename="chunk.mp4"):
    """Upload an audio blob to Dify and return the uploaded file's id.

    Args:
        blob: Encoded audio bytes.
        filename: File name reported to the upload endpoint.

    Returns:
        The ``id`` field of the upload response.

    Raises:
        requests.exceptions.RequestException: On transport errors or a
            non-2xx response.
    """
    from flask import current_app

    base_url = current_app.config.get("DIFY_API_BASE_URL")
    api_key = current_app.config.get("DIFY_STT_API_KEY")
    # The API key is deliberately never logged.
    r = requests.post(
        f"{base_url}/files/upload",
        headers={"Authorization": f"Bearer {api_key}"},
        files={"file": (filename, io.BytesIO(blob), "audio/mp4")},
        data={"user": "ai-meeting-assistant-user"},
        timeout=300,
    )
    logger.debug("Dify File Upload API Response: %s %s", r.status_code, r.text)
    r.raise_for_status()
    return r.json()["id"]


def run_dify_stt_chat_app(file_id: str) -> str:
    """Run speech-to-text by calling Dify's ``chat-messages`` API.

    Intended for "advanced chat" type Dify applications.

    Args:
        file_id: Id of a file previously uploaded to Dify.

    Returns:
        The stripped ``answer`` text, or an empty string when it is absent.

    Raises:
        requests.exceptions.RequestException: On transport errors or a
            non-2xx response.
    """
    from flask import current_app

    base_url = current_app.config.get("DIFY_API_BASE_URL")
    api_key = current_app.config.get("DIFY_STT_API_KEY")

    payload = {
        "inputs": {},
        "query": "請將音檔轉換為文字",
        "user": "ai-meeting-assistant-user",
        "response_mode": "blocking",
        "files": [
            {
                "type": "audio",
                "transfer_method": "local_file",
                "upload_file_id": file_id,
            }
        ],
    }
    r = requests.post(
        f"{base_url}/chat-messages",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        json=payload,
        timeout=1800,
    )
    logger.debug("Dify STT API Response: %s %s", r.status_code, r.text)
    r.raise_for_status()
    j = r.json()
    # In chat-type applications the result is normally in the 'answer' field.
    # `or ""` also covers an explicit JSON null.
    return (j.get("answer") or "").strip()


# --- Timestamp & Audio Chunking Helpers ---
def format_timestamp_from_ms(ms: int) -> str:
    """Format a millisecond offset as ``HH:MM:SS.mmm``."""
    td = timedelta(milliseconds=ms)
    total_seconds = td.total_seconds()
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    milliseconds = td.microseconds // 1000
    return (
        f"{int(hours):02d}:{int(minutes):02d}:{int(seconds):02d}"
        f".{milliseconds:03d}"
    )


def export_audio_bytes(seg: AudioSegment) -> bytes:
    """Encode ``seg`` as mp4 at 64k bitrate and return the bytes."""
    buf = io.BytesIO()
    seg.export(buf, format="mp4", bitrate="64k")
    buf.seek(0)
    return buf.read()


def ensure_chunk_under_limits(seg: AudioSegment):
    """Split ``seg`` into chronologically ordered parts under the size limit.

    A segment whose encoded size is already below the byte limit is returned
    unchanged. Otherwise it is cut into windows of at most 20 minutes, and any
    window that is still too large is halved repeatedly.

    Args:
        seg: The audio segment to check.

    Returns:
        A list of segments in playback order. A part is kept even if it is
        still over the limit once halving would go below one second.
    """
    MAX_SEG_MS = 20 * 60 * 1000  # Max duration 20 minutes
    # Dify file limit is 25MB, use 24MB as a safe buffer
    MAX_BYTES = 24 * 1024 * 1024

    # NOTE(review): the duration cap is only applied when the byte limit is
    # exceeded; a long segment that encodes small is returned whole. Confirm
    # that is intended.
    if len(export_audio_bytes(seg)) < MAX_BYTES:
        return [seg]

    # If segment is too large, split by duration first.
    total_ms = len(seg)
    duration_parts = []
    start_ms = 0
    while start_ms < total_ms:
        end_ms = min(start_ms + MAX_SEG_MS, total_ms)
        duration_parts.append(seg[start_ms:end_ms])
        start_ms = end_ms

    # Then check the byte size of each part, halving until compliant. A stack
    # with the right half pushed first keeps the output in playback order,
    # which the caller relies on when assigning sequential timestamps.
    final_parts = []
    for part in duration_parts:
        pending = [part]
        while pending:
            current_part = pending.pop()
            if len(export_audio_bytes(current_part)) < MAX_BYTES:
                final_parts.append(current_part)
                continue
            mid_point = len(current_part) // 2
            if mid_point < 1000:  # Stop splitting below one second
                final_parts.append(current_part)
                continue
            pending.append(current_part[mid_point:])
            pending.append(current_part[:mid_point])
    return final_parts


# --- Celery Tasks ---
@celery.task(base=ProgressTask, bind=True)
def extract_audio_task(self, input_path, output_path):
    """Extract the audio track of a video file into ``output_path``.

    Returns:
        ``{'status': 'Success', 'result_path': output_path}`` on success, or
        ``{'status': 'Error', 'error': <message>}`` on failure.
    """
    try:
        self.update_progress(0, 100, "Starting audio extraction...")
        with VideoFileClip(input_path) as video:
            if video.audio is None:
                raise ValueError("Input video has no audio track.")
            video.audio.write_audiofile(output_path)
        self.update_progress(100, 100, "Audio extracted successfully.")
        return {'status': 'Success', 'result_path': output_path}
    except Exception as e:
        logger.exception("Audio extraction failed for %s", input_path)
        self.update_state(
            state='FAILURE',
            meta={'exc_type': type(e).__name__, 'exc_message': str(e)}
        )
        # Return the same error shape as the other tasks instead of None.
        return {'status': 'Error', 'error': str(e)}


@celery.task(base=ProgressTask, bind=True)
def transcribe_audio_task(self, audio_path):
    """Transcribe an audio file through Dify, chunk by chunk.

    The audio is split on silence, each chunk is reduced to parts under the
    upload limit, and every part is uploaded and transcribed. Non-empty
    results become ``[start - end] text`` lines written to a new transcript
    file in ``UPLOAD_FOLDER``.

    Returns:
        ``{'status': 'Success', 'content': ..., 'result_path': <file name>}``
        on success, or ``{'status': 'Error', 'error': <message>}`` on failure.
    """
    from app import app

    with app.app_context():
        try:
            self.update_progress(0, 100, "Loading and preparing audio file...")
            audio = AudioSegment.from_file(audio_path)

            # 1. Split audio by silence
            self.update_progress(
                5, 100, "Detecting silence to split audio into chunks..."
            )
            chunks = split_on_silence(
                audio,
                min_silence_len=700,
                silence_thresh=-40,
                keep_silence=300
            )
            if not chunks:
                # No silence detected: treat the whole audio as one chunk.
                chunks = [audio]

            # 2. Process chunks and ensure they are within API limits
            # NOTE(review): offsets are accumulated from chunk lengths, so any
            # silence dropped by split_on_silence is not counted. Timestamps
            # may therefore drift from the source audio; confirm intended.
            final_segments = []
            cursor_ms = 0
            for chunk in chunks:
                start_time = cursor_ms
                end_time = start_time + len(chunk)

                part_start_time = start_time
                for part in ensure_chunk_under_limits(chunk):
                    part_end_time = part_start_time + len(part)
                    final_segments.append({
                        "start": part_start_time,
                        "end": part_end_time,
                        "segment": part
                    })
                    part_start_time = part_end_time
                cursor_ms = end_time

            # 3. Upload chunks to Dify and get transcriptions
            transcribed_lines = []
            total_segments = len(final_segments)
            for i, seg_data in enumerate(final_segments):
                progress = 10 + int((i / total_segments) * 85)
                self.update_progress(
                    progress, 100,
                    f"Processing chunk {i+1} of {total_segments}..."
                )

                audio_bytes = export_audio_bytes(seg_data["segment"])
                file_id = upload_chunk_to_dify(
                    audio_bytes, f"chunk_{i+1}.mp4"
                )
                transcribed_text = run_dify_stt_chat_app(file_id).strip()

                if transcribed_text:
                    start_ts = format_timestamp_from_ms(seg_data["start"])
                    end_ts = format_timestamp_from_ms(seg_data["end"])
                    transcribed_lines.append(
                        f"[{start_ts} - {end_ts}] {transcribed_text}"
                    )

            # 4. Finalize and save the result
            self.update_progress(98, 100, "Finalizing transcript...")
            full_content = "\n".join(transcribed_lines)

            transcript_filename = f"transcript_{uuid.uuid4()}.txt"
            upload_folder = app.config['UPLOAD_FOLDER']
            os.makedirs(upload_folder, exist_ok=True)
            output_txt_path = os.path.join(upload_folder, transcript_filename)
            with open(output_txt_path, "w", encoding="utf-8") as f:
                f.write(full_content)

            self.update_progress(100, 100, "Transcription complete.")
            return {
                'status': 'Success',
                'content': full_content,
                'result_path': transcript_filename
            }
        except Exception as e:
            logger.exception("Transcription failed for %s", audio_path)
            error_message = f"An error occurred: {str(e)}"
            self.update_state(
                state='FAILURE',
                meta={
                    'exc_type': type(e).__name__,
                    'exc_message': error_message
                }
            )
            return {'status': 'Error', 'error': error_message}


@celery.task(base=ProgressTask, bind=True)
def translate_text_task(self, text_content, target_language):
    """Translate text line by line and write a bilingual file.

    If the first line matches the ``[HH:MM:SS.mmm - HH:MM:SS.mmm]`` pattern,
    each matching line is followed by its translation with the same
    timestamp. Otherwise each non-empty line is followed by its translation
    and a blank line.

    Args:
        text_content: Text to translate, or a dict whose ``content`` key
            holds it.
        target_language: Target language passed to the translation service.

    Returns:
        ``{'status': 'Success', 'content': ..., 'result_path': ...}`` on
        success, or ``{'status': 'Error', 'error': <message>}`` on failure.
    """
    from app import app

    with app.app_context():
        from services.dify_client import translate_text as dify_translate
        try:
            self.update_progress(
                0, 100, f"Starting translation to {target_language}..."
            )

            if isinstance(text_content, dict):
                text_content = text_content.get('content', '')

            if not text_content or not isinstance(text_content, str):
                self.update_progress(
                    100, 100, "Translation skipped due to empty input."
                )
                return {
                    'status': 'Success',
                    'content': '',
                    'result_path': None,
                    'message': 'Input was empty.'
                }

            lines = text_content.strip().split('\n')
            total_lines = len(lines)
            timestamp_pattern = re.compile(
                r'^(\s*\[\d{2}:\d{2}:\d{2}\.\d{3}\s-\s\d{2}:\d{2}:\d{2}\.\d{3}\])\s*(.*)'
            )
            is_timestamped_input = bool(
                lines and timestamp_pattern.match(lines[0])
            )

            translated_lines = []
            if is_timestamped_input:
                for i, line in enumerate(lines):
                    match = timestamp_pattern.match(line)
                    if not match:
                        if line.strip():
                            # Keep non-matching, non-empty lines
                            translated_lines.append(line)
                        continue

                    timestamp = match.group(1)
                    original_text = match.group(2)

                    # Add original line
                    translated_lines.append(line)

                    if not original_text.strip():
                        continue

                    translated_text = dify_translate(
                        text=original_text, target_lang=target_language
                    )
                    # Add translated line, preserving the timestamp
                    translated_lines.append(f"{timestamp} {translated_text}")

                    progress = int(((i + 1) / total_lines) * 100)
                    self.update_progress(
                        progress, 100,
                        f"Translating line {i+1}/{total_lines}..."
                    )
            else:
                # Non-timestamped text: line by line, bilingual output.
                for i, line in enumerate(lines):
                    # Leave some room for finalization
                    progress = int(((i + 1) / total_lines) * 98)
                    self.update_progress(
                        progress, 100,
                        f"Translating line {i+1}/{total_lines}..."
                    )

                    original_line = line.strip()
                    if not original_line:
                        continue

                    translated_text = dify_translate(
                        text=original_line, target_lang=target_language
                    )
                    translated_lines.append(original_line)
                    translated_lines.append(translated_text)
                    # Add a blank line for readability
                    translated_lines.append("")

            final_content = "\n".join(translated_lines)

            translated_filename = f"translated_{uuid.uuid4()}.txt"
            upload_folder = app.config.get('UPLOAD_FOLDER', 'uploads')
            output_txt_path = os.path.join(upload_folder, translated_filename)
            os.makedirs(upload_folder, exist_ok=True)
            with open(output_txt_path, "w", encoding="utf-8") as f:
                f.write(final_content)

            self.update_progress(100, 100, "Translation complete.")
            return {
                'status': 'Success',
                'content': final_content,
                'result_path': translated_filename
            }
        except Exception as e:
            logger.exception("Translation to %s failed", target_language)
            self.update_state(
                state='FAILURE',
                meta={'exc_type': type(e).__name__, 'exc_message': str(e)}
            )
            # Return an error dict so callers that read 'status' / 'error'
            # see the real failure instead of None.
            return {'status': 'Error', 'error': str(e)}


@celery.task(bind=True)
def summarize_text_task(self, meeting_id):
    """Summarize a meeting's transcript with Dify and store the result.

    Leading ``[...]`` prefixes are stripped from each transcript line before
    the text is sent.

    Returns:
        ``{'status': 'Success', 'summary': ...}`` on success, or
        ``{'status': 'Error', 'error': <message>}`` on failure.
    """
    from app import app

    with app.app_context():
        try:
            meeting = Meeting.query.get(meeting_id)
            if not meeting or not meeting.transcript:
                self.update_state(
                    state='FAILURE',
                    meta={'error': 'Meeting or transcript not found'}
                )
                return {
                    'status': 'Error',
                    'error': 'Meeting or transcript not found'
                }

            api_key = app.config.get("DIFY_SUMMARIZER_API_KEY")
            plain_transcript = re.sub(
                r'^(\s*\[.*?\])\s*', '', meeting.transcript,
                flags=re.MULTILINE
            )

            if not plain_transcript.strip():
                self.update_state(
                    state='FAILURE', meta={'error': 'Transcript is empty.'}
                )
                return {'status': 'Error', 'error': 'Transcript is empty'}

            response = ask_dify(api_key, plain_transcript)
            if response.get("error"):
                # Do not store a request error message as the summary.
                message = response.get("answer") or 'Dify request failed.'
                self.update_state(state='FAILURE', meta={'error': message})
                return {'status': 'Error', 'error': message}

            summary = response.get("answer")
            if not summary:
                self.update_state(
                    state='FAILURE',
                    meta={'error': 'Dify returned empty summary.'}
                )
                return {
                    'status': 'Error',
                    'error': 'Dify returned empty summary.'
                }

            meeting.summary = summary
            db.session.commit()
            return {'status': 'Success', 'summary': summary}
        except Exception as e:
            logger.exception(
                "Summarization failed for meeting %s", meeting_id
            )
            db.session.rollback()
            self.update_state(
                state='FAILURE',
                meta={'exc_type': type(e).__name__, 'exc_message': str(e)}
            )
            return {'status': 'Error', 'error': str(e)}


@celery.task(base=ProgressTask, bind=True)
def preview_action_items_task(self, text_content):
    """Ask Dify for action items and parse the JSON array in its answer.

    Returns:
        ``{'status': 'Success', 'items': [...]}``, with an empty list when no
        JSON array can be parsed, or ``{'status': 'Error', 'error': ...}``
        when the request fails or an exception is raised.
    """
    from app import app

    with app.app_context():
        try:
            self.update_progress(
                10, 100, "Requesting Dify for action items..."
            )
            api_key = app.config.get("DIFY_ACTION_EXTRACTOR_API_KEY")
            plain_text = re.sub(
                r'^(\s*\[.*?\])\s*', '', text_content, flags=re.MULTILINE
            )

            if not plain_text.strip():
                return {'status': 'Success', 'items': []}

            response = ask_dify(api_key, plain_text)
            if response.get("error"):
                # Report a failed request rather than "no action items".
                message = response.get("answer") or 'Dify request failed.'
                self.update_state(state='FAILURE', meta={'error': message})
                return {'status': 'Error', 'error': message}

            answer_text = response.get("answer", "")

            self.update_progress(80, 100, "Parsing response...")
            parsed_items = []
            try:
                # Find the JSON array within the response string
                match = re.search(r'\[.*\]', answer_text, re.DOTALL)
                if match:
                    parsed_items = json.loads(match.group(0))
                    # Ensure it's a list, otherwise reset to empty
                    if not isinstance(parsed_items, list):
                        parsed_items = []
            except (json.JSONDecodeError, TypeError):
                # If parsing fails, leave it as an empty list
                parsed_items = []

            self.update_progress(
                100, 100, "Action item preview generated."
            )
            return {'status': 'Success', 'items': parsed_items}
        except Exception as e:
            logger.exception("Action item preview failed")
            self.update_state(
                state='FAILURE',
                meta={'exc_type': type(e).__name__, 'exc_message': str(e)}
            )
            return {'status': 'Error', 'error': str(e)}


def _require_success(result, stage):
    """Raise ``RuntimeError`` unless ``result`` is a 'Success' task dict."""
    if isinstance(result, dict) and result.get('status') == 'Success':
        return
    error = 'Unknown error'
    if isinstance(result, dict):
        error = result.get('error', 'Unknown error')
    raise RuntimeError(f"{stage} failed: {error}")


@celery.task(bind=True)
def process_meeting_flow(self, meeting_id, target_language=None):
    """Run transcription, optional translation and summarization.

    Each sub-task is run with ``apply(...).get()`` and its result is stored
    on the ``Meeting`` row. The meeting status is set to ``'processing'``,
    then ``'completed'`` or ``'failed'``.

    Args:
        meeting_id: Primary key of the meeting to process.
        target_language: When truthy, the transcript is also translated.

    Returns:
        ``{'status': 'Success', 'meeting_id': ...}`` on success, or
        ``{'status': 'Error', 'error': <message>}`` on failure.
    """
    from app import app

    with app.app_context():
        meeting = Meeting.query.get(meeting_id)
        if not meeting:
            return {'status': 'Error', 'error': 'Meeting not found'}

        try:
            meeting.status = 'processing'
            db.session.commit()

            upload_folder = app.config['UPLOAD_FOLDER']
            # Assuming filename exists on the meeting object.
            # NOTE(review): replace() rewrites every '.mp4' occurrence, not
            # only the extension; kept as-is because the .wav producer is not
            # visible here and presumably uses the same mapping.
            audio_path = os.path.join(
                upload_folder, meeting.filename.replace('.mp4', '.wav')
            )

            transcript_result = transcribe_audio_task.apply(
                args=[audio_path]
            ).get()
            _require_success(transcript_result, "Transcription")
            meeting.transcript = transcript_result['content']
            db.session.commit()

            if target_language:
                translation_result = translate_text_task.apply(
                    args=[meeting.transcript, target_language]
                ).get()
                _require_success(translation_result, "Translation")
                meeting.translated_transcript = translation_result['content']
                db.session.commit()

            summary_result = summarize_text_task.apply(
                args=[meeting.id]
            ).get()
            _require_success(summary_result, "Summarization")
            meeting.summary = summary_result['summary']

            meeting.status = 'completed'
            db.session.commit()

            return {'status': 'Success', 'meeting_id': meeting.id}
        except Exception as e:
            logger.exception(
                "Meeting processing failed for meeting %s", meeting_id
            )
            # Reset the session first: after a failed flush/commit it cannot
            # be used again until rolled back.
            db.session.rollback()
            meeting.status = 'failed'
            db.session.commit()
            return {'status': 'Error', 'error': str(e)}