first_upload
This commit is contained in:
248
tasks.py
Normal file
248
tasks.py
Normal file
@@ -0,0 +1,248 @@
|
||||
import os
|
||||
import sys
|
||||
import whisper
|
||||
import torch
|
||||
import shutil
|
||||
import subprocess
|
||||
import time
|
||||
import re
|
||||
import json
|
||||
import requests
|
||||
from celery import Celery, Task
|
||||
from opencc import OpenCC
|
||||
from moviepy import VideoFileClip
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# Populate os.environ from a local .env file (if one exists) before reading settings
load_dotenv()

# ========== Dify API Configuration ==========
# Both values are None when the corresponding variable is not set.
DIFY_API_KEY = os.getenv("DIFY_API_KEY")
DIFY_API_BASE_URL = os.getenv("DIFY_API_BASE_URL")
|
||||
|
||||
# ========== Dify API Client Function ==========
|
||||
def _strip_think_blocks(text):
    """Remove every ``<think>...</think>`` block (and trailing whitespace) from *text*."""
    return re.sub(r"<think>.*?</think>\s*", "", text, flags=re.DOTALL).strip()


def _collect_dify_stream(response, conversation_id):
    """Read a streaming chat-messages response line by line.

    Returns a tuple ``(answer_chunks, conversation_id, error_message)``.
    ``error_message`` is None unless the stream delivered an ``error`` event.
    Reading stops at the first ``message_end`` or ``error`` event.
    """
    answer_chunks = []
    error_message = None
    for line in response.iter_lines():
        if not line:
            continue
        decoded_line = line.decode('utf-8')
        if not decoded_line.startswith("data:"):
            continue
        try:
            data_obj = json.loads(decoded_line[len("data:"):])
        except json.JSONDecodeError as decode_err:
            # Best effort: skip the malformed chunk, but leave a trace instead of hiding it.
            print(f"[ASK_DIFY] Skipping undecodable stream chunk: {decode_err}")
            continue
        event_type = data_obj.get("event")
        if event_type in ("agent_message", "message"):
            if data_obj.get("answer") is not None:
                answer_chunks.append(data_obj["answer"])
        elif event_type == "message_end":
            if "conversation_id" in data_obj:
                conversation_id = data_obj["conversation_id"]
            break
        elif event_type == "error":
            error_message = data_obj.get('message', 'Dify API 返回未知的流式錯誤')
            conversation_id = data_obj.get("conversation_id", conversation_id)
            break
    return answer_chunks, conversation_id, error_message


def ask_dify(prompt: str, user_id: str = "default-tk-user-resume", inputs: dict = None, response_mode: str = "streaming", conversation_id: str = None, timeout_seconds: int = 1200) -> dict:
    """Send *prompt* to the Dify ``/chat-messages`` endpoint and return the answer.

    Args:
        prompt: Text sent as the ``query`` field.
        user_id: Value sent as the ``user`` field.
        inputs: Optional dict sent as ``inputs``; an empty dict is used when None.
        response_mode: ``"streaming"`` reads the reply as a line-based event
            stream; any other value reads a single JSON body.
        conversation_id: Existing conversation to continue; omitted from the
            payload when falsy.
        timeout_seconds: Timeout handed to ``requests.post``.

    Returns:
        A dict with keys ``"answer"`` and ``"conversation_id"``. Failures
        (missing configuration, timeout, HTTP error, stream error event or any
        other exception) do not raise; they are reported as a message in
        ``"answer"``. ``<think>...</think>`` blocks are stripped from answers.
    """
    if not DIFY_API_KEY or not DIFY_API_BASE_URL:
        return {"answer": "❌ 錯誤:DIFY_API_KEY 或 DIFY_API_BASE_URL 未在環境變數中設定。", "conversation_id": conversation_id}

    if inputs is None:
        inputs = {}
    url = f"{DIFY_API_BASE_URL}/chat-messages"
    headers = {"Authorization": f"Bearer {DIFY_API_KEY}", "Content-Type": "application/json"}
    payload = {"inputs": inputs, "query": prompt, "user": user_id, "response_mode": response_mode}
    if conversation_id:
        payload["conversation_id"] = conversation_id
    returned_conversation_id = conversation_id

    print("\n--- [ASK_DIFY] Sending request to Dify ---")
    response = None
    try:
        is_streaming_request = (response_mode == "streaming")
        response = requests.post(url, headers=headers, json=payload, timeout=timeout_seconds, stream=is_streaming_request)
        response.raise_for_status()

        if is_streaming_request:
            full_answer_chunks, returned_conversation_id, error_from_stream_message = _collect_dify_stream(
                response, returned_conversation_id
            )
            if error_from_stream_message:
                clean_error_answer = _strip_think_blocks(error_from_stream_message)
                return {"answer": f"❌ Dify API 流處理錯誤: {clean_error_answer}", "conversation_id": returned_conversation_id}
            raw_answer = "".join(full_answer_chunks) if full_answer_chunks else "⚠️ 流式響應未包含有效回答或內容為空"
        else:
            response_data = response.json()
            raw_answer = response_data.get("answer", "⚠️ 回應中未找到 'answer' 欄位或內容為空")
            returned_conversation_id = response_data.get("conversation_id", returned_conversation_id)

        return {"answer": _strip_think_blocks(raw_answer), "conversation_id": returned_conversation_id}
    except requests.exceptions.Timeout:
        return {"answer": f"⚠️ 請求 Dify API 逾時 (超過 {timeout_seconds} 秒)", "conversation_id": conversation_id}
    except requests.exceptions.HTTPError as http_err:
        # The body is read here, so the response must still be open (closed in ``finally``).
        error_message_detail = f" - 原始響應: {http_err.response.text}"
        final_error_message = f"❌ Dify API HTTP 錯誤: {http_err.response.status_code}{error_message_detail}"
        return {"answer": final_error_message, "conversation_id": conversation_id}
    except Exception as e:
        return {"answer": f"❌ 處理 Dify API 請求或響應時發生未預期錯誤: {type(e).__name__}: {str(e)}", "conversation_id": conversation_id}
    finally:
        # A streamed response that is abandoned mid-body keeps its connection
        # checked out until closed explicitly.
        if response is not None:
            response.close()
|
||||
# ========== Dify API Client Function END ==========
|
||||
|
||||
|
||||
# ========== Celery 設定 ==========
|
||||
# Celery application named 'tasks'. Broker and result backend URLs come from the
# environment and fall back to Redis database 0 on localhost.
celery = Celery(
    "tasks",
    backend=os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379/0"),
    broker=os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379/0"),
)
|
||||
|
||||
class ProgressTask(Task):
    """Task base class that adds a helper for publishing progress metadata."""

    def update_progress(self, current, total, status, extra_info=None):
        """Set the task state to 'PROGRESS' with current/total/status metadata.

        Entries from *extra_info*, when it is truthy, are merged into the
        metadata and override the standard keys on collision.
        """
        progress_meta = dict(current=current, total=total, status=status)
        if extra_info:
            progress_meta.update(extra_info)
        self.update_state(state="PROGRESS", meta=progress_meta)
|
||||
|
||||
# ========== Demucs 輔助函式 ==========
|
||||
def separate_vocals_with_demucs(self, audio_path, project_root, model_name='htdemucs_ft'):
    """Run Demucs on *audio_path* and return the path of the isolated vocals file.

    Args:
        self: Object providing ``update_progress`` (the calling task).
        audio_path: Audio file to separate.
        project_root: Directory under which ``demucs_separated`` is written.
        model_name: Demucs model passed to ``-n``; it is also the name of the
            output sub-directory the result is looked up in.

    Returns:
        Path to ``vocals.wav`` for the given input file.

    Raises:
        RuntimeError: The Demucs process exited with a non-zero status.
        FileNotFoundError: Demucs succeeded but the vocals file is missing.
    """
    self.update_progress(10, 100, "🎛️ 使用 Demucs 分離人聲...")
    output_dir = os.path.join(project_root, 'demucs_separated')
    cmd = [
        sys.executable, '-m', 'demucs.separate',  # run under the same interpreter as this worker
        '-n', model_name,
        '--two-stems=vocals',
        '-o', output_dir,
        audio_path,
    ]
    try:
        result = subprocess.run(cmd, check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        stderr_str = "No stderr output"
        if e.stderr:
            # Never fail while reporting a failure: undecodable bytes are replaced.
            stderr_str = e.stderr.decode('utf-8', errors='replace')
        print("Demucs stderr:", stderr_str)
        raise RuntimeError(f"Demucs 人聲分離失敗: {stderr_str}") from e
    print("Demucs stdout:", result.stdout.decode('utf-8', errors='replace'))

    original_filename_base = os.path.splitext(os.path.basename(audio_path))[0]
    vocals_path = os.path.join(output_dir, model_name, original_filename_base, 'vocals.wav')
    if not os.path.exists(vocals_path):
        raise FileNotFoundError(f"找不到 Demucs 分離出的人聲音訊檔案: {vocals_path}")
    return vocals_path
|
||||
|
||||
# ========== Task 1: 影片轉音訊 ==========
|
||||
@celery.task(base=ProgressTask, bind=True)
def extract_audio_task(self, input_video_path, output_audio_path):
    """Extract the audio track of a video into a 16-bit PCM audio file.

    Args:
        input_video_path: Video file to read.
        output_audio_path: Destination audio file.

    Returns:
        ``{'status': '完成', 'result_path': output_audio_path}`` on success, or
        ``{'status': '錯誤', 'error': <message>}`` when anything fails.
    """
    try:
        self.update_progress(0, 100, "準備轉換影片...")
        video_clip = VideoFileClip(input_video_path)
        try:
            if video_clip.audio is None:
                # Fail with a readable message instead of an AttributeError on None.
                raise ValueError(f"影片不包含音軌,無法提取音訊: {input_video_path}")
            self.update_progress(50, 100, "正在提取音訊...")
            video_clip.audio.write_audiofile(output_audio_path, codec="pcm_s16le")
        finally:
            # Release the clip even when writing the audio fails.
            video_clip.close()
        self.update_progress(100, 100, "音訊提取完成!")
        return {'status': '完成', 'result_path': output_audio_path}
    except Exception as e:
        # NOTE(review): the task still returns normally after this state update, so
        # Celery may record it as successful - confirm callers rely on the returned dict.
        self.update_state(state='FAILURE', meta={'exc_type': type(e).__name__, 'exc_message': str(e)})
        return {'status': '錯誤', 'error': str(e)}
|
||||
|
||||
# ========== Task 2: 音訊轉文字 (Whisper) ==========
|
||||
@celery.task(base=ProgressTask, bind=True)
def transcribe_audio_task(self, input_audio_path, output_txt_path, language, use_demucs=False):
    """Transcribe an audio file with Whisper and write a timestamped transcript.

    When *use_demucs* is truthy the vocals are separated first and the
    separated file is transcribed instead of the input. Each segment is passed
    through the OpenCC 's2twp' converter and written as one
    ``[SSSSs - EEEEs] text`` line. A *language* of ``'auto'`` is passed to
    Whisper as None.

    Returns:
        ``{'status': '完成', 'result_path': output_txt_path}`` on success, or
        ``{'status': '錯誤', 'error': <message>}`` when anything fails.
    """
    try:
        self.update_progress(0, 100, "準備開始轉錄...")

        if use_demucs:
            # The Demucs output folder is placed two directory levels above the transcript file.
            demucs_root = os.path.dirname(os.path.dirname(output_txt_path))
            audio_for_whisper = separate_vocals_with_demucs(self, input_audio_path, demucs_root)
            self.update_progress(25, 100, "✅ 人聲分離完成,準備載入 Whisper...")
            loaded_pct = 40
        else:
            audio_for_whisper = input_audio_path
            loaded_pct = 20

        device = "cuda" if torch.cuda.is_available() else "cpu"
        running_on_gpu = device == "cuda"

        self.update_progress(loaded_pct - 10, 100, f"載入 Whisper 'medium' 模型 (使用 {device})...")
        whisper_model = whisper.load_model("medium", device=device)
        self.update_progress(loaded_pct, 100, "模型載入完畢,開始轉錄音訊...")

        whisper_language = None if language == 'auto' else language
        transcript = whisper_model.transcribe(
            audio=audio_for_whisper,
            language=whisper_language,
            fp16=running_on_gpu,
            verbose=False,
        )

        self.update_progress(85, 100, "轉錄完成,進行繁體轉換與格式化...")
        converter = OpenCC('s2twp')

        def render_lines():
            # One "[start - end] text" line per segment; seconds are truncated to ints.
            for seg in transcript["segments"]:
                begin_s = int(seg["start"])
                finish_s = int(seg["end"])
                converted = converter.convert(seg["text"].strip())
                yield f"[{begin_s:04d}s - {finish_s:04d}s] {converted}\n"

        with open(output_txt_path, "w", encoding="utf-8") as transcript_file:
            transcript_file.writelines(render_lines())

        self.update_progress(100, 100, "逐字稿完成!")
        return {'status': '完成', 'result_path': output_txt_path}
    except Exception as e:
        import traceback
        traceback.print_exc()
        failure_meta = {'exc_type': type(e).__name__, 'exc_message': str(e)}
        self.update_state(state='FAILURE', meta=failure_meta)
        return {'status': '錯誤', 'error': str(e)}
|
||||
|
||||
# ========== Task 3: 逐段翻譯 (Dify) ==========
|
||||
# Fixed term -> translation pairs embedded in every translation prompt.
glossary = {
    "LLM": "大型語言模型",
    "prompt": "提示詞",
    "API": "應用程式介面",
}


def generate_translation_prompt_for_dify(english_text_segment, target_language):
    """Build the translation prompt for one text segment.

    The prompt names the target language, lists every glossary entry as a
    ``- term: translation`` line, and wraps the segment between ``---`` lines.
    """
    glossary_lines = []
    for term, rendering in glossary.items():
        glossary_lines.append(f"- {term}: {rendering}")
    glossary_entries = "\n".join(glossary_lines)

    instructions = f"動作: 翻譯\n目標語言: {target_language}\n參考字典:\n{glossary_entries}\n\n"
    content = f"需翻譯內容or需總結內容:\n---\n{english_text_segment}\n---\n你的結果:\n"
    return instructions + content
|
||||
# Timestamps are written zero-padded to four digits, but grow to five or more
# digits once a recording passes 9999 seconds, so "four or more" must be accepted.
_SEGMENT_PATTERN = re.compile(
    r"(\[\d{4,}s\s*-\s*\d{4,}s\])(.*?)(?=\n\[\d{4,}s|$)", re.DOTALL
)
_SEGMENT_HEAD_PATTERN = re.compile(r"(\[\d{4,}s\s*-\s*\d{4,}s\])\s*(.*)", re.DOTALL)


def split_segments(text_content):
    """Split a transcript into ``"[start - end] text"`` segment strings.

    A segment starts at a ``[NNNNs - NNNNs]`` timestamp and runs until the next
    line that begins with a timestamp, or to the end of the text. Text before
    the first timestamp is ignored. Returns an empty list when no timestamp is
    found.
    """
    return [
        f"{match.group(1).strip()} {match.group(2).strip()}"
        for match in _SEGMENT_PATTERN.finditer(text_content)
    ]


def parse_segment(segment_text):
    """Split one segment into ``(timestamp, text)``.

    Returns ``(None, stripped_text)`` when the segment does not start with a
    ``[NNNNs - NNNNs]`` timestamp.
    """
    stripped = segment_text.strip()
    match = _SEGMENT_HEAD_PATTERN.match(stripped)
    if match is None:
        return (None, stripped)
    return (match.group(1), match.group(2))
|
||||
|
||||
@celery.task(base=ProgressTask, bind=True)
def translate_segments_task(self, input_txt_path, output_txt_path, target_language):
    """Translate a timestamped transcript segment by segment through Dify.

    Each segment is sent in turn, continuing the conversation id returned by
    the previous call. Results are joined with blank lines as
    ``timestamp / original text / 👉translation`` and written to
    *output_txt_path*. Progress updates carry the text translated so far under
    the ``preview`` key.

    Returns:
        On success a dict with ``status``, ``result_path`` and ``content``
        (``result_path`` is None when the input has no segments); on failure
        ``{'status': '錯誤', 'error': <message>}``.
    """
    try:
        with open(input_txt_path, "r", encoding="utf-8") as source_file:
            segments = split_segments(source_file.read())

        segment_count = len(segments)
        if segment_count == 0:
            self.update_progress(100, 100, "完成,但輸入檔無內容。")
            return {'status': '完成', 'result_path': None, 'content': '(輸入檔案為空)'}

        dify_conversation = None
        rendered_segments = []
        for position, raw_segment in enumerate(segments, start=1):
            timestamp, source_text = parse_segment(raw_segment)

            # Report progress before translating; the preview holds what is already done.
            self.update_progress(
                position - 1,
                segment_count,
                f"正在翻譯第 {position}/{segment_count} 段...",
                {'preview': "\n\n".join(rendered_segments)},
            )

            if source_text.strip():
                prompt = generate_translation_prompt_for_dify(source_text, target_language)
                reply = ask_dify(prompt, conversation_id=dify_conversation)
                translation = reply.get("answer", "翻譯失敗")
                dify_conversation = reply.get("conversation_id")
            else:
                translation = "(原始內容為空)"

            rendered_segments.append(f"{timestamp}\n{source_text}\n👉{translation}")

        translated_document = "\n\n".join(rendered_segments)
        with open(output_txt_path, "w", encoding="utf-8") as target_file:
            target_file.write(translated_document)

        self.update_progress(segment_count, segment_count, "全部翻譯完成!")
        return {'status': '完成', 'result_path': output_txt_path, 'content': translated_document}
    except Exception as e:
        failure_meta = {'exc_type': type(e).__name__, 'exc_message': str(e)}
        self.update_state(state='FAILURE', meta=failure_meta)
        return {'status': '錯誤', 'error': str(e)}
|
||||
|
||||
# ========== Task 4: 會議結論整理 (Dify) ==========
|
||||
@celery.task(base=ProgressTask, bind=True)
def summarize_text_task(self, text_content, target_language, conversation_id=None, revision_instruction=None):
    """Ask Dify for a meeting summary, or for a revision of an existing one.

    With a truthy *revision_instruction*, *text_content* is presented as the
    existing summary to be revised; otherwise it is presented as the meeting
    transcript to summarize.

    Returns:
        ``{'status': '完成', 'summary': ..., 'conversation_id': ...}`` on
        success, or ``{'status': '錯誤', 'error': <message>}`` on failure.
    """
    try:
        self.update_progress(1, 100, "準備提示詞...")

        if not revision_instruction:
            prompt = (
                "Please act as a professional meeting analyst and summarize the following meeting transcript "
                "into key points, action items, and conclusions. "
                f"The summary should be in {target_language}.\n\n"
                f"Transcript:\n---\n{text_content}\n---"
            )
        else:
            prompt = (
                f"Existing Summary:\n---\n{text_content}\n---\n\n"
                f"User's Revision Instructions:\n---\n{revision_instruction}\n---\n"
                f"Please provide the revised meeting summary in {target_language}:"
            )

        self.update_progress(20, 100, "正在請求 Dify API...")
        reply = ask_dify(prompt, conversation_id=conversation_id)
        result = {
            'status': '完成',
            'summary': reply.get("answer", "總結失敗"),
            'conversation_id': reply.get("conversation_id"),
        }
        self.update_progress(100, 100, "總結已生成!")
        return result
    except Exception as e:
        failure_meta = {'exc_type': type(e).__name__, 'exc_message': str(e)}
        self.update_state(state='FAILURE', meta=failure_meta)
        return {'status': '錯誤', 'error': str(e)}
|
Reference in New Issue
Block a user