Initial commit

This commit is contained in:
2025-09-08 16:23:54 +08:00
commit 0e1ee0820e
6 changed files with 1304 additions and 0 deletions

701
app.py Normal file
View File

@@ -0,0 +1,701 @@
from flask import Flask, request, jsonify
from flask_cors import CORS
import mysql.connector
import os
from datetime import datetime
app = Flask(__name__)
CORS(app) # 啟用 CORS允許本機前端存取
# 資料庫連線資訊
DB_CONFIG = {
'host': 'mysql.theaken.com',
'port': 33306,
'user': 'A019',
'password': '9wvKEkxBzVca',
'database': 'db_A019'
}
# 建立資料庫連線
def get_db_connection():
try:
conn = mysql.connector.connect(**DB_CONFIG)
return conn
except mysql.connector.Error as err:
print(f"資料庫連線錯誤: {err}")
return None
# 統一回應格式
def create_response(status, code, message, data=None):
response = {
"status": status,
"code": code,
"message": message
}
if data is not None:
response["data"] = data
return response
# 錯誤處理
def create_error_response(code, message):
return create_response("error", code, message)
# 獲取所有使用者,支援過濾和分頁
@app.route('/v1/users', methods=['GET'])
def get_users():
try:
# 取得查詢參數
min_age = request.args.get('min_age', type=int)
max_age = request.args.get('max_age', type=int)
page = request.args.get('page', 1, type=int)
limit = request.args.get('limit', 10, type=int)
# 計算偏移量
offset = (page - 1) * limit
conn = get_db_connection()
if not conn:
return jsonify(create_error_response(500, "資料庫連線失敗")), 500
cursor = conn.cursor(dictionary=True)
# 建立基本查詢
query = "SELECT * FROM users"
count_query = "SELECT COUNT(*) as total FROM users"
params = []
where_clauses = []
# 加入過濾條件
if min_age is not None:
where_clauses.append("age >= %s")
params.append(min_age)
if max_age is not None:
where_clauses.append("age <= %s")
params.append(max_age)
# 組合 WHERE 子句
if where_clauses:
query += " WHERE " + " AND ".join(where_clauses)
count_query += " WHERE " + " AND ".join(where_clauses)
# 加入分頁
query += " LIMIT %s OFFSET %s"
params.extend([limit, offset])
# 執行查詢
cursor.execute(query, params)
users = cursor.fetchall()
# 獲取總記錄數
cursor.execute(count_query, params[:-2] if params else [])
total = cursor.fetchone()['total']
# 計算總頁數
total_pages = (total + limit - 1) // limit
# 建立 meta 資訊
meta = {
"total": total,
"page": page,
"limit": limit,
"total_pages": total_pages
}
cursor.close()
conn.close()
return jsonify(create_response("success", 200, "成功獲取使用者列表", {"users": users, "meta": meta})), 200
except Exception as e:
return jsonify(create_error_response(500, f"伺服器錯誤: {str(e)}")), 500
# 獲取所有披薩,支援過濾和分頁
@app.route('/v1/pizzas', methods=['GET'])
def get_pizzas():
try:
# 取得查詢參數
min_price = request.args.get('min_price', type=float)
max_price = request.args.get('max_price', type=float)
size = request.args.get('size')
page = request.args.get('page', 1, type=int)
limit = request.args.get('limit', 10, type=int)
# 計算偏移量
offset = (page - 1) * limit
conn = get_db_connection()
if not conn:
return jsonify(create_error_response(500, "資料庫連線失敗")), 500
cursor = conn.cursor(dictionary=True)
# 建立基本查詢
query = "SELECT * FROM pizzas"
count_query = "SELECT COUNT(*) as total FROM pizzas"
params = []
where_clauses = []
# 加入過濾條件
if min_price is not None:
where_clauses.append("price >= %s")
params.append(min_price)
if max_price is not None:
where_clauses.append("price <= %s")
params.append(max_price)
if size is not None:
where_clauses.append("size = %s")
params.append(size)
# 組合 WHERE 子句
if where_clauses:
query += " WHERE " + " AND ".join(where_clauses)
count_query += " WHERE " + " AND ".join(where_clauses)
# 加入分頁
query += " LIMIT %s OFFSET %s"
params.extend([limit, offset])
# 執行查詢
cursor.execute(query, params)
pizzas = cursor.fetchall()
# 獲取總記錄數
cursor.execute(count_query, params[:-2] if params else [])
total = cursor.fetchone()['total']
# 計算總頁數
total_pages = (total + limit - 1) // limit
# 建立 meta 資訊
meta = {
"total": total,
"page": page,
"limit": limit,
"total_pages": total_pages
}
cursor.close()
conn.close()
return jsonify(create_response("success", 200, "成功獲取披薩列表", {"pizzas": pizzas, "meta": meta})), 200
except Exception as e:
return jsonify(create_error_response(500, f"伺服器錯誤: {str(e)}")), 500
# 獲取單一使用者
@app.route('/v1/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
try:
conn = get_db_connection()
if not conn:
return jsonify(create_error_response(500, "資料庫連線失敗")), 500
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
user = cursor.fetchone()
cursor.close()
conn.close()
if not user:
return jsonify(create_error_response(404, f"找不到 ID 為 {user_id} 的使用者")), 404
return jsonify(create_response("success", 200, "成功獲取使用者資料", {"user": user})), 200
except Exception as e:
return jsonify(create_error_response(500, f"伺服器錯誤: {str(e)}")), 500
# 獲取單一披薩
@app.route('/v1/pizzas/<int:pizza_id>', methods=['GET'])
def get_pizza_by_id(pizza_id):
try:
conn = get_db_connection()
if not conn:
return jsonify(create_error_response(500, "資料庫連線失敗")), 500
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM pizzas WHERE id = %s", (pizza_id,))
pizza = cursor.fetchone()
cursor.close()
conn.close()
if not pizza:
return jsonify(create_error_response(404, f"找不到 ID 為 {pizza_id} 的披薩")), 404
return jsonify(create_response("success", 200, "成功獲取披薩資料", {"pizza": pizza})), 200
except Exception as e:
return jsonify(create_error_response(500, f"伺服器錯誤: {str(e)}")), 500
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
user = cursor.fetchone()
cursor.close()
conn.close()
if not user:
return jsonify(create_error_response(404, f"找不到 ID 為 {user_id} 的使用者")), 404
return jsonify(create_response("success", 200, "成功獲取使用者資料", {"user": user})), 200
except Exception as e:
return jsonify(create_error_response(500, f"伺服器錯誤: {str(e)}")), 500
# 獲取單一披薩
# 已在上方定義 GET /v1/pizzas/<int:pizza_id> 路由
# 新增披薩
@app.route('/v1/pizzas', methods=['POST'])
def create_pizza():
try:
data = request.get_json()
# 驗證必要欄位
required_fields = ['name', 'price', 'size']
for field in required_fields:
if field not in data:
return jsonify(create_error_response(400, f"缺少必要欄位: {field}")), 400
# 驗證資料類型
if not isinstance(data['name'], str):
return jsonify(create_error_response(400, "name 必須是字串")), 400
try:
price = float(data['price'])
if price <= 0:
return jsonify(create_error_response(400, "price 必須大於 0")), 400
except (ValueError, TypeError):
return jsonify(create_error_response(400, "price 必須是有效的數字")), 400
if not isinstance(data['size'], str) or data['size'] not in ['S', 'M', 'L']:
return jsonify(create_error_response(400, "size 必須是 S、M 或 L")), 400
conn = get_db_connection()
if not conn:
return jsonify(create_error_response(500, "資料庫連線失敗")), 500
cursor = conn.cursor(dictionary=True)
# 設定時間
now = datetime.now()
# 插入資料
insert_query = "INSERT INTO pizzas (name, price, size, created_at, updated_at) VALUES (%s, %s, %s, %s, %s)"
cursor.execute(insert_query, (data['name'], price, data['size'], now, now))
pizza_id = cursor.lastrowid
conn.commit()
# 獲取新增的披薩資料
cursor.execute("SELECT * FROM pizzas WHERE id = %s", (pizza_id,))
new_pizza = cursor.fetchone()
cursor.close()
conn.close()
return jsonify(create_response("success", 201, "成功新增披薩", {"pizza": new_pizza})), 201
except Exception as e:
return jsonify(create_error_response(500, f"伺服器錯誤: {str(e)}")), 500
# 更新披薩
@app.route('/v1/pizzas/<int:pizza_id>', methods=['PATCH'])
def update_pizza(pizza_id):
try:
data = request.get_json()
# 至少需要一個欄位
if not data or not any(field in data for field in ['name', 'price', 'size']):
return jsonify(create_error_response(400, "至少需要提供一個欄位: name, price, size")), 400
# 驗證資料類型
if 'name' in data and not isinstance(data['name'], str):
return jsonify(create_error_response(400, "name 必須是字串")), 400
if 'price' in data:
try:
price = float(data['price'])
if price <= 0:
return jsonify(create_error_response(400, "price 必須大於 0")), 400
data['price'] = price
except (ValueError, TypeError):
return jsonify(create_error_response(400, "price 必須是有效的數字")), 400
if 'size' in data and (not isinstance(data['size'], str) or data['size'] not in ['S', 'M', 'L']):
return jsonify(create_error_response(400, "size 必須是 S、M 或 L")), 400
conn = get_db_connection()
if not conn:
return jsonify(create_error_response(500, "資料庫連線失敗")), 500
cursor = conn.cursor(dictionary=True)
# 檢查披薩是否存在
cursor.execute("SELECT * FROM pizzas WHERE id = %s", (pizza_id,))
pizza = cursor.fetchone()
if not pizza:
cursor.close()
conn.close()
return jsonify(create_error_response(404, f"找不到 ID 為 {pizza_id} 的披薩")), 404
# 設定更新時間
now = datetime.now()
data['updated_at'] = now
# 建立更新查詢
update_fields = []
update_values = []
for field in ['name', 'price', 'size', 'updated_at']:
if field in data:
update_fields.append(f"{field} = %s")
update_values.append(data[field])
update_values.append(pizza_id) # WHERE id = %s 的參數
update_query = f"UPDATE pizzas SET {', '.join(update_fields)} WHERE id = %s"
cursor.execute(update_query, update_values)
conn.commit()
# 獲取更新後的披薩資料
cursor.execute("SELECT * FROM pizzas WHERE id = %s", (pizza_id,))
updated_pizza = cursor.fetchone()
cursor.close()
conn.close()
return jsonify(create_response("success", 200, "成功更新披薩", {"pizza": updated_pizza})), 200
except Exception as e:
return jsonify(create_error_response(500, f"伺服器錯誤: {str(e)}")), 500
# 刪除披薩
@app.route('/v1/pizzas/<int:pizza_id>', methods=['DELETE'])
def delete_pizza(pizza_id):
try:
conn = get_db_connection()
if not conn:
return jsonify(create_error_response(500, "資料庫連線失敗")), 500
cursor = conn.cursor()
# 檢查披薩是否存在
cursor.execute("SELECT id FROM pizzas WHERE id = %s", (pizza_id,))
pizza = cursor.fetchone()
if not pizza:
cursor.close()
conn.close()
return jsonify(create_error_response(404, f"找不到 ID 為 {pizza_id} 的披薩")), 404
# 刪除披薩
cursor.execute("DELETE FROM pizzas WHERE id = %s", (pizza_id,))
conn.commit()
cursor.close()
conn.close()
return jsonify(create_response("success", 204, "成功刪除披薩")), 204
except Exception as e:
return jsonify(create_error_response(500, f"伺服器錯誤: {str(e)}")), 500
# 新增使用者
@app.route('/v1/users', methods=['POST'])
def create_user():
try:
data = request.get_json()
# 驗證必要欄位
required_fields = ['name', 'email', 'age']
for field in required_fields:
if field not in data:
return jsonify(create_error_response(400, f"缺少必要欄位: {field}")), 400
# 驗證資料類型
if not isinstance(data['name'], str):
return jsonify(create_error_response(400, "name 必須是字串")), 400
# 驗證資料類型
if not isinstance(data['name'], str):
return jsonify(create_error_response(400, "name 必須是字串")), 400
try:
price = float(data['price'])
if price <= 0:
return jsonify(create_error_response(400, "price 必須大於 0")), 400
except (ValueError, TypeError):
return jsonify(create_error_response(400, "price 必須是有效的數字")), 400
if not isinstance(data['size'], str) or data['size'] not in ['S', 'M', 'L']:
return jsonify(create_error_response(400, "size 必須是 S、M 或 L")), 400
conn = get_db_connection()
if not conn:
return jsonify(create_error_response(500, "資料庫連線失敗")), 500
cursor = conn.cursor()
# 設定時間
now = datetime.now()
# 插入資料
insert_query = "INSERT INTO pizzas (name, price, size, created_at, updated_at) VALUES (%s, %s, %s, %s, %s)"
cursor.execute(insert_query, (data['name'], price, data['size'], now, now))
pizza_id = cursor.lastrowid
conn.commit()
# 獲取新增的披薩資料
cursor.execute("SELECT * FROM pizzas WHERE id = %s", (pizza_id,))
new_pizza = cursor.fetchone()
cursor.close()
conn.close()
return jsonify(create_response("success", 201, "成功新增披薩", {"pizza": new_pizza})), 201
except Exception as e:
return jsonify(create_error_response(500, f"伺服器錯誤: {str(e)}")), 500
if not isinstance(data['email'], str):
return jsonify(create_error_response(400, "email 必須是字串")), 400
if not isinstance(data['age'], int):
return jsonify(create_error_response(400, "age 必須是整數")), 400
# 驗證 email 格式 (簡單驗證)
if '@' not in data['email']:
return jsonify(create_error_response(400, "email 格式不正確")), 400
conn = get_db_connection()
if not conn:
return jsonify(create_error_response(500, "資料庫連線失敗")), 500
cursor = conn.cursor(dictionary=True)
# 使用參數化查詢避免 SQL 注入
query = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"
cursor.execute(query, (data['name'], data['email'], data['age']))
# 獲取新增的使用者 ID
user_id = cursor.lastrowid
conn.commit()
# 獲取新增的使用者資料
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
new_user = cursor.fetchone()
cursor.close()
conn.close()
return jsonify(create_response("success", 201, "成功新增使用者", {"user": new_user})), 201
except Exception as e:
return jsonify(create_error_response(500, f"伺服器錯誤: {str(e)}")), 500
# 更新使用者
@app.route('/v1/users/<int:user_id>', methods=['PATCH'])
def update_user(user_id):
try:
data = request.get_json()
# 檢查是否有要更新的欄位
if not data:
return jsonify(create_error_response(400, "沒有提供要更新的資料")), 400
# 驗證資料類型
if 'name' in data and not isinstance(data['name'], str):
return jsonify(create_error_response(400, "name 必須是字串")), 400
if 'email' in data and not isinstance(data['email'], str):
return jsonify(create_error_response(400, "email 必須是字串")), 400
if 'age' in data and not isinstance(data['age'], int):
return jsonify(create_error_response(400, "age 必須是整數")), 400
# 驗證 email 格式 (簡單驗證)
if 'email' in data and '@' not in data['email']:
return jsonify(create_error_response(400, "email 格式不正確")), 400
conn = get_db_connection()
if not conn:
return jsonify(create_error_response(500, "資料庫連線失敗")), 500
cursor = conn.cursor(dictionary=True)
# 檢查使用者是否存在
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
user = cursor.fetchone()
if not user:
cursor.close()
conn.close()
return jsonify(create_error_response(404, f"找不到 ID 為 {user_id} 的使用者")), 404
# 建立更新查詢
update_fields = []
params = []
if 'name' in data:
update_fields.append("name = %s")
params.append(data['name'])
if 'email' in data:
update_fields.append("email = %s")
params.append(data['email'])
if 'age' in data:
update_fields.append("age = %s")
params.append(data['age'])
# 如果沒有要更新的欄位
if not update_fields:
cursor.close()
conn.close()
return jsonify(create_error_response(400, "沒有提供有效的更新欄位")), 400
# 建立更新查詢
query = "UPDATE users SET " + ", ".join(update_fields) + " WHERE id = %s"
params.append(user_id)
# 執行更新
cursor.execute(query, params)
conn.commit()
# 獲取更新後的使用者資料
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
updated_user = cursor.fetchone()
cursor.close()
conn.close()
return jsonify(create_response("success", 200, "成功更新使用者資料", {"user": updated_user})), 200
except Exception as e:
return jsonify(create_error_response(500, f"伺服器錯誤: {str(e)}")), 500
# 刪除使用者
@app.route('/v1/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
try:
conn = get_db_connection()
if not conn:
return jsonify(create_error_response(500, "資料庫連線失敗")), 500
cursor = conn.cursor()
# 檢查使用者是否存在
cursor.execute("SELECT id FROM users WHERE id = %s", (user_id,))
user = cursor.fetchone()
if not user:
cursor.close()
conn.close()
return jsonify(create_error_response(404, f"找不到 ID 為 {user_id} 的使用者")), 404
# 刪除使用者
cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
conn.commit()
cursor.close()
conn.close()
return jsonify(create_response("success", 204, "成功刪除使用者")), 204
except Exception as e:
return jsonify(create_error_response(500, f"伺服器錯誤: {str(e)}")), 500
# 獲取所有 pizzas支援分頁
# 已在上方定義 GET /v1/pizzas 路由
# 獲取總記錄數
cursor.execute(count_query, params[:-2] if params else [])
total = cursor.fetchone()['total']
# 計算總頁數
total_pages = (total + limit - 1) // limit
# 建立 meta 資訊
meta = {
"total": total,
"page": page,
"limit": limit,
"total_pages": total_pages
}
cursor.close()
conn.close()
return jsonify(create_response("success", 200, "成功獲取披薩列表", {"pizzas": pizzas, "meta": meta})), 200
except Exception as e:
return jsonify(create_error_response(500, f"伺服器錯誤: {str(e)}")), 500
# 獲取單一 pizza
# 注意:此路由已在其他地方定義
def get_pizza(pizza_id):
try:
conn = get_db_connection()
if not conn:
return jsonify(create_error_response(500, "資料庫連線失敗")), 500
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM pizzas WHERE id = %s", (pizza_id,))
pizza = cursor.fetchone()
cursor.close()
conn.close()
if not pizza:
return jsonify(create_error_response(404, f"找不到 ID 為 {pizza_id} 的披薩")), 404
return jsonify(create_response("success", 200, "成功獲取披薩資料", {"pizza": pizza})), 200
except Exception as e:
return jsonify(create_error_response(500, f"伺服器錯誤: {str(e)}")), 500
# 新增 pizza
# 已在上方定義 POST /v1/pizzas 路由
# 更新 pizza
# 已在上方定義 PATCH /v1/pizzas/<int:pizza_id> 路由
# 刪除 pizza
# 已在上方定義 DELETE /v1/pizzas/<int:pizza_id> 路由
# 主程式入口
if __name__ == '__main__':
# 確保資料表存在
conn = get_db_connection()
if conn:
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
email VARCHAR(100) NOT NULL,
age INT NOT NULL
)
""")
# 確保 pizzas 資料表存在
cursor.execute("""
CREATE TABLE IF NOT EXISTS pizzas (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(50) NOT NULL,
price DECIMAL(10, 2) NOT NULL,
size VARCHAR(2) NOT NULL,
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL
)
""")
conn.commit()
cursor.close()
conn.close()
app.run(debug=True)