import os import uuid from flask import Flask, request, jsonify, render_template, send_from_directory, url_for from werkzeug.utils import secure_filename from dotenv import load_dotenv from tasks import ( celery, extract_audio_task, transcribe_audio_task, translate_segments_task, summarize_text_task ) # --- Flask App 設定 --- load_dotenv() app = Flask(__name__) project_root = os.path.dirname(os.path.abspath(__file__)) UPLOAD_FOLDER = os.path.join(project_root, 'uploads') if not os.path.exists(UPLOAD_FOLDER): os.makedirs(UPLOAD_FOLDER) app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER app.config['MAX_CONTENT_LENGTH'] = 1024 * 1024 * 1024 # 限制上傳大小為 1GB def save_uploaded_file(file_key='file'): """一個輔助函式,用於儲存上傳的檔案並回傳路徑。""" if file_key not in request.files: return None, (jsonify({'error': '請求中沒有檔案部分'}), 400) file = request.files[file_key] if file.filename == '': return None, (jsonify({'error': '未選擇檔案'}), 400) if file: original_filename = secure_filename(file.filename) file_extension = os.path.splitext(original_filename)[1] unique_filename = f"{uuid.uuid4()}{file_extension}" file_path = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename) file.save(file_path) return file_path, None return None, (jsonify({'error': '未知的檔案錯誤'}), 500) # --- API 路由 (Endpoints) --- @app.route('/') def index(): return render_template('index.html') @app.route('/extract_audio', methods=['POST']) def handle_extract_audio(): input_path, error = save_uploaded_file() if error: return error output_audio_path = os.path.splitext(input_path)[0] + ".wav" task = extract_audio_task.delay(input_path, output_audio_path) return jsonify({'task_id': task.id, 'status_url': url_for('get_task_status', task_id=task.id)}), 202 @app.route('/transcribe_audio', methods=['POST']) def handle_transcribe_audio(): input_path, error = save_uploaded_file() if error: return error language = request.form.get('language', 'auto') use_demucs = request.form.get('use_demucs') == 'on' output_txt_path = os.path.splitext(input_path)[0] + ".txt" task = transcribe_audio_task.delay(input_path, output_txt_path, language, use_demucs) return jsonify({'task_id': task.id, 'status_url': url_for('get_task_status', task_id=task.id)}), 202 @app.route('/translate_text', methods=['POST']) def handle_translate_text(): input_path, error = save_uploaded_file() if error: return error target_language = request.form.get('target_language', '繁體中文') output_txt_path = os.path.splitext(input_path)[0] + "_translated.txt" task = translate_segments_task.delay(input_path, output_txt_path, target_language) return jsonify({'task_id': task.id, 'status_url': url_for('get_task_status', task_id=task.id)}), 202 @app.route('/summarize_text', methods=['POST']) def handle_summarize_text(): data = request.get_json() if not data or 'text_content' not in data: return jsonify({'error': '請求中缺少 text_content'}), 400 text_content = data['text_content'] target_language = data.get('target_language', '繁體中文') conversation_id = data.get('conversation_id') revision_instruction = data.get('revision_instruction') task = summarize_text_task.delay(text_content, target_language, conversation_id, revision_instruction) return jsonify({'task_id': task.id, 'status_url': url_for('get_task_status', task_id=task.id)}), 202 # --- 通用狀態查詢和下載 --- @app.route('/status/') def get_task_status(task_id): task = celery.AsyncResult(task_id) response_data = {'state': task.state, 'info': task.info if isinstance(task.info, dict) else str(task.info)} if task.state == 'SUCCESS' and isinstance(task.info, dict) and 'result_path' in task.info and task.info['result_path']: response_data['info']['download_url'] = url_for('download_file', filename=os.path.basename(task.info['result_path'])) return jsonify(response_data) @app.route('/download/') def download_file(filename): return send_from_directory(app.config['UPLOAD_FOLDER'], filename, as_attachment=True) # --- 主程式入口 --- if __name__ == '__main__': port = int(os.environ.get("FLASK_RUN_PORT", 5000)) app.run(host='0.0.0.0', port=port, debug=True)