Files
2025-08-07 09:57:20 +08:00

113 lines
4.4 KiB
Python

import os
import uuid
from flask import Flask, request, jsonify, render_template, send_from_directory, url_for
from werkzeug.utils import secure_filename
from dotenv import load_dotenv
from tasks import (
celery,
extract_audio_task,
transcribe_audio_task,
translate_segments_task,
summarize_text_task
)
# --- Flask App 設定 ---
load_dotenv()
app = Flask(__name__)
project_root = os.path.dirname(os.path.abspath(__file__))
UPLOAD_FOLDER = os.path.join(project_root, 'uploads')
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 1024 * 1024 * 1024 # 限制上傳大小為 1GB
def save_uploaded_file(file_key='file'):
"""一個輔助函式,用於儲存上傳的檔案並回傳路徑。"""
if file_key not in request.files:
return None, (jsonify({'error': '請求中沒有檔案部分'}), 400)
file = request.files[file_key]
if file.filename == '':
return None, (jsonify({'error': '未選擇檔案'}), 400)
if file:
original_filename = secure_filename(file.filename)
file_extension = os.path.splitext(original_filename)[1]
unique_filename = f"{uuid.uuid4()}{file_extension}"
file_path = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
file.save(file_path)
return file_path, None
return None, (jsonify({'error': '未知的檔案錯誤'}), 500)
# --- API 路由 (Endpoints) ---
@app.route('/')
def index():
return render_template('index.html')
@app.route('/extract_audio', methods=['POST'])
def handle_extract_audio():
input_path, error = save_uploaded_file()
if error: return error
output_audio_path = os.path.splitext(input_path)[0] + ".wav"
task = extract_audio_task.delay(input_path, output_audio_path)
return jsonify({'task_id': task.id, 'status_url': url_for('get_task_status', task_id=task.id)}), 202
@app.route('/transcribe_audio', methods=['POST'])
def handle_transcribe_audio():
input_path, error = save_uploaded_file()
if error: return error
language = request.form.get('language', 'auto')
use_demucs = request.form.get('use_demucs') == 'on'
output_txt_path = os.path.splitext(input_path)[0] + ".txt"
task = transcribe_audio_task.delay(input_path, output_txt_path, language, use_demucs)
return jsonify({'task_id': task.id, 'status_url': url_for('get_task_status', task_id=task.id)}), 202
@app.route('/translate_text', methods=['POST'])
def handle_translate_text():
input_path, error = save_uploaded_file()
if error: return error
target_language = request.form.get('target_language', '繁體中文')
output_txt_path = os.path.splitext(input_path)[0] + "_translated.txt"
task = translate_segments_task.delay(input_path, output_txt_path, target_language)
return jsonify({'task_id': task.id, 'status_url': url_for('get_task_status', task_id=task.id)}), 202
@app.route('/summarize_text', methods=['POST'])
def handle_summarize_text():
data = request.get_json()
if not data or 'text_content' not in data:
return jsonify({'error': '請求中缺少 text_content'}), 400
text_content = data['text_content']
target_language = data.get('target_language', '繁體中文')
conversation_id = data.get('conversation_id')
revision_instruction = data.get('revision_instruction')
task = summarize_text_task.delay(text_content, target_language, conversation_id, revision_instruction)
return jsonify({'task_id': task.id, 'status_url': url_for('get_task_status', task_id=task.id)}), 202
# --- 通用狀態查詢和下載 ---
@app.route('/status/<task_id>')
def get_task_status(task_id):
task = celery.AsyncResult(task_id)
response_data = {'state': task.state, 'info': task.info if isinstance(task.info, dict) else str(task.info)}
if task.state == 'SUCCESS' and isinstance(task.info, dict) and 'result_path' in task.info and task.info['result_path']:
response_data['info']['download_url'] = url_for('download_file', filename=os.path.basename(task.info['result_path']))
return jsonify(response_data)
@app.route('/download/<filename>')
def download_file(filename):
return send_from_directory(app.config['UPLOAD_FOLDER'], filename, as_attachment=True)
# --- 主程式入口 ---
if __name__ == '__main__':
port = int(os.environ.get("FLASK_RUN_PORT", 5000))
app.run(host='0.0.0.0', port=port, debug=True)