Files
AI_pythonTxt2Excel/app.py
2025-09-14 13:17:53 +08:00

398 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import math
from flask import Flask, jsonify, request
from flask_cors import CORS
from dotenv import load_dotenv
import mysql.connector
import random
from mysql.connector import pooling, Error
# 載入 .env 檔案中的環境變數
load_dotenv(override=True)
# --- 偵錯用:印出環境變數 ---
print("--- 偵錯環境變數 ---")
print(f"DB_HOST: {os.getenv('DB_HOST')}")
print(f"DB_PORT: {os.getenv('DB_PORT')}")
print(f"DB_USER: {os.getenv('DB_USER')}")
print("--------------------")
# -------------------------
app = Flask(__name__)
# 啟用 CORS允許來自任何來源的請求這在開發本機前端時很方便
CORS(app)
# --- 資料庫連線設定 ---
try:
# 建立資料庫連線池,這比每次請求都建立新連線更有效率
connection_pool = pooling.MySQLConnectionPool(
pool_name="pizzapool",
pool_size=5,
pool_reset_session=True,
host=os.getenv('DB_HOST'),
user=os.getenv('DB_USER'),
password=os.getenv('DB_PASSWORD'),
database=os.getenv('DB_DATABASE'),
port=os.getenv('DB_PORT')
)
print("資料庫連線池建立成功!")
except Error as e:
print(f"建立資料庫連線池時發生錯誤: {e}")
exit() # 如果連線池建立失敗,則直接退出程式
def get_db_connection():
"""從連線池取得一個資料庫連線"""
try:
return connection_pool.get_connection()
except Error as e:
print(f"從連線池取得連線時發生錯誤: {e}")
return None
# --- 標準化回應格式 ---
def success_response(data, message="Success", code=200):
"""產生成功的 JSON 回應"""
response = {
"status": "success",
"code": code,
"message": message
}
if data is not None:
response["data"] = data
return jsonify(response), code
def error_response(message, code=400):
"""產生錯誤的 JSON 回應"""
return jsonify({
"status": "error",
"code": code,
"message": message
}), code
# --- 資料庫初始化 ---
def init_db():
"""初始化資料庫,建立資料表並插入初始資料"""
print("正在初始化資料庫...")
db_conn = None
cursor = None
try:
db_conn = get_db_connection()
if db_conn is None:
print("無法取得資料庫連線,初始化失敗。")
return
cursor = db_conn.cursor()
# 建立 pizza 資料表,新增 id 作為主鍵
cursor.execute("""
CREATE TABLE IF NOT EXISTS pizza (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
size VARCHAR(50) NOT NULL,
price DECIMAL(10, 2) NOT NULL
)
""")
print("`pizza` 資料表已確認存在。")
# 檢查是否已有範例資料
cursor.execute("SELECT id FROM pizza WHERE name = %s", ("火山披薩",))
if cursor.fetchone() is None:
# 插入一筆範例資料
cursor.execute(
"INSERT INTO pizza (name, size, price) VALUES (%s, %s, %s)",
("火山披薩", "M", 280.00)
)
db_conn.commit()
print("已成功插入範例資料:火山披薩。")
else:
print("範例資料已存在。")
# 檢查總筆數如果不足11筆則補齊
cursor.execute("SELECT COUNT(*) FROM pizza")
total_pizzas = cursor.fetchone()[0]
if total_pizzas < 11:
print(f"目前資料庫有 {total_pizzas} 筆資料,將產生 {11 - total_pizzas} 筆新的隨機資料。")
pizza_names = ['夏威夷', '瑪格麗特', 'BBQ雞肉', '海鮮總匯', '超級豪華', '田園蔬菜', '四種起司']
sizes = ['S', 'M', 'L', 'XL']
pizzas_to_add = []
for _ in range(11 - total_pizzas):
name = random.choice(pizza_names) + "披薩"
size = random.choice(sizes)
price = round(random.uniform(250, 900), 0) # 價格取整數
pizzas_to_add.append((name, size, price))
insert_query = "INSERT INTO pizza (name, size, price) VALUES (%s, %s, %s)"
cursor.executemany(insert_query, pizzas_to_add)
db_conn.commit()
print(f"已成功新增 {len(pizzas_to_add)} 筆隨機披薩資料。")
else:
print("資料庫中已有11筆或更多資料無需產生新的隨機資料。")
except Error as e:
print(f"資料庫初始化時發生錯誤: {e}")
finally:
if cursor:
cursor.close()
if db_conn and db_conn.is_connected():
db_conn.close()
print("資料庫初始化完成。")
# --- API 路由 ---
@app.route('/v1/pizza', methods=['GET'])
def get_pizzas():
"""取得披薩清單,支援篩選和分頁"""
db_conn = None
cursor = None
try:
db_conn = get_db_connection()
if db_conn is None:
return error_response("無法連線到資料庫", 500)
cursor = db_conn.cursor(dictionary=True)
# 處理查詢參數
min_price = request.args.get('min_price', type=float)
max_price = request.args.get('max_price', type=float)
page = request.args.get('page', 1, type=int)
limit = request.args.get('limit', 10, type=int)
offset = (page - 1) * limit
# 動態建立查詢語句以避免 SQL 注入
query_conditions = []
query_params = []
if min_price is not None:
query_conditions.append("price >= %s")
query_params.append(min_price)
if max_price is not None:
query_conditions.append("price <= %s")
query_params.append(max_price)
where_clause = " AND ".join(query_conditions)
# 取得總筆數
count_query = "SELECT COUNT(id) as total FROM pizza"
if where_clause:
count_query += " WHERE " + where_clause
cursor.execute(count_query, tuple(query_params))
total_records = cursor.fetchone()['total']
total_pages = math.ceil(total_records / limit)
# 取得分頁資料
data_query = "SELECT * FROM pizza"
if where_clause:
data_query += " WHERE " + where_clause
data_query += " ORDER BY id LIMIT %s OFFSET %s"
cursor.execute(data_query, tuple(query_params + [limit, offset]))
pizzas = cursor.fetchall()
meta = {
"current_page": page,
"last_page": total_pages,
"per_page": limit,
"total": total_records
}
return success_response({"pizzas": pizzas, "meta": meta})
except Error as e:
return error_response(f"伺服器錯誤: {e}", 500)
finally:
if cursor:
cursor.close()
if db_conn and db_conn.is_connected():
db_conn.close()
@app.route('/v1/pizza/<string:name>', methods=['GET'])
def get_pizza_by_name(name):
"""依據名稱取得單一披薩"""
db_conn = None
cursor = None
try:
db_conn = get_db_connection()
if db_conn is None:
return error_response("無法連線到資料庫", 500)
cursor = db_conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM pizza WHERE name = %s", (name,))
pizza = cursor.fetchone()
if pizza:
return success_response(pizza)
else:
return error_response("找不到指定的披薩", 404)
except Error as e:
return error_response(f"伺服器錯誤: {e}", 500)
finally:
if cursor:
cursor.close()
if db_conn and db_conn.is_connected():
db_conn.close()
@app.route('/v1/pizza', methods=['POST'])
def create_pizza():
"""建立新的披薩"""
db_conn = None
cursor = None
try:
data = request.get_json()
if not data or not all(k in data for k in ['name', 'size', 'price']):
return error_response("缺少必要欄位name, size, price")
name = data['name']
size = data['size']
price = data['price']
if not isinstance(name, str) or not isinstance(size, str) or not isinstance(price, (int, float)):
return error_response("欄位型別錯誤")
db_conn = get_db_connection()
if db_conn is None:
return error_response("無法連線到資料庫", 500)
cursor = db_conn.cursor(dictionary=True)
cursor.execute(
"INSERT INTO pizza (name, size, price) VALUES (%s, %s, %s)",
(name, size, price)
)
new_id = cursor.lastrowid
db_conn.commit()
# 查詢並回傳新建立的資料
cursor.execute("SELECT * FROM pizza WHERE id = %s", (new_id,))
new_pizza = cursor.fetchone()
return success_response(new_pizza, "披薩建立成功", 201)
except Error as e:
return error_response(f"資料庫錯誤: {e}", 500)
except Exception as e:
return error_response(f"伺服器錯誤: {e}", 500)
finally:
if cursor:
cursor.close()
if db_conn and db_conn.is_connected():
db_conn.close()
@app.route('/v1/pizza/<int:pizza_id>', methods=['PATCH'])
def update_pizza(pizza_id):
"""更新現有披薩的資訊"""
db_conn = None
cursor = None
try:
data = request.get_json()
if not data:
return error_response("沒有提供要更新的資料")
db_conn = get_db_connection()
if db_conn is None:
return error_response("無法連線到資料庫", 500)
cursor = db_conn.cursor(dictionary=True)
# 檢查披薩是否存在
cursor.execute("SELECT id FROM pizza WHERE id = %s", (pizza_id,))
if not cursor.fetchone():
return error_response("找不到要更新的披薩", 404)
# 動態建立更新語句
update_fields = []
update_values = []
if 'name' in data:
update_fields.append("name = %s")
update_values.append(data['name'])
if 'size' in data:
update_fields.append("size = %s")
update_values.append(data['size'])
if 'price' in data:
update_fields.append("price = %s")
update_values.append(data['price'])
if not update_fields:
return error_response("沒有提供可更新的欄位 (name, size, price)")
update_values.append(pizza_id)
query = f"UPDATE pizza SET {', '.join(update_fields)} WHERE id = %s"
cursor.execute(query, tuple(update_values))
db_conn.commit()
# 查詢並回傳更新後的資料
cursor.execute("SELECT * FROM pizza WHERE id = %s", (pizza_id,))
updated_pizza = cursor.fetchone()
return success_response(updated_pizza, "披薩資訊已更新")
except Error as e:
return error_response(f"資料庫錯誤: {e}", 500)
except Exception as e:
return error_response(f"伺服器錯誤: {e}", 500)
finally:
if cursor:
cursor.close()
if db_conn and db_conn.is_connected():
db_conn.close()
@app.route('/v1/pizza/<int:pizza_id>', methods=['DELETE'])
def delete_pizza(pizza_id):
"""刪除一個披薩"""
db_conn = None
cursor = None
try:
db_conn = get_db_connection()
if db_conn is None:
return error_response("無法連線到資料庫", 500)
cursor = db_conn.cursor()
# 檢查披薩是否存在
cursor.execute("SELECT id FROM pizza WHERE id = %s", (pizza_id,))
if not cursor.fetchone():
return error_response("找不到要刪除的披薩", 404)
cursor.execute("DELETE FROM pizza WHERE id = %s", (pizza_id,))
db_conn.commit()
# cursor.rowcount 會回傳影響的行數
if cursor.rowcount > 0:
return success_response(None, "披薩已成功刪除", 204)
else:
# 雖然前面檢查過,但這是一個保險措施
return error_response("找不到要刪除的披薩", 404)
except Error as e:
return error_response(f"資料庫錯誤: {e}", 500)
finally:
if cursor:
cursor.close()
if db_conn and db_conn.is_connected():
db_conn.close()
# --- 全域錯誤處理 ---
@app.errorhandler(Exception)
def handle_global_error(e):
"""捕捉所有未處理的例外"""
return error_response(f"發生未預期的伺服器錯誤: {e}", 500)
if __name__ == '__main__':
# 在啟動伺服器前,先執行資料庫初始化
init_db()
# 啟動 Flask 伺服器
# host='0.0.0.0' 讓伺服器可以從外部網路存取
# debug=True 讓伺服器在程式碼變更後自動重啟
print("正在啟動 Flask 伺服器...")
app.run(host='0.0.0.0', port=5000, debug=True)