249 lines
14 KiB
Python
249 lines
14 KiB
Python
import os
|
||
import sys
|
||
import whisper
|
||
import torch
|
||
import shutil
|
||
import subprocess
|
||
import time
|
||
import re
|
||
import json
|
||
import requests
|
||
from celery import Celery, Task
|
||
from opencc import OpenCC
|
||
from moviepy import VideoFileClip
|
||
from dotenv import load_dotenv
|
||
|
||
# Pull settings from a local .env file into the process environment.
load_dotenv()

# ========== Dify API Configuration ==========
# Both values are read once at import time and are None when the variable is unset.
DIFY_API_KEY = os.getenv("DIFY_API_KEY")
DIFY_API_BASE_URL = os.getenv("DIFY_API_BASE_URL")
|
||
|
||
# ========== Dify API Client Function ==========
|
||
def ask_dify(prompt: str, user_id: str = "default-tk-user-resume", inputs: dict = None, response_mode: str = "streaming", conversation_id: str = None, timeout_seconds: int = 1200) -> dict:
    """Send one chat message to the Dify ``/chat-messages`` endpoint.

    Args:
        prompt: Text sent as the ``query`` field.
        user_id: Value sent as the ``user`` field.
        inputs: Optional mapping sent as ``inputs``; an empty dict is used when omitted.
        response_mode: ``"streaming"`` reads the reply as ``data:`` event lines;
            any other value reads a single JSON body.
        conversation_id: Conversation to continue, if any.
        timeout_seconds: Timeout passed to ``requests.post``.

    Returns:
        A dict with two keys: ``answer`` (the reply with ``<think>...</think>``
        spans removed, or a human-readable error message) and
        ``conversation_id``. Request and parsing failures are reported through
        ``answer`` instead of being raised.
    """
    if not DIFY_API_KEY or not DIFY_API_BASE_URL:
        return {"answer": "❌ 錯誤:DIFY_API_KEY 或 DIFY_API_BASE_URL 未在環境變數中設定。", "conversation_id": conversation_id}

    if inputs is None:
        inputs = {}
    url = f"{DIFY_API_BASE_URL}/chat-messages"
    headers = {"Authorization": f"Bearer {DIFY_API_KEY}", "Content-Type": "application/json"}
    payload = {"inputs": inputs, "query": prompt, "user": user_id, "response_mode": response_mode}
    if conversation_id:
        payload["conversation_id"] = conversation_id
    returned_conversation_id = conversation_id
    error_from_stream_message = None
    print("\n--- [ASK_DIFY] Sending request to Dify ---")
    response = None
    try:
        is_streaming_request = (response_mode == "streaming")
        response = requests.post(url, headers=headers, json=payload, timeout=timeout_seconds, stream=is_streaming_request)
        response.raise_for_status()
        if is_streaming_request:
            full_answer_chunks = []
            for line in response.iter_lines():
                if not line:
                    continue
                decoded_line = line.decode('utf-8')
                if not decoded_line.startswith("data:"):
                    continue
                try:
                    data_obj = json.loads(decoded_line[len("data:"):])
                except json.JSONDecodeError:
                    # Best effort: a payload that is not valid JSON is skipped.
                    continue
                if not isinstance(data_obj, dict):
                    # Only JSON objects carry an "event" field; ignore anything else.
                    continue
                event_type = data_obj.get("event")
                if event_type in ("agent_message", "message"):
                    if data_obj.get("answer") is not None:
                        full_answer_chunks.append(data_obj["answer"])
                elif event_type == "message_end":
                    if "conversation_id" in data_obj:
                        returned_conversation_id = data_obj["conversation_id"]
                    break
                elif event_type == "error":
                    error_from_stream_message = data_obj.get('message', 'Dify API 返回未知的流式錯誤')
                    returned_conversation_id = data_obj.get("conversation_id", returned_conversation_id)
                    break
            if error_from_stream_message:
                clean_error_answer = re.sub(r"<think>.*?</think>\s*", "", error_from_stream_message, flags=re.DOTALL).strip()
                return {"answer": f"❌ Dify API 流處理錯誤: {clean_error_answer}", "conversation_id": returned_conversation_id}
            raw_answer = "".join(full_answer_chunks) if full_answer_chunks else "⚠️ 流式響應未包含有效回答或內容為空"
        else:
            response_data = response.json()
            raw_answer = response_data.get("answer", "⚠️ 回應中未找到 'answer' 欄位或內容為空")
            returned_conversation_id = response_data.get("conversation_id", returned_conversation_id)
        clean_answer = re.sub(r"<think>.*?</think>\s*", "", raw_answer, flags=re.DOTALL).strip()
        return {"answer": clean_answer, "conversation_id": returned_conversation_id}
    except requests.exceptions.Timeout:
        return {"answer": f"⚠️ 請求 Dify API 逾時 (超過 {timeout_seconds} 秒)", "conversation_id": conversation_id}
    except requests.exceptions.HTTPError as http_err:
        error_message_detail = f" - 原始響應: {http_err.response.text}"
        final_error_message = f"❌ Dify API HTTP 錯誤: {http_err.response.status_code}{error_message_detail}"
        return {"answer": final_error_message, "conversation_id": conversation_id}
    except Exception as e:
        return {"answer": f"❌ 處理 Dify API 請求或響應時發生未預期錯誤: {type(e).__name__}: {str(e)}", "conversation_id": conversation_id}
    finally:
        # The stream loop stops early on "message_end"/"error", leaving the body
        # unread; close explicitly so the connection is released. This runs after
        # the except handlers, so the HTTP-error branch can still read
        # response.text first.
        if response is not None:
            response.close()
|
||
# ========== Dify API Client Function END ==========
|
||
|
||
|
||
# ========== Celery 設定 ==========
|
||
# Celery application named 'tasks'. Broker and result backend URLs are taken from
# the environment, each falling back to a Redis instance on localhost.
_broker_url = os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0')
_result_backend_url = os.getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')
celery = Celery('tasks', broker=_broker_url, backend=_result_backend_url)
|
||
|
||
class ProgressTask(Task):
    """Task base class with a helper for publishing progress updates."""

    def update_progress(self, current, total, status, extra_info=None):
        """Store a 'PROGRESS' state whose meta holds current, total and status.

        When ``extra_info`` is truthy its entries are merged into the meta,
        overriding the base keys on collision.
        """
        progress_meta = dict(current=current, total=total, status=status)
        if extra_info:
            progress_meta.update(extra_info)
        self.update_state(state='PROGRESS', meta=progress_meta)
|
||
|
||
# ========== Demucs 輔助函式 ==========
|
||
def separate_vocals_with_demucs(self, audio_path, project_root):
    """Run Demucs on ``audio_path`` and return the path of the separated vocals.

    Args:
        self: Task object providing ``update_progress``.
        audio_path: Audio file handed to Demucs.
        project_root: Directory under which ``demucs_separated`` is written.

    Returns:
        Path to ``vocals.wav`` under
        ``<project_root>/demucs_separated/htdemucs_ft/<audio base name>/``.

    Raises:
        RuntimeError: The Demucs subprocess exited with a non-zero status.
        FileNotFoundError: Demucs finished but the vocals file is missing.
    """
    self.update_progress(10, 100, "🎛️ 使用 Demucs 分離人聲...")
    output_dir = os.path.join(project_root, 'demucs_separated')
    cmd = [
        # Use the interpreter running this worker rather than whatever
        # 'python' resolves to on PATH.
        sys.executable, '-m', 'demucs.separate',
        '-n', 'htdemucs_ft',
        '--two-stems=vocals',
        '-o', output_dir,
        audio_path,
    ]
    try:
        result = subprocess.run(cmd, check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        # errors='replace' keeps the message readable even when stderr is not
        # valid UTF-8, so no separate fallback decode is needed.
        stderr_str = e.stderr.decode('utf-8', errors='replace') if e.stderr else "No stderr output"
        print("Demucs stderr:", stderr_str)
        # Chain the original error so the exit status stays in the traceback.
        raise RuntimeError(f"Demucs 人聲分離失敗: {stderr_str}") from e
    # Decode so the log shows text instead of a bytes repr.
    print("Demucs stdout:", result.stdout.decode('utf-8', errors='replace'))

    original_filename_base = os.path.splitext(os.path.basename(audio_path))[0]
    vocals_path = os.path.join(output_dir, 'htdemucs_ft', original_filename_base, 'vocals.wav')
    if not os.path.exists(vocals_path):
        raise FileNotFoundError(f"找不到 Demucs 分離出的人聲音訊檔案: {vocals_path}")
    return vocals_path
|
||
|
||
# ========== Task 1: 影片轉音訊 ==========
|
||
@celery.task(base=ProgressTask, bind=True)
def extract_audio_task(self, input_video_path, output_audio_path):
    """Extract the audio track of a video into a PCM (``pcm_s16le``) audio file.

    Args:
        input_video_path: Video file to read.
        output_audio_path: Destination audio file.

    Returns:
        ``{'status': '完成', 'result_path': output_audio_path}`` on success, or
        ``{'status': '錯誤', 'error': <message>}`` when any exception occurs.
    """
    try:
        self.update_progress(0, 100, "準備轉換影片...")
        video_clip = VideoFileClip(input_video_path)
        try:
            # NOTE(review): moviepy is expected to leave .audio as None for a video
            # without an audio track; fail with a clear message rather than an
            # AttributeError on None.
            if video_clip.audio is None:
                raise ValueError("影片不包含音訊軌,無法提取音訊")
            self.update_progress(50, 100, "正在提取音訊...")
            video_clip.audio.write_audiofile(output_audio_path, codec="pcm_s16le")
        finally:
            # Release the clip even when extraction fails.
            video_clip.close()
        self.update_progress(100, 100, "音訊提取完成!")
        return {'status': '完成', 'result_path': output_audio_path}
    except Exception as e:
        self.update_state(state='FAILURE', meta={'exc_type': type(e).__name__, 'exc_message': str(e)})
        return {'status': '錯誤', 'error': str(e)}
|
||
|
||
# ========== Task 2: 音訊轉文字 (Whisper) ==========
|
||
@celery.task(base=ProgressTask, bind=True)
def transcribe_audio_task(self, input_audio_path, output_txt_path, language, use_demucs=False):
    """Transcribe an audio file with Whisper 'medium' into a timestamped text file.

    Args:
        input_audio_path: Audio file to transcribe.
        output_txt_path: Destination transcript; one ``[SSSSs - SSSSs] text`` line
            per Whisper segment, with text passed through OpenCC 's2twp'.
        language: Language passed to Whisper; ``'auto'`` is sent as ``None``.
        use_demucs: When true, vocals are first isolated with Demucs and the
            separated file is transcribed instead.

    Returns:
        ``{'status': '完成', 'result_path': output_txt_path}`` on success, or
        ``{'status': '錯誤', 'error': <message>}`` when any exception occurs.
    """
    try:
        self.update_progress(0, 100, "準備開始轉錄...")
        audio_for_whisper = input_audio_path
        if use_demucs:
            # uploads folder is inside project_root
            project_root = os.path.dirname(os.path.dirname(output_txt_path))
            audio_for_whisper = separate_vocals_with_demucs(self, input_audio_path, project_root)
            self.update_progress(25, 100, "✅ 人聲分離完成,準備載入 Whisper...")

        gpu_available = torch.cuda.is_available()
        device = "cuda" if gpu_available else "cpu"
        # The Demucs step shifts the progress milestones for the model load.
        progress_after_load = 40 if use_demucs else 20
        self.update_progress(progress_after_load - 10, 100, f"載入 Whisper 'medium' 模型 (使用 {device})...")
        model = whisper.load_model("medium", device=device)
        self.update_progress(progress_after_load, 100, "模型載入完畢,開始轉錄音訊...")

        whisper_language = None if language == 'auto' else language
        transcription_result = model.transcribe(
            audio=audio_for_whisper,
            language=whisper_language,
            fp16=gpu_available,
            verbose=False,
        )
        self.update_progress(85, 100, "轉錄完成,進行繁體轉換與格式化...")

        converter = OpenCC('s2twp')
        with open(output_txt_path, "w", encoding="utf-8") as transcript_file:
            for seg in transcription_result["segments"]:
                # Timestamps are truncated to whole seconds.
                begin_s, finish_s = int(seg["start"]), int(seg["end"])
                converted = converter.convert(seg["text"].strip())
                transcript_file.write(f"[{begin_s:04d}s - {finish_s:04d}s] {converted}\n")

        self.update_progress(100, 100, "逐字稿完成!")
        return {'status': '完成', 'result_path': output_txt_path}
    except Exception as e:
        import traceback
        traceback.print_exc()
        self.update_state(state='FAILURE', meta={'exc_type': type(e).__name__, 'exc_message': str(e)})
        return {'status': '錯誤', 'error': str(e)}
|
||
|
||
# ========== Task 3: 逐段翻譯 (Dify) ==========
|
||
# Fixed term -> translation pairs listed in every translation prompt.
glossary = {"LLM": "大型語言模型", "prompt": "提示詞", "API": "應用程式介面"}


def generate_translation_prompt_for_dify(english_text_segment, target_language):
    """Build the translation prompt for one text segment.

    The prompt names the action and target language, lists each glossary entry
    as a ``- term: translation`` line, then embeds the segment between ``---``
    markers.
    """
    glossary_lines = (f"- {k}: {v}" for k, v in glossary.items())
    glossary_entries = "\n".join(glossary_lines)
    header = f"動作: 翻譯\n目標語言: {target_language}\n參考字典:\n{glossary_entries}\n\n"
    body = f"需翻譯內容or需總結內容:\n---\n{english_text_segment}\n---\n你的結果:\n"
    return header + body
|
||
# Transcript timestamps are zero-padded to four digits, but anything past 9999 s
# has five or more, so accept any width of at least four digits.
_SEGMENT_PATTERN = re.compile(r"(\[\d{4,}s\s*-\s*\d{4,}s\])(.*?)(?=\n\[\d{4,}s|$)", re.DOTALL)
_SEGMENT_HEAD_PATTERN = re.compile(r"(\[\d{4,}s\s*-\s*\d{4,}s\])\s*(.*)", re.DOTALL)


def split_segments(text_content):
    """Split a transcript into ``"[start - end] text"`` strings, one per segment.

    A segment runs from its ``[NNNNs - NNNNs]`` marker up to the next marker
    that starts a line, or to the end of the text. Text before the first
    marker is ignored; an input without markers yields an empty list.
    """
    return [
        f"{match.group(1).strip()} {match.group(2).strip()}"
        for match in _SEGMENT_PATTERN.finditer(text_content)
    ]


def parse_segment(segment_text):
    """Split one segment into ``(timestamp, text)``.

    Returns ``(None, stripped_text)`` when the segment does not start with a
    ``[NNNNs - NNNNs]`` marker.
    """
    stripped = segment_text.strip()
    match = _SEGMENT_HEAD_PATTERN.match(stripped)
    if match is None:
        return (None, stripped)
    return (match.group(1), match.group(2))
|
||
|
||
@celery.task(base=ProgressTask, bind=True)
def translate_segments_task(self, input_txt_path, output_txt_path, target_language):
    """Translate a timestamped transcript segment by segment through Dify.

    Args:
        input_txt_path: Transcript file to read.
        output_txt_path: File that receives the combined result.
        target_language: Language named in each translation prompt.

    Returns:
        On success a dict with ``status``, ``result_path`` and ``content``
        (``result_path`` is ``None`` when the input has no segments); on any
        exception ``{'status': '錯誤', 'error': <message>}``.
    """
    try:
        with open(input_txt_path, "r", encoding="utf-8") as source_file:
            segments = split_segments(source_file.read())
        segment_count = len(segments)
        if not segments:
            self.update_progress(100, 100, "完成,但輸入檔無內容。")
            return {'status': '完成', 'result_path': None, 'content': '(輸入檔案為空)'}

        # The conversation id returned by each call is passed to the next one.
        dify_conversation = None
        translated_blocks = []
        for position, raw_segment in enumerate(segments, start=1):
            timestamp, original_text = parse_segment(raw_segment)
            # Everything translated so far is published as a preview.
            preview_so_far = "\n\n".join(translated_blocks)
            self.update_progress(
                position - 1,
                segment_count,
                f"正在翻譯第 {position}/{segment_count} 段...",
                {'preview': preview_so_far},
            )
            if original_text.strip():
                prompt = generate_translation_prompt_for_dify(original_text, target_language)
                reply = ask_dify(prompt, conversation_id=dify_conversation)
                translated_text = reply.get("answer", "翻譯失敗")
                dify_conversation = reply.get("conversation_id")
            else:
                translated_text = "(原始內容為空)"
            translated_blocks.append(f"{timestamp}\n{original_text}\n👉{translated_text}")

        full_translated_content = "\n\n".join(translated_blocks)
        with open(output_txt_path, "w", encoding="utf-8") as result_file:
            result_file.write(full_translated_content)
        self.update_progress(segment_count, segment_count, "全部翻譯完成!")
        return {'status': '完成', 'result_path': output_txt_path, 'content': full_translated_content}
    except Exception as e:
        self.update_state(state='FAILURE', meta={'exc_type': type(e).__name__, 'exc_message': str(e)})
        return {'status': '錯誤', 'error': str(e)}
|
||
|
||
# ========== Task 4: 會議結論整理 (Dify) ==========
|
||
@celery.task(base=ProgressTask, bind=True)
def summarize_text_task(self, text_content, target_language, conversation_id=None, revision_instruction=None):
    """Ask Dify for a meeting summary, or for a revision of an existing one.

    Args:
        text_content: The transcript, or the existing summary when revising.
        target_language: Language requested for the summary.
        conversation_id: Dify conversation to continue, if any.
        revision_instruction: When truthy, a revision prompt is built instead
            of a fresh summary prompt.

    Returns:
        ``{'status': '完成', 'summary': ..., 'conversation_id': ...}`` on success,
        or ``{'status': '錯誤', 'error': <message>}`` when any exception occurs.
    """
    try:
        self.update_progress(1, 100, "準備提示詞...")
        if not revision_instruction:
            prompt = f"Please act as a professional meeting analyst and summarize the following meeting transcript into key points, action items, and conclusions. The summary should be in {target_language}.\n\nTranscript:\n---\n{text_content}\n---"
        else:
            prompt = f"Existing Summary:\n---\n{text_content}\n---\n\nUser's Revision Instructions:\n---\n{revision_instruction}\n---\nPlease provide the revised meeting summary in {target_language}:"

        self.update_progress(20, 100, "正在請求 Dify API...")
        reply = ask_dify(prompt, conversation_id=conversation_id)
        outcome = {
            'status': '完成',
            'summary': reply.get("answer", "總結失敗"),
            'conversation_id': reply.get("conversation_id"),
        }
        self.update_progress(100, 100, "總結已生成!")
        return outcome
    except Exception as e:
        self.update_state(state='FAILURE', meta={'exc_type': type(e).__name__, 'exc_message': str(e)})
        return {'status': '錯誤', 'error': str(e)}
|