import os import math from flask import Flask, jsonify, request from flask_cors import CORS from dotenv import load_dotenv import mysql.connector import random from mysql.connector import pooling, Error # 載入 .env 檔案中的環境變數 load_dotenv(override=True) # --- 偵錯用:印出環境變數 --- print("--- 偵錯環境變數 ---") print(f"DB_HOST: {os.getenv('DB_HOST')}") print(f"DB_PORT: {os.getenv('DB_PORT')}") print(f"DB_USER: {os.getenv('DB_USER')}") print("--------------------") # ------------------------- app = Flask(__name__) # 啟用 CORS,允許來自任何來源的請求,這在開發本機前端時很方便 CORS(app) # --- 資料庫連線設定 --- try: # 建立資料庫連線池,這比每次請求都建立新連線更有效率 connection_pool = pooling.MySQLConnectionPool( pool_name="pizzapool", pool_size=5, pool_reset_session=True, host=os.getenv('DB_HOST'), user=os.getenv('DB_USER'), password=os.getenv('DB_PASSWORD'), database=os.getenv('DB_DATABASE'), port=os.getenv('DB_PORT') ) print("資料庫連線池建立成功!") except Error as e: print(f"建立資料庫連線池時發生錯誤: {e}") exit() # 如果連線池建立失敗,則直接退出程式 def get_db_connection(): """從連線池取得一個資料庫連線""" try: return connection_pool.get_connection() except Error as e: print(f"從連線池取得連線時發生錯誤: {e}") return None # --- 標準化回應格式 --- def success_response(data, message="Success", code=200): """產生成功的 JSON 回應""" response = { "status": "success", "code": code, "message": message } if data is not None: response["data"] = data return jsonify(response), code def error_response(message, code=400): """產生錯誤的 JSON 回應""" return jsonify({ "status": "error", "code": code, "message": message }), code # --- 資料庫初始化 --- def init_db(): """初始化資料庫,建立資料表並插入初始資料""" print("正在初始化資料庫...") db_conn = None cursor = None try: db_conn = get_db_connection() if db_conn is None: print("無法取得資料庫連線,初始化失敗。") return cursor = db_conn.cursor() # 建立 pizza 資料表,新增 id 作為主鍵 cursor.execute(""" CREATE TABLE IF NOT EXISTS pizza ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL, size VARCHAR(50) NOT NULL, price DECIMAL(10, 2) NOT NULL ) """) print("`pizza` 資料表已確認存在。") # 檢查是否已有範例資料 cursor.execute("SELECT id FROM pizza WHERE name = %s", ("火山披薩",)) if cursor.fetchone() is None: # 插入一筆範例資料 cursor.execute( "INSERT INTO pizza (name, size, price) VALUES (%s, %s, %s)", ("火山披薩", "M", 280.00) ) db_conn.commit() print("已成功插入範例資料:火山披薩。") else: print("範例資料已存在。") # 檢查總筆數,如果不足11筆,則補齊 cursor.execute("SELECT COUNT(*) FROM pizza") total_pizzas = cursor.fetchone()[0] if total_pizzas < 11: print(f"目前資料庫有 {total_pizzas} 筆資料,將產生 {11 - total_pizzas} 筆新的隨機資料。") pizza_names = ['夏威夷', '瑪格麗特', 'BBQ雞肉', '海鮮總匯', '超級豪華', '田園蔬菜', '四種起司'] sizes = ['S', 'M', 'L', 'XL'] pizzas_to_add = [] for _ in range(11 - total_pizzas): name = random.choice(pizza_names) + "披薩" size = random.choice(sizes) price = round(random.uniform(250, 900), 0) # 價格取整數 pizzas_to_add.append((name, size, price)) insert_query = "INSERT INTO pizza (name, size, price) VALUES (%s, %s, %s)" cursor.executemany(insert_query, pizzas_to_add) db_conn.commit() print(f"已成功新增 {len(pizzas_to_add)} 筆隨機披薩資料。") else: print("資料庫中已有11筆或更多資料,無需產生新的隨機資料。") except Error as e: print(f"資料庫初始化時發生錯誤: {e}") finally: if cursor: cursor.close() if db_conn and db_conn.is_connected(): db_conn.close() print("資料庫初始化完成。") # --- API 路由 --- @app.route('/v1/pizza', methods=['GET']) def get_pizzas(): """取得披薩清單,支援篩選和分頁""" db_conn = None cursor = None try: db_conn = get_db_connection() if db_conn is None: return error_response("無法連線到資料庫", 500) cursor = db_conn.cursor(dictionary=True) # 處理查詢參數 min_price = request.args.get('min_price', type=float) max_price = request.args.get('max_price', type=float) page = request.args.get('page', 1, type=int) limit = request.args.get('limit', 10, type=int) offset = (page - 1) * limit # 動態建立查詢語句以避免 SQL 注入 query_conditions = [] query_params = [] if min_price is not None: query_conditions.append("price >= %s") query_params.append(min_price) if max_price is not None: query_conditions.append("price <= %s") query_params.append(max_price) where_clause = " AND ".join(query_conditions) # 取得總筆數 count_query = "SELECT COUNT(id) as total FROM pizza" if where_clause: count_query += " WHERE " + where_clause cursor.execute(count_query, tuple(query_params)) total_records = cursor.fetchone()['total'] total_pages = math.ceil(total_records / limit) # 取得分頁資料 data_query = "SELECT * FROM pizza" if where_clause: data_query += " WHERE " + where_clause data_query += " ORDER BY id LIMIT %s OFFSET %s" cursor.execute(data_query, tuple(query_params + [limit, offset])) pizzas = cursor.fetchall() meta = { "current_page": page, "last_page": total_pages, "per_page": limit, "total": total_records } return success_response({"pizzas": pizzas, "meta": meta}) except Error as e: return error_response(f"伺服器錯誤: {e}", 500) finally: if cursor: cursor.close() if db_conn and db_conn.is_connected(): db_conn.close() @app.route('/v1/pizza/', methods=['GET']) def get_pizza_by_name(name): """依據名稱取得單一披薩""" db_conn = None cursor = None try: db_conn = get_db_connection() if db_conn is None: return error_response("無法連線到資料庫", 500) cursor = db_conn.cursor(dictionary=True) cursor.execute("SELECT * FROM pizza WHERE name = %s", (name,)) pizza = cursor.fetchone() if pizza: return success_response(pizza) else: return error_response("找不到指定的披薩", 404) except Error as e: return error_response(f"伺服器錯誤: {e}", 500) finally: if cursor: cursor.close() if db_conn and db_conn.is_connected(): db_conn.close() @app.route('/v1/pizza', methods=['POST']) def create_pizza(): """建立新的披薩""" db_conn = None cursor = None try: data = request.get_json() if not data or not all(k in data for k in ['name', 'size', 'price']): return error_response("缺少必要欄位:name, size, price") name = data['name'] size = data['size'] price = data['price'] if not isinstance(name, str) or not isinstance(size, str) or not isinstance(price, (int, float)): return error_response("欄位型別錯誤") db_conn = get_db_connection() if db_conn is None: return error_response("無法連線到資料庫", 500) cursor = db_conn.cursor(dictionary=True) cursor.execute( "INSERT INTO pizza (name, size, price) VALUES (%s, %s, %s)", (name, size, price) ) new_id = cursor.lastrowid db_conn.commit() # 查詢並回傳新建立的資料 cursor.execute("SELECT * FROM pizza WHERE id = %s", (new_id,)) new_pizza = cursor.fetchone() return success_response(new_pizza, "披薩建立成功", 201) except Error as e: return error_response(f"資料庫錯誤: {e}", 500) except Exception as e: return error_response(f"伺服器錯誤: {e}", 500) finally: if cursor: cursor.close() if db_conn and db_conn.is_connected(): db_conn.close() @app.route('/v1/pizza/', methods=['PATCH']) def update_pizza(pizza_id): """更新現有披薩的資訊""" db_conn = None cursor = None try: data = request.get_json() if not data: return error_response("沒有提供要更新的資料") db_conn = get_db_connection() if db_conn is None: return error_response("無法連線到資料庫", 500) cursor = db_conn.cursor(dictionary=True) # 檢查披薩是否存在 cursor.execute("SELECT id FROM pizza WHERE id = %s", (pizza_id,)) if not cursor.fetchone(): return error_response("找不到要更新的披薩", 404) # 動態建立更新語句 update_fields = [] update_values = [] if 'name' in data: update_fields.append("name = %s") update_values.append(data['name']) if 'size' in data: update_fields.append("size = %s") update_values.append(data['size']) if 'price' in data: update_fields.append("price = %s") update_values.append(data['price']) if not update_fields: return error_response("沒有提供可更新的欄位 (name, size, price)") update_values.append(pizza_id) query = f"UPDATE pizza SET {', '.join(update_fields)} WHERE id = %s" cursor.execute(query, tuple(update_values)) db_conn.commit() # 查詢並回傳更新後的資料 cursor.execute("SELECT * FROM pizza WHERE id = %s", (pizza_id,)) updated_pizza = cursor.fetchone() return success_response(updated_pizza, "披薩資訊已更新") except Error as e: return error_response(f"資料庫錯誤: {e}", 500) except Exception as e: return error_response(f"伺服器錯誤: {e}", 500) finally: if cursor: cursor.close() if db_conn and db_conn.is_connected(): db_conn.close() @app.route('/v1/pizza/', methods=['DELETE']) def delete_pizza(pizza_id): """刪除一個披薩""" db_conn = None cursor = None try: db_conn = get_db_connection() if db_conn is None: return error_response("無法連線到資料庫", 500) cursor = db_conn.cursor() # 檢查披薩是否存在 cursor.execute("SELECT id FROM pizza WHERE id = %s", (pizza_id,)) if not cursor.fetchone(): return error_response("找不到要刪除的披薩", 404) cursor.execute("DELETE FROM pizza WHERE id = %s", (pizza_id,)) db_conn.commit() # cursor.rowcount 會回傳影響的行數 if cursor.rowcount > 0: return success_response(None, "披薩已成功刪除", 204) else: # 雖然前面檢查過,但這是一個保險措施 return error_response("找不到要刪除的披薩", 404) except Error as e: return error_response(f"資料庫錯誤: {e}", 500) finally: if cursor: cursor.close() if db_conn and db_conn.is_connected(): db_conn.close() # --- 全域錯誤處理 --- @app.errorhandler(Exception) def handle_global_error(e): """捕捉所有未處理的例外""" return error_response(f"發生未預期的伺服器錯誤: {e}", 500) if __name__ == '__main__': # 在啟動伺服器前,先執行資料庫初始化 init_db() # 啟動 Flask 伺服器 # host='0.0.0.0' 讓伺服器可以從外部網路存取 # debug=True 讓伺服器在程式碼變更後自動重啟 print("正在啟動 Flask 伺服器...") app.run(host='0.0.0.0', port=5000, debug=True)