432 lines
18 KiB
Python
432 lines
18 KiB
Python
import os
|
|
import uuid
|
|
import requests
|
|
import json
|
|
import re
|
|
import io
|
|
from datetime import timedelta
|
|
from pydub import AudioSegment
|
|
from pydub.silence import split_on_silence
|
|
from moviepy import VideoFileClip
|
|
from celery_app import celery
|
|
from models import db, Meeting
|
|
import math
|
|
|
|
from celery import Task
|
|
|
|
class ProgressTask(Task):
    """Celery task base class that can publish incremental progress.

    Progress is reported through ``update_state`` using the custom
    ``'PROGRESS'`` state, so a client polling the task sees the latest
    ``current``/``total``/``status_msg`` values in the task meta.
    """

    def update_progress(self, current, total, status_msg, extra_info=None):
        """Publish a ``'PROGRESS'`` state for this task.

        Args:
            current: Progress reached so far.
            total: Value that ``current`` counts toward.
            status_msg: Human-readable status message.
            extra_info: Optional dict merged into the meta; its keys override
                the standard ones on collision. Ignored unless it is a
                non-empty dict.
        """
        progress_meta = dict(current=current, total=total, status_msg=status_msg)
        if isinstance(extra_info, dict) and extra_info:
            progress_meta = {**progress_meta, **extra_info}
        self.update_state(state='PROGRESS', meta=progress_meta)
|
|
|
|
# --- Dify Helper Functions ---
|
|
def ask_dify(api_key: str, query: str, **kwargs):
    """Send a ``chat-messages`` request to the Dify API and return its JSON.

    Args:
        api_key: Dify application API key, sent as a Bearer token.
        query: The query/prompt text.
        **kwargs: Optional overrides:
            inputs (dict): app input variables (default ``{}``).
            user_id (str): end-user identifier (default ``"default-tk-user"``).
            response_mode (str): Dify response mode (default ``"blocking"``).
            conversation_id: conversation to continue (default ``None``).
            timeout_seconds (float): HTTP timeout in seconds (default 1200).

    Returns:
        The decoded JSON response on success. On failure (missing
        configuration, HTTP/network error, non-JSON body) a dict whose
        ``"answer"`` holds a human-readable error message. The same message is
        also stored under ``"error"``, so callers can tell a failure apart from
        a real answer. The ``"answer"`` key is kept for backward compatibility.
    """
    from flask import current_app

    base_url = current_app.config.get("DIFY_API_BASE_URL")
    if not api_key or not base_url:
        message = "Error: DIFY API settings not configured."
        return {"answer": message, "error": message}

    # Tolerate a configured base URL that ends with a slash.
    url = f"{str(base_url).rstrip('/')}/chat-messages"
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    payload = {
        "inputs": kwargs.get("inputs", {}),
        "query": query,
        "user": kwargs.get("user_id", "default-tk-user"),
        "response_mode": kwargs.get("response_mode", "blocking"),
        "conversation_id": kwargs.get("conversation_id"),
    }
    try:
        response = requests.post(
            url, headers=headers, json=payload, timeout=kwargs.get("timeout_seconds", 1200)
        )
        response.raise_for_status()
        return response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose
        # JSONDecodeError is not a RequestException subclass.
        message = f"Dify API request error: {e}"
        return {"answer": message, "error": message}
|
|
|
|
def upload_chunk_to_dify(blob: bytes, filename="chunk.mp4"):
    """Upload one encoded audio chunk to Dify's ``/files/upload`` endpoint.

    Args:
        blob: Encoded audio bytes, sent with MIME type ``audio/mp4``.
        filename: File name reported to Dify.

    Returns:
        The ``"id"`` of the uploaded file from the JSON response.

    Raises:
        RuntimeError: if ``DIFY_API_BASE_URL`` or ``DIFY_STT_API_KEY`` is unset.
        requests.HTTPError: on a non-2xx response.
        KeyError: if the response JSON has no ``"id"``.
    """
    import logging
    from flask import current_app

    logger = logging.getLogger(__name__)
    base_url = current_app.config.get("DIFY_API_BASE_URL")
    api_key = current_app.config.get("DIFY_STT_API_KEY")
    if not base_url or not api_key:
        raise RuntimeError("DIFY_API_BASE_URL / DIFY_STT_API_KEY are not configured.")

    # Never log the API key itself: it is a secret credential.
    r = requests.post(
        f"{str(base_url).rstrip('/')}/files/upload",
        headers={"Authorization": f"Bearer {api_key}"},
        files={"file": (filename, io.BytesIO(blob), "audio/mp4")},
        data={"user": "ai-meeting-assistant-user"},
        timeout=300,
    )
    logger.debug("Dify file upload response: %s %s", r.status_code, r.text)
    r.raise_for_status()
    return r.json()["id"]
|
|
|
|
def run_dify_stt_chat_app(file_id: str) -> str:
    """Run speech-to-text by calling Dify's ``chat-messages`` API.

    Intended for an "advanced chat" type Dify app. A blocking request is sent
    whose query asks the app to convert the attached audio into text. The
    audio is attached by the upload id returned from the file-upload endpoint.

    Args:
        file_id: Dify upload id of the audio file.

    Returns:
        The app's ``"answer"`` text stripped of surrounding whitespace, or an
        empty string when ``"answer"`` is missing or null.

    Raises:
        RuntimeError: if ``DIFY_API_BASE_URL`` or ``DIFY_STT_API_KEY`` is unset.
        requests.HTTPError: on a non-2xx response.
    """
    import logging
    from flask import current_app

    base_url = current_app.config.get("DIFY_API_BASE_URL")
    api_key = current_app.config.get("DIFY_STT_API_KEY")
    if not base_url or not api_key:
        raise RuntimeError("DIFY_API_BASE_URL / DIFY_STT_API_KEY are not configured.")

    payload = {
        "inputs": {},
        "query": "請將音檔轉換為文字",
        "user": "ai-meeting-assistant-user",
        "response_mode": "blocking",
        "files": [
            {
                "type": "audio",
                "transfer_method": "local_file",
                "upload_file_id": file_id,
            }
        ],
    }

    r = requests.post(
        f"{str(base_url).rstrip('/')}/chat-messages",
        headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
        json=payload,
        timeout=1800,
    )
    # The body contains the transcript, so only emit it at debug level.
    logging.getLogger(__name__).debug("Dify STT API response: %s %s", r.status_code, r.text)
    r.raise_for_status()

    # Chat-type apps put the generated text in "answer"; guard against null.
    return (r.json().get("answer") or "").strip()
|
|
|
|
# --- Timestamp & Audio Chunking Helpers ---
|
|
def format_timestamp_from_ms(ms: int) -> str:
    """Render a millisecond offset as ``HH:MM:SS.mmm``.

    Hours are not wrapped at 24, so a 25-hour offset renders as ``25:...``.
    """
    span = timedelta(milliseconds=ms)
    whole_hours, leftover = divmod(span.total_seconds(), 3600)
    whole_minutes, leftover_seconds = divmod(leftover, 60)
    fields = (
        int(whole_hours),
        int(whole_minutes),
        int(leftover_seconds),
        span.microseconds // 1000,
    )
    return "%02d:%02d:%02d.%03d" % fields
|
|
|
|
def export_audio_bytes(seg: AudioSegment) -> bytes:
    """Encode ``seg`` as MP4 audio at 64 kbit/s and return the encoded bytes.

    The export target is an in-memory buffer, so no file is left on disk by
    this function.
    """
    with io.BytesIO() as sink:
        seg.export(sink, format="mp4", bitrate="64k")
        return sink.getvalue()
|
|
|
|
def ensure_chunk_under_limits(
    seg: AudioSegment,
    max_seg_ms: int = 20 * 60 * 1000,
    max_bytes: int = 24 * 1024 * 1024,
    min_split_ms: int = 1000,
):
    """Split an audio segment into consecutive pieces that fit the upload limits.

    Every returned piece is at most ``max_seg_ms`` long and encodes (via
    ``export_audio_bytes``) to fewer than ``max_bytes`` bytes. The exception
    is a piece whose halves would be shorter than ``min_split_ms``: it is
    returned as-is even if still too large, so the loop always terminates.

    Pieces are returned in chronological order and are contiguous. Callers
    assign start times by summing piece lengths, so order matters.

    Args:
        seg: Audio to split.
        max_seg_ms: Maximum duration of a piece in milliseconds (default
            20 minutes).
        max_bytes: Exclusive upper bound on encoded size. The 24 MB default
            leaves headroom below Dify's 25 MB file limit.
        min_split_ms: Smallest half-length at which halving is still attempted.

    Returns:
        A list of audio segments covering ``seg``.
    """

    def fits(part):
        # Encoding is expensive, so each piece is checked at most once.
        return len(export_audio_bytes(part)) < max_bytes

    # Fast path: both limits already met. The duration check is free, so it
    # runs first.
    if len(seg) <= max_seg_ms and fits(seg):
        return [seg]

    total_ms = len(seg)
    duration_parts = [
        seg[start:min(start + max_seg_ms, total_ms)]
        for start in range(0, total_ms, max_seg_ms)
    ]

    final_parts = []
    for part in duration_parts:
        # Depth-first halving with a LIFO stack. The later half is pushed
        # first so the earlier half is handled next, which keeps the output
        # in chronological order.
        stack = [part]
        while stack:
            piece = stack.pop()
            mid_point = len(piece) // 2
            if mid_point < min_split_ms or fits(piece):
                final_parts.append(piece)
                continue
            stack.append(piece[mid_point:])
            stack.append(piece[:mid_point])
    return final_parts
|
|
|
|
# --- Celery Tasks ---
|
|
@celery.task(base=ProgressTask, bind=True)
def extract_audio_task(self, input_path, output_path):
    """Extract the audio track of a video file into ``output_path``.

    Args:
        input_path: Path of the source video.
        output_path: Destination audio file; moviepy picks the codec from its
            extension.

    Returns:
        ``{'status': 'Success', 'result_path': output_path}`` on success.
        On any failure the task state is set to FAILURE with the exception
        details, and ``{'status': 'Error', 'error': <message>}`` is returned.
        This matches the other tasks in this module, so callers always
        receive a dict.
    """
    try:
        self.update_progress(0, 100, "Starting audio extraction...")
        with VideoFileClip(input_path) as video:
            # A video without an audio track has ``audio`` set to None.
            if video.audio is None:
                raise ValueError(f"No audio track found in {input_path}")
            video.audio.write_audiofile(output_path)
        self.update_progress(100, 100, "Audio extracted successfully.")
        return {'status': 'Success', 'result_path': output_path}
    except Exception as e:
        self.update_state(state='FAILURE', meta={'exc_type': type(e).__name__, 'exc_message': str(e)})
        return {'status': 'Error', 'error': str(e)}
|
|
|
|
def _split_on_silence_with_offsets(audio, min_silence_len, silence_thresh, keep_silence):
    """Split ``audio`` on silence, returning ``(start_ms, chunk)`` pairs.

    This mirrors pydub's ``split_on_silence`` padding: ``keep_silence`` ms are
    kept on both sides of each non-silent range, and overlapping paddings of
    neighbouring ranges are split at their midpoint. Unlike
    ``split_on_silence``, it also returns where each chunk starts in the
    original audio. The silent gaps between chunks are dropped, so summing
    chunk lengths would not give real positions.
    """
    from pydub.silence import detect_nonsilent

    ranges = [
        [start - keep_silence, end + keep_silence]
        for start, end in detect_nonsilent(
            audio, min_silence_len=min_silence_len, silence_thresh=silence_thresh
        )
    ]
    # The inner lists are shared, so adjusting nxt[0] is seen as the next prev.
    for prev, nxt in zip(ranges, ranges[1:]):
        if nxt[0] < prev[1]:
            prev[1] = (prev[1] + nxt[0]) // 2
            nxt[0] = prev[1]

    total_ms = len(audio)
    return [
        (max(start, 0), audio[max(start, 0):min(end, total_ms)])
        for start, end in ranges
    ]


@celery.task(base=ProgressTask, bind=True)
def transcribe_audio_task(self, audio_path):
    """Transcribe an audio file with the Dify STT app into a timestamped transcript.

    Steps:

    1. Load the audio and split it on silence.
    2. Enforce the per-chunk upload limits.
    3. Upload and transcribe each piece.
    4. Write the joined transcript to ``transcript_<uuid>.txt`` in
       ``UPLOAD_FOLDER``.

    Each non-empty result becomes one line
    ``[HH:MM:SS.mmm - HH:MM:SS.mmm] text``. The timestamps are positions in
    the original audio, including the silent gaps removed by the split.

    Args:
        audio_path: Path of an audio file readable by pydub.

    Returns:
        ``{'status': 'Success', 'content': <transcript>, 'result_path': <filename>}``.
        On any exception the task state is set to FAILURE and
        ``{'status': 'Error', 'error': <message>}`` is returned.
    """
    from app import app

    with app.app_context():
        try:
            self.update_progress(0, 100, "Loading and preparing audio file...")
            audio = AudioSegment.from_file(audio_path)

            # 1. Split audio by silence, keeping each chunk's real start offset.
            self.update_progress(5, 100, "Detecting silence to split audio into chunks...")
            chunks = _split_on_silence_with_offsets(
                audio, min_silence_len=700, silence_thresh=-40, keep_silence=300
            )
            if not chunks:  # No non-silent range found: treat the whole audio as one chunk.
                chunks = [(0, audio)]

            # 2. Process chunks and ensure they are within API limits.
            final_segments = []
            for chunk_start, chunk in chunks:
                part_start_time = chunk_start
                # Parts are contiguous and chronological, so offsets accumulate.
                for part in ensure_chunk_under_limits(chunk):
                    part_end_time = part_start_time + len(part)
                    final_segments.append({
                        "start": part_start_time,
                        "end": part_end_time,
                        "segment": part,
                    })
                    part_start_time = part_end_time

            # 3. Upload chunks to Dify and get transcriptions.
            transcribed_lines = []
            total_segments = len(final_segments)
            for i, seg_data in enumerate(final_segments):
                progress = 10 + int((i / total_segments) * 85)
                self.update_progress(progress, 100, f"Processing chunk {i+1} of {total_segments}...")

                audio_bytes = export_audio_bytes(seg_data["segment"])
                file_id = upload_chunk_to_dify(audio_bytes, f"chunk_{i+1}.mp4")
                transcribed_text = run_dify_stt_chat_app(file_id).strip()

                if transcribed_text:
                    start_ts = format_timestamp_from_ms(seg_data["start"])
                    end_ts = format_timestamp_from_ms(seg_data["end"])
                    transcribed_lines.append(f"[{start_ts} - {end_ts}] {transcribed_text}")

            # 4. Finalize and save the result.
            self.update_progress(98, 100, "Finalizing transcript...")
            full_content = "\n".join(transcribed_lines)

            transcript_filename = f"transcript_{uuid.uuid4()}.txt"
            upload_folder = app.config['UPLOAD_FOLDER']
            os.makedirs(upload_folder, exist_ok=True)
            output_txt_path = os.path.join(upload_folder, transcript_filename)
            with open(output_txt_path, "w", encoding="utf-8") as f:
                f.write(full_content)

            self.update_progress(100, 100, "Transcription complete.")
            return {'status': 'Success', 'content': full_content, 'result_path': transcript_filename}

        except Exception as e:
            error_message = f"An error occurred: {str(e)}"
            self.update_state(
                state='FAILURE',
                meta={'exc_type': type(e).__name__, 'exc_message': error_message}
            )
            return {'status': 'Error', 'error': error_message}
|
|
|
|
@celery.task(base=ProgressTask, bind=True)
def translate_text_task(self, text_content, target_language):
    """Translate text line by line into ``target_language`` as a bilingual document.

    Two input shapes are handled:

    * Timestamped transcripts, detected when the first line looks like
      ``[HH:MM:SS.mmm - HH:MM:SS.mmm] text``. Each timestamped line is kept
      and followed by a line with the same timestamp and the translated text.
      Non-empty lines without a timestamp are copied through untranslated.
    * Plain text: each non-empty line (stripped) is followed by its
      translation and a blank separator line.

    The result is also written to ``translated_<uuid>.txt`` under
    ``UPLOAD_FOLDER`` (default ``uploads``), which is created if missing.

    Args:
        text_content: The text to translate, or a dict whose ``'content'`` key
            holds it, such as the result dict of ``transcribe_audio_task``.
        target_language: Target language, passed through to the Dify translator.

    Returns:
        ``{'status': 'Success', 'content': ..., 'result_path': <filename>}``.
        For empty input, ``result_path`` is ``None`` and a ``'message'`` is
        added. On any exception the task state is set to FAILURE and
        ``{'status': 'Error', 'error': <message>}`` is returned, so callers
        that call ``.get()`` on the result always receive a dict.
    """
    from app import app

    with app.app_context():
        from services.dify_client import translate_text as dify_translate

        try:
            self.update_progress(0, 100, f"Starting translation to {target_language}...")
            if isinstance(text_content, dict):
                text_content = text_content.get('content', '')

            if not text_content or not isinstance(text_content, str):
                self.update_progress(100, 100, "Translation skipped due to empty input.")
                return {'status': 'Success', 'content': '', 'result_path': None, 'message': 'Input was empty.'}

            lines = text_content.strip().split('\n')
            total_lines = len(lines)
            timestamp_pattern = re.compile(r'^(\s*\[\d{2}:\d{2}:\d{2}\.\d{3}\s-\s\d{2}:\d{2}:\d{2}\.\d{3}\])\s*(.*)')
            # str.split always yields at least one element, so lines[0] exists.
            is_timestamped_input = bool(timestamp_pattern.match(lines[0]))

            translated_lines = []
            if is_timestamped_input:
                for i, line in enumerate(lines):
                    match = timestamp_pattern.match(line)
                    if match:
                        timestamp, original_text = match.group(1), match.group(2)
                        translated_lines.append(line)  # original line first
                        if original_text.strip():
                            translated_text = dify_translate(text=original_text, target_lang=target_language)
                            # Translated line reuses the original timestamp.
                            translated_lines.append(f"{timestamp} {translated_text}")
                    elif line.strip():
                        # Keep non-matching, non-empty lines untouched.
                        translated_lines.append(line)
                    # Report progress for every line, including skipped ones,
                    # and leave room for finalization.
                    progress = int(((i + 1) / total_lines) * 98)
                    self.update_progress(progress, 100, f"Translating line {i+1}/{total_lines}...")
            else:
                # Non-timestamped text: line-by-line bilingual output.
                for i, line in enumerate(lines):
                    progress = int(((i + 1) / total_lines) * 98)  # Leave some room for finalization
                    self.update_progress(progress, 100, f"Translating line {i+1}/{total_lines}...")

                    original_line = line.strip()
                    if not original_line:
                        continue

                    translated_text = dify_translate(text=original_line, target_lang=target_language)
                    # Original, translation, then a blank line for readability.
                    translated_lines.extend([original_line, translated_text, ""])

            final_content = "\n".join(translated_lines)

            translated_filename = f"translated_{uuid.uuid4()}.txt"
            upload_folder = app.config.get('UPLOAD_FOLDER', 'uploads')
            output_txt_path = os.path.join(upload_folder, translated_filename)

            os.makedirs(upload_folder, exist_ok=True)
            with open(output_txt_path, "w", encoding="utf-8") as f:
                f.write(final_content)

            self.update_progress(100, 100, "Translation complete.")
            return {'status': 'Success', 'content': final_content, 'result_path': translated_filename}
        except Exception as e:
            self.update_state(
                state='FAILURE',
                meta={'exc_type': type(e).__name__, 'exc_message': str(e)}
            )
            return {'status': 'Error', 'error': str(e)}
|
|
|
|
@celery.task(bind=True)
def summarize_text_task(self, meeting_id):
    """Summarize a meeting's transcript with the Dify summarizer app and store it.

    Leading ``[...]`` timestamp prefixes are stripped from every transcript
    line before sending. On success the summary is saved to
    ``meeting.summary`` and committed.

    Args:
        meeting_id: Primary key of the ``Meeting`` to summarize.

    Returns:
        ``{'status': 'Success', 'summary': <text>}``, or
        ``{'status': 'Error', 'error': <message>}``. The error case covers:
        missing meeting or transcript, empty transcript, missing API key,
        a Dify request failure, an empty summary, or any exception. The task
        state is set to FAILURE in every error case.
    """
    from app import app

    # Messages ask_dify() places in "answer" when the request fails. They
    # must never be persisted as a meeting summary.
    dify_error_prefixes = ("Error: DIFY API settings not configured.", "Dify API request error:")

    with app.app_context():
        try:
            meeting = Meeting.query.get(meeting_id)
            if not meeting or not meeting.transcript:
                self.update_state(state='FAILURE', meta={'error': 'Meeting or transcript not found'})
                return {'status': 'Error', 'error': 'Meeting or transcript not found'}

            api_key = app.config.get("DIFY_SUMMARIZER_API_KEY")
            if not api_key:
                error = 'DIFY_SUMMARIZER_API_KEY is not configured.'
                self.update_state(state='FAILURE', meta={'error': error})
                return {'status': 'Error', 'error': error}

            plain_transcript = re.sub(r'^(\s*\[.*?\])\s*', '', meeting.transcript, flags=re.MULTILINE)
            if not plain_transcript.strip():
                self.update_state(state='FAILURE', meta={'error': 'Transcript is empty.'})
                return {'status': 'Error', 'error': 'Transcript is empty'}

            response = ask_dify(api_key, plain_transcript)
            summary = response.get("answer")

            error = response.get("error")
            if not error and isinstance(summary, str) and summary.startswith(dify_error_prefixes):
                error = summary
            if error:
                self.update_state(state='FAILURE', meta={'error': error})
                return {'status': 'Error', 'error': error}

            if not summary:
                self.update_state(state='FAILURE', meta={'error': 'Dify returned empty summary.'})
                return {'status': 'Error', 'error': 'Dify returned empty summary.'}

            meeting.summary = summary
            db.session.commit()
            return {'status': 'Success', 'summary': summary}
        except Exception as e:
            db.session.rollback()
            self.update_state(
                state='FAILURE',
                meta={'exc_type': type(e).__name__, 'exc_message': str(e)}
            )
            return {'status': 'Error', 'error': str(e)}
|
|
|
|
@celery.task(base=ProgressTask, bind=True)
def preview_action_items_task(self, text_content):
    """Extract a preview of action items from transcript text via Dify.

    Leading ``[...]`` timestamp prefixes are stripped from each line first.

    Args:
        text_content: Transcript text. Also accepted: a dict whose
            ``'content'`` key holds it (as ``translate_text_task`` accepts),
            or ``None``, which is treated as empty.

    Returns:
        ``{'status': 'Success', 'items': <extracted items>}``, with an empty
        list for empty input. On any exception the error is logged, the task
        state is set to FAILURE, and ``{'status': 'Error', 'error': <message>}``
        is returned.
    """
    from app import app

    with app.app_context():
        from services.dify_client import extract_action_items

        try:
            self.update_progress(10, 100, "Requesting Dify for action items...")

            if isinstance(text_content, dict):
                text_content = text_content.get('content', '')
            if text_content is None:
                text_content = ''

            # Same timestamp-stripping regex as summarize_text_task.
            plain_text = re.sub(r'^(\s*\[.*?\])\s*', '', text_content, flags=re.MULTILINE)
            if not plain_text.strip():
                self.update_progress(100, 100, "Preview skipped, content is empty.")
                return {'status': 'Success', 'items': []}

            parsed_items = extract_action_items(plain_text)

            self.update_progress(100, 100, "Action item preview generated.")
            return {'status': 'Success', 'items': parsed_items}
        except Exception as e:
            import logging
            logging.getLogger(__name__).exception("Error in preview_action_items_task: %s", e)
            self.update_state(
                state='FAILURE',
                meta={'exc_type': type(e).__name__, 'exc_message': str(e)}
            )
            return {'status': 'Error', 'error': str(e)}
|
|
|
|
def _require_task_success(result, step):
    """Return ``result`` if it is a dict with ``status == 'Success'``, else raise.

    Args:
        result: Return value of a sub-task; may be ``None`` or a non-dict.
        step: Human-readable step name used in the error message.

    Raises:
        RuntimeError: ``"<step> failed: <error>"``, using the result's
            ``'error'`` entry when available.
    """
    if isinstance(result, dict) and result.get('status') == 'Success':
        return result
    error = result.get('error', 'Unknown error') if isinstance(result, dict) else 'Unknown error'
    raise RuntimeError(f"{step} failed: {error}")


@celery.task(bind=True)
def process_meeting_flow(self, meeting_id, target_language=None):
    """Run the full meeting pipeline for one meeting.

    The pipeline transcribes the audio, optionally translates the transcript,
    then summarizes it. ``meeting.status`` is set to ``'processing'`` at the
    start, ``'completed'`` on success, and ``'failed'`` on any error.
    Intermediate results (transcript, translated transcript) are committed
    as soon as they are available.

    Args:
        meeting_id: Primary key of the ``Meeting`` to process.
        target_language: If given, the transcript is also translated into it.

    Returns:
        ``{'status': 'Success', 'meeting_id': <id>}``, or
        ``{'status': 'Error', 'error': <message>}``.
    """
    from app import app

    with app.app_context():
        meeting = Meeting.query.get(meeting_id)
        if not meeting:
            return {'status': 'Error', 'error': 'Meeting not found'}
        try:
            meeting.status = 'processing'
            db.session.commit()

            upload_folder = app.config['UPLOAD_FOLDER']
            # Assuming filename exists on the meeting object. The audio is
            # looked up next to it with a trailing .mp4 swapped for .wav
            # (presumably the output of extract_audio_task; verify). Other
            # extensions are used as-is.
            audio_filename = meeting.filename
            if audio_filename.endswith('.mp4'):
                audio_filename = audio_filename[:-len('.mp4')] + '.wav'
            audio_path = os.path.join(upload_folder, audio_filename)

            # Sub-tasks run synchronously in this worker via apply().
            transcript_result = _require_task_success(
                transcribe_audio_task.apply(args=[audio_path]).get(), "Transcription"
            )
            meeting.transcript = transcript_result['content']
            db.session.commit()

            if target_language:
                translation_result = _require_task_success(
                    translate_text_task.apply(args=[meeting.transcript, target_language]).get(),
                    "Translation",
                )
                meeting.translated_transcript = translation_result['content']
                db.session.commit()

            summary_result = _require_task_success(
                summarize_text_task.apply(args=[meeting.id]).get(), "Summarization"
            )

            meeting.summary = summary_result['summary']
            meeting.status = 'completed'
            db.session.commit()
            return {'status': 'Success', 'meeting_id': meeting.id}
        except Exception as e:
            import logging
            # If the failure came from a DB operation, the session is unusable
            # until rolled back, so roll back before recording the failure.
            db.session.rollback()
            try:
                meeting.status = 'failed'
                db.session.commit()
            except Exception:
                db.session.rollback()
                logging.getLogger(__name__).exception("Could not mark meeting %s as failed", meeting_id)
            return {'status': 'Error', 'error': str(e)}