Files
beabigegg 788e2409df fix6
2025-08-17 18:34:38 +08:00

432 lines
18 KiB
Python

import os
import uuid
import requests
import json
import re
import io
from datetime import timedelta
from pydub import AudioSegment
from pydub.silence import split_on_silence
from moviepy import VideoFileClip
from celery_app import celery
from models import db, Meeting
import math
from celery import Task
class ProgressTask(Task):
    """Celery base task that adds a helper for publishing PROGRESS updates."""

    def update_progress(self, current, total, status_msg, extra_info=None):
        """Publish a PROGRESS state whose meta describes the current step.

        Args:
            current: Amount of work done so far.
            total: Total amount of work (every caller in this module passes 100).
            status_msg: Human-readable description of the current step.
            extra_info: Optional dict merged into the meta; ignored when it is
                falsy or not a dict.
        """
        base_meta = {'current': current, 'total': total, 'status_msg': status_msg}
        extras = extra_info if (extra_info and isinstance(extra_info, dict)) else {}
        # Later keys win, exactly like dict.update(): extras may override the base fields.
        self.update_state(state='PROGRESS', meta={**base_meta, **extras})
# --- Dify Helper Functions ---
def ask_dify(api_key: str, query: str, **kwargs):
    """Send a query to Dify's ``/chat-messages`` endpoint and return the decoded JSON.

    Optional keyword arguments:
        inputs: App input variables (default ``{}``).
        user_id: Value sent as ``user`` (default ``"default-tk-user"``).
        response_mode: Dify response mode (default ``"blocking"``).
        conversation_id: Conversation to continue (default ``None``).
        timeout_seconds: HTTP timeout (default 1200).

    Errors are never raised. A missing API key or base URL, or any
    ``requests`` exception (HTTP error statuses included), is reported as a
    dict whose ``"answer"`` holds an error message.
    """
    from flask import current_app

    base_url = current_app.config.get("DIFY_API_BASE_URL")
    if not (api_key and base_url):
        return {"answer": "Error: DIFY API settings not configured."}

    request_body = {
        "inputs": kwargs.get("inputs", {}),
        "query": query,
        "user": kwargs.get("user_id", "default-tk-user"),
        "response_mode": kwargs.get("response_mode", "blocking"),
        "conversation_id": kwargs.get("conversation_id"),
    }
    auth_headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    try:
        reply = requests.post(
            f"{base_url}/chat-messages",
            headers=auth_headers,
            json=request_body,
            timeout=kwargs.get("timeout_seconds", 1200),
        )
        reply.raise_for_status()
        return reply.json()
    except requests.exceptions.RequestException as err:
        return {"answer": f"Dify API request error: {err}"}
def upload_chunk_to_dify(blob: bytes, filename="chunk.mp4"):
    """Upload one encoded audio chunk to Dify and return the uploaded file's id.

    Args:
        blob: Encoded audio bytes, sent with MIME type ``audio/mp4``.
        filename: File name reported to Dify for the upload.

    Returns:
        The ``id`` field of Dify's JSON response, which run_dify_stt_chat_app()
        expects as ``file_id``.

    Raises:
        RuntimeError: If DIFY_API_BASE_URL or DIFY_STT_API_KEY is not configured.
        requests.HTTPError: If Dify answers with a non-2xx status.
    """
    from flask import current_app
    dify_api_base_url = current_app.config.get("DIFY_API_BASE_URL")
    api_key = current_app.config.get("DIFY_STT_API_KEY")
    # Fail fast with a clear message instead of POSTing to "None/files/upload".
    # The API key itself is deliberately never printed: stdout ends up in worker logs.
    if not dify_api_base_url or not api_key:
        raise RuntimeError("DIFY_API_BASE_URL and DIFY_STT_API_KEY must be configured for speech-to-text.")
    r = requests.post(
        f"{dify_api_base_url}/files/upload",
        headers={"Authorization": f"Bearer {api_key}"},
        files={"file": (filename, io.BytesIO(blob), "audio/mp4")},
        data={"user": "ai-meeting-assistant-user"},
        timeout=300
    )
    print("Dify File Upload API Response:", r.status_code, r.text)  # DEBUG PRINT
    r.raise_for_status()
    return r.json()["id"]
def run_dify_stt_chat_app(file_id: str) -> str:
    """Run speech-to-text by calling Dify's chat-messages API.

    Intended for a Dify "advanced chat" application. The file previously
    uploaded with upload_chunk_to_dify() is attached to a blocking chat
    request, and the app's reply is used as the transcript.

    Args:
        file_id: File id returned by the Dify file-upload endpoint.

    Returns:
        The stripped ``answer`` text, or ``""`` when the answer is missing or null.

    Raises:
        RuntimeError: If DIFY_API_BASE_URL or DIFY_STT_API_KEY is not configured.
        requests.HTTPError: If Dify answers with a non-2xx status.
    """
    from flask import current_app
    dify_api_base_url = current_app.config.get("DIFY_API_BASE_URL")
    api_key = current_app.config.get("DIFY_STT_API_KEY")
    if not dify_api_base_url or not api_key:
        raise RuntimeError("DIFY_API_BASE_URL and DIFY_STT_API_KEY must be configured for speech-to-text.")
    payload = {
        "inputs": {},
        # Prompt (Traditional Chinese): "Please convert the audio file to text".
        "query": "請將音檔轉換為文字",
        "user": "ai-meeting-assistant-user",
        "response_mode": "blocking",
        "files": [
            {
                "type": "audio",
                "transfer_method": "local_file",
                "upload_file_id": file_id
            }
        ]
    }
    r = requests.post(
        f"{dify_api_base_url}/chat-messages",
        headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
        json=payload,
        timeout=1800
    )
    print("Dify STT API Response:", r.status_code, r.text)  # DEBUG PRINT
    r.raise_for_status()
    j = r.json()
    # In chat-type apps the reply is in the 'answer' field; treat a JSON null like a missing answer.
    return (j.get("answer") or "").strip()
# --- Timestamp & Audio Chunking Helpers ---
def format_timestamp_from_ms(ms: int) -> str:
    """Render a millisecond offset as ``HH:MM:SS.mmm``.

    Hours are not wrapped at 24, so 90,000,000 ms becomes ``25:00:00.000``.
    """
    delta = timedelta(milliseconds=ms)
    whole_hours, after_hours = divmod(delta.total_seconds(), 3600)
    whole_minutes, after_minutes = divmod(after_hours, 60)
    millis = delta.microseconds // 1000
    hms = (int(whole_hours), int(whole_minutes), int(after_minutes))
    return "{:02d}:{:02d}:{:02d}.{:03d}".format(*hms, millis)
def export_audio_bytes(seg: AudioSegment) -> bytes:
    """Encode *seg* in memory with pydub (format "mp4", bitrate "64k") and return the bytes."""
    with io.BytesIO() as encoded:
        seg.export(encoded, format="mp4", bitrate="64k")
        # getvalue() returns the whole buffer regardless of the current position.
        return encoded.getvalue()
def ensure_chunk_under_limits(seg: "AudioSegment", max_seg_ms=20 * 60 * 1000,
                              max_bytes=24 * 1024 * 1024, size_of=None,
                              min_split_ms=1000):
    """Split *seg* into contiguous parts, in chronological order, that fit the upload limit.

    If the encoded segment is already smaller than *max_bytes*, it is returned
    unchanged as a one-element list. Otherwise it is first cut into pieces of
    at most *max_seg_ms* milliseconds. Any piece that is still too large is
    then halved repeatedly until it fits, or until a half would be shorter than
    *min_split_ms*. Such a short piece is kept even if it is still oversized.

    Note: the duration cap is applied only when the whole segment is oversized.

    Args:
        seg: Audio to split. Anything supporting ``len()`` (milliseconds) and
            ``[start:end]`` slicing works.
        max_seg_ms: Maximum piece duration for the first, duration-based split.
        max_bytes: Parts must measure strictly fewer bytes than this. The
            default leaves a safety margin under Dify's 25 MB file limit.
        size_of: Callable returning a part's size in bytes. Defaults to
            ``len(export_audio_bytes(part))``.
        min_split_ms: Stop halving once a half would be shorter than this.

    Returns:
        A list of parts whose concatenation equals *seg*, in original order.
        Callers derive timestamps from this order.

    Raises:
        ValueError: If *max_seg_ms* is not positive.
    """
    if max_seg_ms <= 0:
        raise ValueError("max_seg_ms must be positive")
    measure = size_of or (lambda part: len(export_audio_bytes(part)))

    if measure(seg) < max_bytes:
        return [seg]

    total_ms = len(seg)
    pieces = [seg[start:min(start + max_seg_ms, total_ms)]
              for start in range(0, total_ms, max_seg_ms)]

    # LIFO stack: pieces (and later halves) are pushed right-to-left so they
    # pop left-to-right. The previous FIFO queue could emit a later half before
    # the sub-halves of an earlier one, which scrambled chronological order.
    pending = pieces[::-1]
    parts = []
    while pending:
        part = pending.pop()
        half = len(part) // 2
        if measure(part) < max_bytes or half < min_split_ms:
            parts.append(part)
        else:
            pending.append(part[half:])
            pending.append(part[:half])
    return parts
# --- Celery Tasks ---
@celery.task(base=ProgressTask, bind=True)
def extract_audio_task(self, input_path, output_path):
    """Extract the audio track of the video at *input_path* into *output_path* (moviepy).

    Returns:
        ``{'status': 'Success', 'result_path': output_path}`` on success, or
        ``{'status': 'Error', 'error': str}`` on failure. The error dict is the
        same shape the other tasks in this module return; previously this
        branch returned None.
    """
    try:
        self.update_progress(0, 100, "Starting audio extraction...")
        with VideoFileClip(input_path) as video:
            # A clip without an audio track has video.audio set to None; report it clearly
            # instead of failing with "'NoneType' object has no attribute 'write_audiofile'".
            if video.audio is None:
                raise ValueError(f"No audio track found in {input_path}")
            video.audio.write_audiofile(output_path)
        self.update_progress(100, 100, "Audio extracted successfully.")
        return {'status': 'Success', 'result_path': output_path}
    except Exception as e:
        self.update_state(state='FAILURE', meta={'exc_type': type(e).__name__, 'exc_message': str(e)})
        return {'status': 'Error', 'error': str(e)}
def _split_on_silence_with_offsets(audio, min_silence_len=700, silence_thresh=-40, keep_silence=300):
    """Split *audio* into non-silent chunks and return each chunk with its real start offset.

    Each non-silent range found by pydub's detect_nonsilent() is padded by
    *keep_silence* ms on both sides, as split_on_silence(keep_silence=...)
    does. Overlapping pads are split at their midpoint.

    Unlike split_on_silence(), the start offset of every chunk in the ORIGINAL
    audio is preserved. split_on_silence drops the silence between chunks, so
    summing chunk lengths does not give real positions.

    Returns:
        A list of ``(start_ms, chunk)`` tuples in chronological order. If no
        non-silent range is detected, the whole audio is returned as one chunk
        starting at 0.
    """
    from pydub.silence import detect_nonsilent

    total_ms = len(audio)
    ranges = [
        [start - keep_silence, end + keep_silence]
        for start, end in detect_nonsilent(
            audio, min_silence_len=min_silence_len, silence_thresh=silence_thresh
        )
    ]
    # Inner lists are shared with ranges[1:], so these in-place edits chain correctly.
    for prev, nxt in zip(ranges, ranges[1:]):
        if nxt[0] < prev[1]:
            prev[1] = (prev[1] + nxt[0]) // 2
            nxt[0] = prev[1]
    pieces = []
    for start, end in ranges:
        start, end = max(start, 0), min(end, total_ms)
        pieces.append((start, audio[start:end]))
    return pieces or [(0, audio)]


@celery.task(base=ProgressTask, bind=True)
def transcribe_audio_task(self, audio_path):
    """Transcribe the audio file at *audio_path* through the Dify STT app.

    The audio is split on silence (700 ms minimum silence, -40 dBFS
    threshold), and each chunk is kept under the upload limits. Each part is
    uploaded and transcribed. Non-empty results become lines of the form
    ``[HH:MM:SS.mmm - HH:MM:SS.mmm] text``, with timestamps measured in the
    original audio.

    The transcript is also written to ``transcript_<uuid>.txt`` in
    UPLOAD_FOLDER.

    Returns:
        ``{'status': 'Success', 'content': str, 'result_path': filename}``, or
        ``{'status': 'Error', 'error': str}`` if anything raised.
    """
    from app import app
    with app.app_context():
        try:
            self.update_progress(0, 100, "Loading and preparing audio file...")
            audio = AudioSegment.from_file(audio_path)

            # 1. Split audio by silence, remembering where each chunk really starts.
            self.update_progress(5, 100, "Detecting silence to split audio into chunks...")
            chunks = _split_on_silence_with_offsets(audio)

            # 2. Enforce API limits. Sub-parts of a chunk are contiguous, so their
            #    offsets follow from the chunk's real start offset.
            final_segments = []
            for chunk_start, chunk in chunks:
                part_start_time = chunk_start
                for part in ensure_chunk_under_limits(chunk):
                    part_end_time = part_start_time + len(part)
                    final_segments.append({
                        "start": part_start_time,
                        "end": part_end_time,
                        "segment": part
                    })
                    part_start_time = part_end_time

            # 3. Upload chunks to Dify and get transcriptions.
            transcribed_lines = []
            total_segments = len(final_segments)
            for i, seg_data in enumerate(final_segments):
                progress = 10 + int((i / total_segments) * 85)
                self.update_progress(progress, 100, f"Processing chunk {i+1} of {total_segments}...")
                audio_bytes = export_audio_bytes(seg_data["segment"])
                file_id = upload_chunk_to_dify(audio_bytes, f"chunk_{i+1}.mp4")
                transcribed_text = run_dify_stt_chat_app(file_id).strip()
                if transcribed_text:
                    start_ts = format_timestamp_from_ms(seg_data["start"])
                    end_ts = format_timestamp_from_ms(seg_data["end"])
                    transcribed_lines.append(f"[{start_ts} - {end_ts}] {transcribed_text}")

            # 4. Finalize and save the result.
            self.update_progress(98, 100, "Finalizing transcript...")
            full_content = "\n".join(transcribed_lines)
            transcript_filename = f"transcript_{uuid.uuid4()}.txt"
            upload_folder = app.config['UPLOAD_FOLDER']
            os.makedirs(upload_folder, exist_ok=True)
            output_txt_path = os.path.join(upload_folder, transcript_filename)
            with open(output_txt_path, "w", encoding="utf-8") as f:
                f.write(full_content)
            self.update_progress(100, 100, "Transcription complete.")
            return {'status': 'Success', 'content': full_content, 'result_path': transcript_filename}
        except Exception as e:
            error_message = f"An error occurred: {str(e)}"
            self.update_state(
                state='FAILURE',
                meta={'exc_type': type(e).__name__, 'exc_message': error_message}
            )
            return {'status': 'Error', 'error': error_message}
@celery.task(base=ProgressTask, bind=True)
def translate_text_task(self, text_content, target_language):
    """Translate *text_content* line by line into *target_language* and produce bilingual output.

    * If the first line starts with a ``[HH:MM:SS.mmm - HH:MM:SS.mmm]``
      timestamp, each timestamped line is kept and followed by
      ``"<same timestamp> <translation>"``. Other non-empty lines are copied
      unchanged, and empty lines are dropped.
    * Otherwise each non-empty line is emitted as: original, translation,
      blank separator line.

    The result is also written to ``translated_<uuid>.txt`` in UPLOAD_FOLDER
    (default ``"uploads"``).

    Args:
        text_content: Text to translate, or a dict carrying it under ``'content'``.
        target_language: Passed through to the Dify translation client.

    Returns:
        ``{'status': 'Success', 'content', 'result_path'}``, plus ``'message'``
        when the input was empty. On any exception it returns
        ``{'status': 'Error', 'error': str}``; previously it returned None,
        which broke callers that check ``result.get('status')``.
    """
    from app import app
    with app.app_context():
        from services.dify_client import translate_text as dify_translate
        try:
            self.update_progress(0, 100, f"Starting translation to {target_language}...")
            # Also accept a task-result dict that carries the text under 'content'.
            if isinstance(text_content, dict):
                text_content = text_content.get('content', '')
            if not text_content or not isinstance(text_content, str):
                self.update_progress(100, 100, "Translation skipped due to empty input.")
                return {'status': 'Success', 'content': '', 'result_path': None, 'message': 'Input was empty.'}
            lines = text_content.strip().split('\n')
            final_content = ""
            timestamp_pattern = re.compile(r'^(\s*\[\d{2}:\d{2}:\d{2}\.\d{3}\s-\s\d{2}:\d{2}:\d{2}\.\d{3}\])\s*(.*)')
            # The input format is decided from the first line only.
            is_timestamped_input = bool(lines and timestamp_pattern.match(lines[0]))
            if is_timestamped_input:
                translated_lines = []
                total_lines = len(lines)
                for i, line in enumerate(lines):
                    match = timestamp_pattern.match(line)
                    if not match:
                        if line.strip():  # Keep non-matching, non-empty lines
                            translated_lines.append(line)
                        continue
                    timestamp = match.group(1)
                    original_text = match.group(2)
                    # Add original line
                    translated_lines.append(line)
                    if not original_text.strip():
                        continue
                    translated_text = dify_translate(text=original_text, target_lang=target_language)
                    # Add translated line, preserving the timestamp
                    translated_lines.append(f"{timestamp} {translated_text}")
                    progress = int(((i + 1) / total_lines) * 100)
                    self.update_progress(progress, 100, f"Translating line {i+1}/{total_lines}...")
                final_content = "\n".join(translated_lines)
            else:
                # Handle non-timestamped text line by line for bilingual output
                translated_lines = []
                total_lines = len(lines)
                for i, line in enumerate(lines):
                    progress = int(((i + 1) / total_lines) * 98)  # Leave some room for finalization
                    self.update_progress(progress, 100, f"Translating line {i+1}/{total_lines}...")
                    original_line = line.strip()
                    if not original_line:
                        continue
                    translated_text = dify_translate(text=original_line, target_lang=target_language)
                    translated_lines.append(original_line)
                    translated_lines.append(translated_text)
                    translated_lines.append("")  # Add a blank line for readability
                final_content = "\n".join(translated_lines)
            translated_filename = f"translated_{uuid.uuid4()}.txt"
            upload_folder = app.config.get('UPLOAD_FOLDER', 'uploads')
            output_txt_path = os.path.join(upload_folder, translated_filename)
            os.makedirs(upload_folder, exist_ok=True)
            with open(output_txt_path, "w", encoding="utf-8") as f:
                f.write(final_content)
            self.update_progress(100, 100, "Translation complete.")
            return {'status': 'Success', 'content': final_content, 'result_path': translated_filename}
        except Exception as e:
            self.update_state(
                state='FAILURE',
                meta={'exc_type': type(e).__name__, 'exc_message': str(e)}
            )
            return {'status': 'Error', 'error': str(e)}
@celery.task(bind=True)
def summarize_text_task(self, meeting_id):
    """Summarize a meeting's transcript with Dify and store the summary on the meeting.

    Leading ``[...]`` timestamp markers are stripped from each transcript line
    before the text is sent to the app configured by DIFY_SUMMARIZER_API_KEY.

    Returns:
        ``{'status': 'Success', 'summary': str}``, or
        ``{'status': 'Error', 'error': str}``. Failure branches also call
        ``update_state(state='FAILURE', ...)`` first.
        NOTE(review): the task then returns normally, so Celery presumably
        records SUCCESS afterwards. Callers should check the returned 'status'.
    """
    from app import app
    # ask_dify() reports configuration/request failures as an "answer" string
    # starting with one of these prefixes instead of raising. Such a string
    # must never be saved as the meeting summary.
    ask_dify_error_prefixes = ("Error: DIFY API settings not configured", "Dify API request error:")
    with app.app_context():
        try:
            meeting = Meeting.query.get(meeting_id)
            if not meeting or not meeting.transcript:
                self.update_state(state='FAILURE', meta={'error': 'Meeting or transcript not found'})
                return {'status': 'Error', 'error': 'Meeting or transcript not found'}
            api_key = app.config.get("DIFY_SUMMARIZER_API_KEY")
            plain_transcript = re.sub(r'^(\s*\[.*?\])\s*', '', meeting.transcript, flags=re.MULTILINE)
            if not plain_transcript.strip():
                self.update_state(state='FAILURE', meta={'error': 'Transcript is empty.'})
                return {'status': 'Error', 'error': 'Transcript is empty'}
            response = ask_dify(api_key, plain_transcript)
            summary = response.get("answer")
            if not summary:
                self.update_state(state='FAILURE', meta={'error': 'Dify returned empty summary.'})
                return {'status': 'Error', 'error': 'Dify returned empty summary.'}
            if isinstance(summary, str) and summary.startswith(ask_dify_error_prefixes):
                self.update_state(state='FAILURE', meta={'error': summary})
                return {'status': 'Error', 'error': summary}
            meeting.summary = summary
            db.session.commit()
            return {'status': 'Success', 'summary': summary}
        except Exception as e:
            db.session.rollback()
            self.update_state(
                state='FAILURE',
                meta={'exc_type': type(e).__name__, 'exc_message': str(e)}
            )
            return {'status': 'Error', 'error': str(e)}
@celery.task(base=ProgressTask, bind=True)
def preview_action_items_task(self, text_content):
    """Ask Dify for candidate action items in *text_content* without storing anything.

    Leading ``[...]`` timestamp markers are removed from each line first.
    Whitespace-only content short-circuits to an empty item list.

    Returns:
        ``{'status': 'Success', 'items': ...}``, where items is whatever
        services.dify_client.extract_action_items returns (``[]`` for empty
        input), or ``{'status': 'Error', 'error': str}`` on failure.
    """
    from app import app
    with app.app_context():
        from services.dify_client import extract_action_items
        try:
            self.update_progress(10, 100, "Requesting Dify for action items...")
            # Same timestamp-stripping regex the summarizer task applies.
            stripped_text = re.sub(r'^(\s*\[.*?\])\s*', '', text_content, flags=re.MULTILINE)
            if stripped_text.strip():
                items = extract_action_items(stripped_text)
                self.update_progress(100, 100, "Action item preview generated.")
                return {'status': 'Success', 'items': items}
            self.update_progress(100, 100, "Preview skipped, content is empty.")
            return {'status': 'Success', 'items': []}
        except Exception as exc:
            import logging
            logging.error(f"Error in preview_action_items_task: {exc}", exc_info=True)
            self.update_state(
                state='FAILURE',
                meta={'exc_type': type(exc).__name__, 'exc_message': str(exc)}
            )
            return {'status': 'Error', 'error': str(exc)}
def _raise_if_step_failed(result, step_name):
    """Raise RuntimeError("<step_name> failed: <error>") unless *result* is a success dict.

    Sub-tasks report failure as ``{'status': 'Error', 'error': ...}``. A
    result that is not a dict at all (e.g. None) is treated as a failure with
    'Unknown error'.
    """
    if isinstance(result, dict) and result.get('status') == 'Success':
        return
    error = result.get('error', 'Unknown error') if isinstance(result, dict) else 'Unknown error'
    raise RuntimeError(f"{step_name} failed: {error}")


@celery.task(bind=True)
def process_meeting_flow(self, meeting_id, target_language=None):
    """Run transcription, optional translation, and summarization for one meeting.

    The sub-tasks are executed locally with ``Task.apply()``. The meeting
    status is set to 'processing', then to 'completed' or 'failed'.

    The audio path is ``UPLOAD_FOLDER/<meeting.filename>`` with a trailing
    ``.mp4`` replaced by ``.wav``. Presumably this is the file written by
    extract_audio_task; TODO confirm against the upload route.

    Returns:
        ``{'status': 'Success', 'meeting_id': id}``, or
        ``{'status': 'Error', 'error': str}``.
    """
    from app import app
    with app.app_context():
        meeting = Meeting.query.get(meeting_id)
        if not meeting:
            return {'status': 'Error', 'error': 'Meeting not found'}
        try:
            meeting.status = 'processing'
            db.session.commit()
            upload_folder = app.config['UPLOAD_FOLDER']
            audio_filename = meeting.filename
            # Swap only a trailing ".mp4" extension. str.replace would also rewrite
            # ".mp4" occurring elsewhere in the name.
            if audio_filename.endswith('.mp4'):
                audio_filename = audio_filename[:-len('.mp4')] + '.wav'
            audio_path = os.path.join(upload_folder, audio_filename)

            transcript_result = transcribe_audio_task.apply(args=[audio_path]).get()
            _raise_if_step_failed(transcript_result, "Transcription")
            meeting.transcript = transcript_result['content']
            db.session.commit()

            if target_language:
                translation_result = translate_text_task.apply(args=[meeting.transcript, target_language]).get()
                _raise_if_step_failed(translation_result, "Translation")
                meeting.translated_transcript = translation_result['content']
                db.session.commit()

            summary_result = summarize_text_task.apply(args=[meeting.id]).get()
            _raise_if_step_failed(summary_result, "Summarization")
            meeting.summary = summary_result['summary']
            meeting.status = 'completed'
            db.session.commit()
            return {'status': 'Success', 'meeting_id': meeting.id}
        except Exception as e:
            # Discard pending changes from the failed step. Otherwise a failed
            # flush/commit leaves the session unusable and the status update
            # below would raise.
            db.session.rollback()
            meeting.status = 'failed'
            db.session.commit()
            return {'status': 'Error', 'error': str(e)}