113 lines
4.4 KiB
Python
113 lines
4.4 KiB
Python
import os
|
|
import uuid
|
|
from flask import Flask, request, jsonify, render_template, send_from_directory, url_for
|
|
from werkzeug.utils import secure_filename
|
|
from dotenv import load_dotenv
|
|
from tasks import (
|
|
celery,
|
|
extract_audio_task,
|
|
transcribe_audio_task,
|
|
translate_segments_task,
|
|
summarize_text_task
|
|
)
|
|
|
|
# --- Flask app configuration ---
# Load variables from a .env file before any configuration reads the environment.
load_dotenv()
app = Flask(__name__)

# Uploaded files (and files derived from them) live in an "uploads" directory
# located next to this source file.
project_root = os.path.dirname(os.path.abspath(__file__))
UPLOAD_FOLDER = os.path.join(project_root, 'uploads')

# exist_ok=True creates the directory only when it is missing and, unlike a
# separate exists() check, cannot fail when another process creates it first.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 1024 * 1024 * 1024  # limit uploads to 1 GiB
|
|
|
|
def save_uploaded_file(file_key='file'):
    """Save the file uploaded under ``file_key`` and return where it was stored.

    The stored name is a freshly generated UUID followed by the extension of
    the sanitized original filename, so the client-supplied name itself is
    never used as a path on disk.

    Args:
        file_key: Name of the multipart form field that holds the upload.

    Returns:
        A ``(file_path, error)`` pair. On success ``file_path`` is the path of
        the saved file inside ``app.config['UPLOAD_FOLDER']`` and ``error`` is
        ``None``. On failure ``file_path`` is ``None`` and ``error`` is a
        ``(response, status_code)`` tuple that a view can return directly.
    """
    upload = request.files.get(file_key)
    if upload is None:
        return None, (jsonify({'error': '請求中沒有檔案部分'}), 400)

    # A missing filename (empty string or None) means no file was selected.
    # That is a client error, so answer 400 in both cases instead of letting
    # the None case fall through to a generic 500.
    if not upload.filename:
        return None, (jsonify({'error': '未選擇檔案'}), 400)

    safe_name = secure_filename(upload.filename)
    extension = os.path.splitext(safe_name)[1]
    stored_name = f"{uuid.uuid4()}{extension}"
    file_path = os.path.join(app.config['UPLOAD_FOLDER'], stored_name)
    upload.save(file_path)
    return file_path, None
|
|
|
|
# --- API 路由 (Endpoints) ---
|
|
@app.route('/')
def index():
    """Serve the page rendered from the ``index.html`` template."""
    page = render_template('index.html')
    return page
|
|
|
|
@app.route('/extract_audio', methods=['POST'])
def handle_extract_audio():
    """Store the uploaded file and queue ``extract_audio_task`` for it.

    The task receives the saved upload path and an output path that shares
    the same stem with a ``.wav`` extension.

    Returns:
        The error response from ``save_uploaded_file`` when the upload is
        rejected; otherwise a 202 JSON response carrying ``task_id`` and the
        ``status_url`` to poll.
    """
    saved_path, failure = save_uploaded_file()
    if failure:
        return failure

    stem, _ = os.path.splitext(saved_path)
    wav_path = stem + ".wav"
    job = extract_audio_task.delay(saved_path, wav_path)

    payload = {
        'task_id': job.id,
        'status_url': url_for('get_task_status', task_id=job.id),
    }
    return jsonify(payload), 202
|
|
|
|
@app.route('/transcribe_audio', methods=['POST'])
def handle_transcribe_audio():
    """Store the uploaded file and queue ``transcribe_audio_task`` for it.

    Form fields:
        language: Passed through to the task; defaults to ``'auto'``.
        use_demucs: Treated as enabled only when its value is ``'on'``.

    Returns:
        The error response from ``save_uploaded_file`` when the upload is
        rejected; otherwise a 202 JSON response carrying ``task_id`` and the
        ``status_url`` to poll.
    """
    saved_path, failure = save_uploaded_file()
    if failure:
        return failure

    form = request.form
    language = form.get('language', 'auto')
    use_demucs = form.get('use_demucs') == 'on'

    # The transcript path reuses the upload's stem with a .txt extension.
    stem, _ = os.path.splitext(saved_path)
    transcript_path = stem + ".txt"

    job = transcribe_audio_task.delay(saved_path, transcript_path, language, use_demucs)

    payload = {
        'task_id': job.id,
        'status_url': url_for('get_task_status', task_id=job.id),
    }
    return jsonify(payload), 202
|
|
|
|
@app.route('/translate_text', methods=['POST'])
def handle_translate_text():
    """Store the uploaded file and queue ``translate_segments_task`` for it.

    Form fields:
        target_language: Passed through to the task; defaults to ``'繁體中文'``.

    Returns:
        The error response from ``save_uploaded_file`` when the upload is
        rejected; otherwise a 202 JSON response carrying ``task_id`` and the
        ``status_url`` to poll.
    """
    saved_path, failure = save_uploaded_file()
    if failure:
        return failure

    target_language = request.form.get('target_language', '繁體中文')

    # The output path is the upload's stem with "_translated.txt" appended.
    stem, _ = os.path.splitext(saved_path)
    translated_path = stem + "_translated.txt"

    job = translate_segments_task.delay(saved_path, translated_path, target_language)

    payload = {
        'task_id': job.id,
        'status_url': url_for('get_task_status', task_id=job.id),
    }
    return jsonify(payload), 202
|
|
|
|
@app.route('/summarize_text', methods=['POST'])
def handle_summarize_text():
    """Queue ``summarize_text_task`` for the text supplied in a JSON body.

    JSON body fields:
        text_content: Required; passed to the task as the first argument.
        target_language: Optional; defaults to ``'繁體中文'``.
        conversation_id: Optional; ``None`` when absent.
        revision_instruction: Optional; ``None`` when absent.

    Returns:
        A 400 JSON error when the body is not a JSON object containing
        ``text_content``; otherwise a 202 JSON response carrying ``task_id``
        and the ``status_url`` to poll.
    """
    # silent=True yields None for a missing, mistyped or malformed JSON body
    # instead of raising, so every bad request gets the same JSON error below.
    data = request.get_json(silent=True)

    # Require a JSON object: a list or string body could satisfy the ``in``
    # test yet blow up on the key lookup that follows.
    if not isinstance(data, dict) or 'text_content' not in data:
        return jsonify({'error': '請求中缺少 text_content'}), 400

    text_content = data['text_content']
    target_language = data.get('target_language', '繁體中文')
    conversation_id = data.get('conversation_id')
    revision_instruction = data.get('revision_instruction')

    task = summarize_text_task.delay(
        text_content, target_language, conversation_id, revision_instruction
    )
    return jsonify({
        'task_id': task.id,
        'status_url': url_for('get_task_status', task_id=task.id),
    }), 202
|
|
|
|
# --- 通用狀態查詢和下載 ---
|
|
@app.route('/status/<task_id>')
def get_task_status(task_id):
    """Report the state and info of the Celery task identified by ``task_id``.

    Returns:
        A JSON object with ``state`` and ``info``. ``info`` is the task's info
        dict when it is a dict, otherwise its ``str()`` form. For a successful
        task whose info has a non-empty ``result_path``, ``info`` also gets a
        ``download_url`` pointing at the ``download_file`` route for that
        file's base name.
    """
    result = celery.AsyncResult(task_id)

    # Read state and info once each so the checks below use exactly the values
    # that go into the response; re-reading them could observe a task that
    # changed state in between and index into a stringified info.
    state = result.state
    info = result.info

    if not isinstance(info, dict):
        return jsonify({'state': state, 'info': str(info)})

    # Work on a copy so the dict held by the result object is not modified.
    payload = dict(info)
    result_path = payload.get('result_path')
    if state == 'SUCCESS' and result_path:
        payload['download_url'] = url_for(
            'download_file', filename=os.path.basename(result_path)
        )

    return jsonify({'state': state, 'info': payload})
|
|
|
|
@app.route('/download/<filename>')
def download_file(filename):
    """Send ``filename`` from the configured upload folder as an attachment."""
    upload_dir = app.config['UPLOAD_FOLDER']
    return send_from_directory(upload_dir, filename, as_attachment=True)
|
|
|
|
# --- 主程式入口 ---
|
|
if __name__ == '__main__':
    port = int(os.environ.get("FLASK_RUN_PORT", 5000))

    # The server listens on every interface, and the interactive debugger that
    # debug mode enables lets a client execute arbitrary code. Debug mode is
    # therefore opt-in through FLASK_DEBUG (e.g. FLASK_DEBUG=1) rather than
    # always on; "0", "false" and "no" count as disabled.
    debug_setting = os.environ.get("FLASK_DEBUG", "")
    debug = bool(debug_setting) and debug_setting.lower() not in {"0", "false", "no"}

    app.run(host='0.0.0.0', port=port, debug=debug)
|