Files
AI_meeting_assistant/tasks.py
2025-08-07 09:57:20 +08:00

249 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
import whisper
import torch
import shutil
import subprocess
import time
import re
import json
import requests
from celery import Celery, Task
from opencc import OpenCC
from moviepy import VideoFileClip
from dotenv import load_dotenv
# Pull key/value pairs from a .env file (when one is present) into os.environ,
# so the os.getenv() lookups below can see them.
load_dotenv()
# ========== Dify API configuration ==========
# Either value may end up None; ask_dify() checks for that and answers with an
# error message instead of calling the API.
DIFY_API_KEY = os.getenv("DIFY_API_KEY")
DIFY_API_BASE_URL = os.getenv("DIFY_API_BASE_URL")
# ========== Dify API client function ==========
def _strip_think_blocks(text: str) -> str:
    """Remove ``<think>...</think>`` sections (plus the whitespace after them) and trim."""
    return re.sub(r"<think>.*?</think>\s*", "", text, flags=re.DOTALL).strip()


def _read_dify_stream(response, conversation_id):
    """Consume a streaming Dify ``/chat-messages`` response.

    Reads ``data:`` lines until a ``message_end`` or ``error`` event (or the end
    of the stream), collecting ``answer`` fragments from ``message`` and
    ``agent_message`` events.

    Args:
        response: A ``requests.Response`` opened with ``stream=True``.
        conversation_id: Id to keep if the stream does not report one.

    Returns:
        Tuple ``(answer_chunks, conversation_id, error_message)``;
        ``error_message`` is None unless an ``error`` event arrived.
    """
    answer_chunks = []
    error_message = None
    for raw_line in response.iter_lines():
        if not raw_line:
            continue
        decoded_line = raw_line.decode("utf-8")
        if not decoded_line.startswith("data:"):
            continue
        try:
            event = json.loads(decoded_line[len("data:"):])
        except json.JSONDecodeError:
            # Skip lines that are not a complete JSON payload.
            continue
        if not isinstance(event, dict):
            continue
        event_type = event.get("event")
        if event_type in ("agent_message", "message"):
            if event.get("answer") is not None:
                answer_chunks.append(event["answer"])
        elif event_type == "message_end":
            conversation_id = event.get("conversation_id", conversation_id)
            break
        elif event_type == "error":
            error_message = event.get("message", "Dify API 返回未知的流式錯誤")
            conversation_id = event.get("conversation_id", conversation_id)
            break
    return answer_chunks, conversation_id, error_message


def ask_dify(prompt: str, user_id: str = "default-tk-user-resume", inputs: dict = None, response_mode: str = "streaming", conversation_id: str = None, timeout_seconds: int = 1200) -> dict:
    """Send *prompt* to Dify's ``/chat-messages`` endpoint and return the answer.

    This function never raises: configuration problems, timeouts, HTTP errors
    and unexpected failures are all reported as a human-readable message in
    the ``answer`` field. ``<think>...</think>`` sections are removed from the
    answer.

    Args:
        prompt: Text sent as the ``query``.
        user_id: Dify ``user`` identifier.
        inputs: Dify ``inputs`` variables (defaults to an empty dict).
        response_mode: ``"streaming"`` reads the SSE stream; any other value
            reads a single JSON body.
        conversation_id: Existing conversation to continue, if any.
        timeout_seconds: Passed to ``requests.post`` as ``timeout``.

    Returns:
        ``{"answer": str, "conversation_id": str | None}``.
    """
    if not DIFY_API_KEY or not DIFY_API_BASE_URL:
        return {"answer": "❌ 錯誤DIFY_API_KEY 或 DIFY_API_BASE_URL 未在環境變數中設定。", "conversation_id": conversation_id}
    if inputs is None:
        inputs = {}
    # Tolerate a base URL configured with a trailing slash.
    url = f"{DIFY_API_BASE_URL.rstrip('/')}/chat-messages"
    headers = {"Authorization": f"Bearer {DIFY_API_KEY}", "Content-Type": "application/json"}
    payload = {"inputs": inputs, "query": prompt, "user": user_id, "response_mode": response_mode}
    if conversation_id:
        payload["conversation_id"] = conversation_id
    returned_conversation_id = conversation_id
    is_streaming_request = response_mode == "streaming"
    print("\n--- [ASK_DIFY] Sending request to Dify ---")
    response = None
    try:
        response = requests.post(url, headers=headers, json=payload, timeout=timeout_seconds, stream=is_streaming_request)
        response.raise_for_status()
        if is_streaming_request:
            answer_chunks, returned_conversation_id, stream_error = _read_dify_stream(response, returned_conversation_id)
            if stream_error:
                return {"answer": f"❌ Dify API 流處理錯誤: {_strip_think_blocks(stream_error)}", "conversation_id": returned_conversation_id}
            raw_answer = "".join(answer_chunks) if answer_chunks else "⚠️ 流式響應未包含有效回答或內容為空"
        else:
            response_data = response.json()
            # Fall back for a missing, null or empty answer (as the message states);
            # a null answer would otherwise crash re.sub with a TypeError.
            raw_answer = response_data.get("answer") or "⚠️ 回應中未找到 'answer' 欄位或內容為空"
            returned_conversation_id = response_data.get("conversation_id", returned_conversation_id)
        return {"answer": _strip_think_blocks(raw_answer), "conversation_id": returned_conversation_id}
    except requests.exceptions.Timeout:
        return {"answer": f"⚠️ 請求 Dify API 逾時 (超過 {timeout_seconds} 秒)", "conversation_id": conversation_id}
    except requests.exceptions.HTTPError as http_err:
        # The body is read here, before the finally clause closes the response.
        error_message_detail = f" - 原始響應: {http_err.response.text}"
        final_error_message = f"❌ Dify API HTTP 錯誤: {http_err.response.status_code}{error_message_detail}"
        return {"answer": final_error_message, "conversation_id": conversation_id}
    except Exception as e:
        return {"answer": f"❌ 處理 Dify API 請求或響應時發生未預期錯誤: {type(e).__name__}: {str(e)}", "conversation_id": conversation_id}
    finally:
        # With stream=True the connection is only released once the body is fully
        # consumed or the response is closed; we may break out of the stream early.
        if response is not None:
            response.close()
# ========== Dify API Client Function END ==========
# ========== Celery configuration ==========
# Broker and result backend both fall back to the same local Redis database
# when the corresponding environment variables are unset.
_DEFAULT_REDIS_URL = 'redis://localhost:6379/0'
celery = Celery(
    'tasks',
    broker=os.getenv('CELERY_BROKER_URL', _DEFAULT_REDIS_URL),
    backend=os.getenv('CELERY_RESULT_BACKEND', _DEFAULT_REDIS_URL),
)
class ProgressTask(Task):
    """Celery ``Task`` base class with a shortcut for reporting progress.

    Tasks declared with ``base=ProgressTask, bind=True`` call
    ``self.update_progress(...)`` to publish a ``PROGRESS`` state.
    """

    def update_progress(self, current, total, status, extra_info=None):
        """Publish a ``PROGRESS`` state whose meta holds current/total/status.

        Entries of a truthy *extra_info* are merged in last, so they can add
        fields (e.g. a ``preview``) or override the three defaults.
        """
        progress_meta = dict(current=current, total=total, status=status)
        if extra_info:
            progress_meta.update(extra_info)
        self.update_state(state='PROGRESS', meta=progress_meta)
# ========== Demucs helper ==========
def separate_vocals_with_demucs(self, audio_path, project_root):
    """Isolate the vocal stem of *audio_path* with Demucs and return its path.

    Runs ``demucs.separate`` (model ``htdemucs_ft``, ``--two-stems=vocals``) as a
    subprocess and writes under ``<project_root>/demucs_separated``.

    Args:
        self: The bound Celery task; only ``update_progress`` is used.
        audio_path: Input audio file.
        project_root: Directory in which ``demucs_separated`` is created.

    Returns:
        ``<project_root>/demucs_separated/htdemucs_ft/<audio basename>/vocals.wav``.

    Raises:
        RuntimeError: Demucs exited with a non-zero status (stderr included,
            original CalledProcessError chained as the cause).
        FileNotFoundError: Demucs succeeded but the vocals file is missing.
    """
    self.update_progress(10, 100, "🎛️ 使用 Demucs 分離人聲...")
    output_dir = os.path.join(project_root, 'demucs_separated')
    cmd = [
        # sys.executable runs Demucs with the same Python interpreter as this worker.
        sys.executable, '-m', 'demucs.separate',
        '-n', 'htdemucs_ft',
        '--two-stems=vocals',
        '-o', output_dir,
        audio_path,
    ]
    try:
        result = subprocess.run(cmd, check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        stderr_str = e.stderr.decode('utf-8', errors='replace') if e.stderr else "No stderr output"
        print("Demucs stderr:", stderr_str)
        raise RuntimeError(f"Demucs 人聲分離失敗: {stderr_str}") from e
    # Decode for a readable log instead of printing a bytes repr.
    print("Demucs stdout:", result.stdout.decode('utf-8', errors='replace'))
    original_filename_base = os.path.splitext(os.path.basename(audio_path))[0]
    vocals_path = os.path.join(output_dir, 'htdemucs_ft', original_filename_base, 'vocals.wav')
    if not os.path.exists(vocals_path):
        raise FileNotFoundError(f"找不到 Demucs 分離出的人聲音訊檔案: {vocals_path}")
    return vocals_path
# ========== Task 1: Video -> audio extraction ==========
@celery.task(base=ProgressTask, bind=True)
def extract_audio_task(self, input_video_path, output_audio_path):
    """Extract the audio track of a video file into a ``pcm_s16le`` audio file.

    Args:
        input_video_path: Video to read.
        output_audio_path: Destination audio file.

    Returns:
        ``{'status': '完成', 'result_path': output_audio_path}`` on success, or
        ``{'status': '錯誤', 'error': <message>}`` after setting state FAILURE.
    """
    try:
        self.update_progress(0, 100, "準備轉換影片...")
        video_clip = VideoFileClip(input_video_path)
        try:
            self.update_progress(50, 100, "正在提取音訊...")
            if video_clip.audio is None:
                # Otherwise this surfaces as an opaque AttributeError on None.
                raise ValueError(f"影片不包含音訊軌道: {input_video_path}")
            video_clip.audio.write_audiofile(output_audio_path, codec="pcm_s16le")
        finally:
            # Release the clip's resources even when extraction fails.
            video_clip.close()
        self.update_progress(100, 100, "音訊提取完成!")
        return {'status': '完成', 'result_path': output_audio_path}
    except Exception as e:
        # NOTE(review): returning normally after setting FAILURE likely lets Celery
        # record SUCCESS with this dict; callers presumably check 'status' — confirm.
        self.update_state(state='FAILURE', meta={'exc_type': type(e).__name__, 'exc_message': str(e)})
        return {'status': '錯誤', 'error': str(e)}
# ========== Task 2: Audio -> transcript (Whisper) ==========
@celery.task(base=ProgressTask, bind=True)
def transcribe_audio_task(self, input_audio_path, output_txt_path, language, use_demucs=False):
    """Transcribe *input_audio_path* with Whisper into a timestamped transcript.

    Each Whisper segment becomes one line ``[<start>s - <end>s] <text>`` in
    *output_txt_path* (UTF-8). Times are truncated to whole seconds and
    zero-padded to at least four digits; text is converted with OpenCC
    (``s2twp``).

    Args:
        input_audio_path: Audio file to transcribe.
        output_txt_path: Destination transcript file (overwritten).
        language: Whisper language code; ``'auto'``, ``''`` or ``None`` means
            no language is forced.
        use_demucs: If true, separate vocals with Demucs first and transcribe
            the isolated vocals.

    Returns:
        ``{'status': '完成', 'result_path': output_txt_path}`` on success, or
        ``{'status': '錯誤', 'error': <message>}`` after setting state FAILURE.
    """
    try:
        self.update_progress(0, 100, "準備開始轉錄...")
        current_audio_path = input_audio_path
        if use_demucs:
            # Assumes output_txt_path sits one folder below the project root
            # (e.g. <project_root>/uploads/x.txt) — TODO confirm with the caller.
            project_root = os.path.dirname(os.path.dirname(output_txt_path))
            current_audio_path = separate_vocals_with_demucs(self, input_audio_path, project_root)
            self.update_progress(25, 100, "✅ 人聲分離完成,準備載入 Whisper...")
        device = "cuda" if torch.cuda.is_available() else "cpu"
        # Demucs already used part of the progress bar, so model loading reports later.
        progress_after_load = 40 if use_demucs else 20
        self.update_progress(progress_after_load - 10, 100, f"載入 Whisper 'medium' 模型 (使用 {device})...")
        model = whisper.load_model("medium", device=device)
        self.update_progress(progress_after_load, 100, "模型載入完畢,開始轉錄音訊...")
        # Pass None (no forced language) for 'auto' AND for empty/None input; an empty
        # string would otherwise be handed to Whisper as a literal language code.
        whisper_language = language if language and language != 'auto' else None
        transcription_result = model.transcribe(
            audio=current_audio_path,
            language=whisper_language,
            fp16=(device == "cuda"),  # fp16 only when running on CUDA
            verbose=False,
        )
        self.update_progress(85, 100, "轉錄完成,進行繁體轉換與格式化...")
        cc = OpenCC('s2twp')
        with open(output_txt_path, "w", encoding="utf-8") as f_out:
            for segment in transcription_result["segments"]:
                start_time_s = int(segment["start"])
                end_time_s = int(segment["end"])
                text_content = cc.convert(segment["text"].strip())
                f_out.write(f"[{start_time_s:04d}s - {end_time_s:04d}s] {text_content}\n")
        self.update_progress(100, 100, "逐字稿完成!")
        return {'status': '完成', 'result_path': output_txt_path}
    except Exception as e:
        import traceback
        traceback.print_exc()
        self.update_state(state='FAILURE', meta={'exc_type': type(e).__name__, 'exc_message': str(e)})
        return {'status': '錯誤', 'error': str(e)}
# ========== Task 3: Segment-by-segment translation (Dify) ==========
# Terminology hints embedded in every translation prompt (source term -> preferred Chinese term).
glossary = {"LLM": "大型語言模型", "prompt": "提示詞", "API": "應用程式介面"}


def generate_translation_prompt_for_dify(english_text_segment, target_language):
    """Build the Dify prompt that asks for *english_text_segment* in *target_language*.

    The prompt lists every glossary entry as ``- term: preferred`` and wraps the
    segment between ``---`` fences.
    """
    dictionary_block = "\n".join(f"- {term}: {preferred}" for term, preferred in glossary.items())
    header = f"動作: 翻譯\n目標語言: {target_language}\n參考字典:\n{dictionary_block}\n\n"
    body = f"需翻譯內容or需總結內容:\n---\n{english_text_segment}\n---\n你的結果:\n"
    return header + body
# One "[<start>s - <end>s] text" entry, as written by transcribe_audio_task. The
# timestamps are zero-padded to *at least* four digits ({:04d}), so recordings of
# 10000 s or more produce longer runs; the digit counts are therefore unbounded.
_TIMESTAMPED_SEGMENT_RE = re.compile(r"(\[\d+s\s*-\s*\d+s\])(.*?)(?=\n\[\d+s|$)", re.DOTALL)


def split_segments(text_content):
    """Split a timestamped transcript into ``"[<start>s - <end>s] <text>"`` strings.

    A segment's text runs until the next line that starts with a timestamp (so
    it may span several lines) and is stripped of surrounding whitespace.
    Text before the first timestamp is ignored; no timestamps -> ``[]``.
    """
    return [
        f"{match.group(1).strip()} {match.group(2).strip()}"
        for match in _TIMESTAMPED_SEGMENT_RE.finditer(text_content)
    ]
# Leading "[<start>s - <end>s]" header of a segment; digit runs are unbounded because
# transcripts pad to at least (not exactly) four digits.
_SEGMENT_HEADER_RE = re.compile(r"(\[\d+s\s*-\s*\d+s\])\s*(.*)", re.DOTALL)


def parse_segment(segment_text):
    """Split one segment into ``(timestamp, text)``.

    Returns ``("[<start>s - <end>s]", text)`` when the stripped segment starts
    with a timestamp, otherwise ``(None, stripped_segment)``.
    """
    stripped = segment_text.strip()
    match = _SEGMENT_HEADER_RE.match(stripped)
    return (match.group(1), match.group(2)) if match else (None, stripped)
@celery.task(base=ProgressTask, bind=True)
def translate_segments_task(self, input_txt_path, output_txt_path, target_language):
    """Translate a timestamped transcript segment by segment through Dify.

    Each output block is the timestamp line (when the segment has one), the
    original text, then ``👉`` followed by the translation; blocks are separated
    by a blank line. All segments share one Dify conversation. Progress meta
    carries a ``preview`` of the blocks translated so far.

    Returns:
        ``{'status': '完成', 'result_path': ..., 'content': ...}`` on success
        (``result_path`` is None for an empty input), or
        ``{'status': '錯誤', 'error': <message>}`` after setting state FAILURE.
    """
    try:
        with open(input_txt_path, "r", encoding="utf-8") as f:
            segments = split_segments(f.read())
        total_segments = len(segments)
        if total_segments == 0:
            self.update_progress(100, 100, "完成,但輸入檔無內容。")
            return {'status': '完成', 'result_path': None, 'content': '(輸入檔案為空)'}
        conversation_id = None
        translated_blocks = []
        for idx, segment_text in enumerate(segments):
            timestamp, original_text = parse_segment(segment_text)
            status_msg = f"正在翻譯第 {idx + 1}/{total_segments} 段..."
            self.update_progress(idx, total_segments, status_msg, {'preview': "\n\n".join(translated_blocks)})
            if not original_text.strip():
                translated_text = "(原始內容為空)"
            else:
                prompt = generate_translation_prompt_for_dify(original_text, target_language)
                response = ask_dify(prompt, conversation_id=conversation_id)
                translated_text = response.get("answer", "翻譯失敗")
                conversation_id = response.get("conversation_id")
            # parse_segment returns None when no timestamp is found; omit the line
            # instead of writing the literal text "None".
            timestamp_line = f"{timestamp}\n" if timestamp else ""
            translated_blocks.append(f"{timestamp_line}{original_text}\n👉{translated_text}")
        full_translated_content = "\n\n".join(translated_blocks)
        with open(output_txt_path, "w", encoding="utf-8") as f:
            f.write(full_translated_content)
        self.update_progress(total_segments, total_segments, "全部翻譯完成!")
        return {'status': '完成', 'result_path': output_txt_path, 'content': full_translated_content}
    except Exception as e:
        self.update_state(state='FAILURE', meta={'exc_type': type(e).__name__, 'exc_message': str(e)})
        return {'status': '錯誤', 'error': str(e)}
# ========== Task 4: Meeting summary / conclusions (Dify) ==========
@celery.task(base=ProgressTask, bind=True)
def summarize_text_task(self, text_content, target_language, conversation_id=None, revision_instruction=None):
    """Generate a meeting summary through Dify, or revise an existing one.

    Args:
        text_content: The transcript to summarize, or the existing summary when
            *revision_instruction* is given.
        target_language: Language requested for the summary (put into the prompt).
        conversation_id: Dify conversation to continue, if any.
        revision_instruction: When truthy, ask Dify to revise *text_content*
            following these instructions instead of summarizing from scratch.

    Returns:
        ``{'status': '完成', 'summary': ..., 'conversation_id': ...}`` on success,
        or ``{'status': '錯誤', 'error': <message>}`` after setting state FAILURE.
    """
    try:
        self.update_progress(1, 100, "準備提示詞...")
        prompt = (
            f"Existing Summary:\n---\n{text_content}\n---\n\nUser's Revision Instructions:\n---\n{revision_instruction}\n---\nPlease provide the revised meeting summary in {target_language}:"
            if revision_instruction
            else f"Please act as a professional meeting analyst and summarize the following meeting transcript into key points, action items, and conclusions. The summary should be in {target_language}.\n\nTranscript:\n---\n{text_content}\n---"
        )
        self.update_progress(20, 100, "正在請求 Dify API...")
        dify_reply = ask_dify(prompt, conversation_id=conversation_id)
        outcome = {
            'status': '完成',
            'summary': dify_reply.get("answer", "總結失敗"),
            'conversation_id': dify_reply.get("conversation_id"),
        }
        self.update_progress(100, 100, "總結已生成!")
        return outcome
    except Exception as e:
        self.update_state(state='FAILURE', meta={'exc_type': type(e).__name__, 'exc_message': str(e)})
        return {'status': '錯誤', 'error': str(e)}