Initial commit

This commit is contained in:
Allen
2025-09-08 16:24:14 +08:00
commit 100f9a2ede

263
app.py Normal file
View File

@@ -0,0 +1,263 @@
import os
import math
from flask import Flask, jsonify, request
from flask_cors import CORS
import mysql.connector
from mysql.connector import Error
# --- 環境變數或直接設定 ---
# 建議使用環境變數來管理敏感資訊,但為了方便直接執行,這裡先寫死
DB_HOST = os.getenv("DB_HOST", "mysql.theaken.com")
DB_PORT = int(os.getenv("DB_PORT", 33306))
DB_NAME = os.getenv("DB_NAME", "db_A002")
DB_USER = os.getenv("DB_USER", "A002")
DB_PASSWORD = os.getenv("DB_PASSWORD", "5YqyroX7o22y")
DB_TABLE = "users"
# --- Flask App 初始化 ---
app = Flask(__name__)
# 啟用 CORS允許所有來源的請求在生產環境中應更嚴格限制
CORS(app, resources={r"/v1/*": {"origins": "*"}})
# --- 資料庫連線 ---
def get_db_connection():
"""建立並回傳資料庫連線"""
try:
conn = mysql.connector.connect(
host=DB_HOST,
port=DB_PORT,
database=DB_NAME,
user=DB_USER,
password=DB_PASSWORD
)
return conn
except Error as e:
print(f"資料庫連線失敗: {e}")
return None
# --- 統一回應格式 ---
def create_response(status, code, message, data=None):
"""建立標準化的 JSON 回應"""
response = {"status": status, "code": code, "message": message}
if data is not None:
response["data"] = data
return jsonify(response), code
# --- 錯誤處理 ---
@app.errorhandler(404)
def not_found_error(error):
return create_response("error", 404, "Resource not found")
@app.errorhandler(500)
def internal_error(error):
return create_response("error", 500, "Internal server error")
# --- API Endpoints ---
# [GET] /v1/users - 取得使用者列表(支援篩選與分頁)
@app.route('/v1/users', methods=['GET'])
def get_users():
conn = get_db_connection()
if not conn:
return create_response("error", 500, "Database connection failed")
try:
# 1. 取得查詢參數
min_age = request.args.get('min_age', type=int)
max_age = request.args.get('max_age', type=int)
page = request.args.get('page', 1, type=int)
limit = request.args.get('limit', 10, type=int)
offset = (page - 1) * limit
# 2. 動態建立查詢條件
query_conditions = []
params = []
if min_age is not None:
query_conditions.append("age >= %s")
params.append(min_age)
if max_age is not None:
query_conditions.append("age <= %s")
params.append(max_age)
where_clause = " AND ".join(query_conditions)
if where_clause:
where_clause = "WHERE " + where_clause
# 3. 查詢總數以計算分頁
cursor = conn.cursor()
count_query = f"SELECT COUNT(*) FROM {DB_TABLE} {where_clause}"
cursor.execute(count_query, tuple(params))
total_items = cursor.fetchone()[0]
total_pages = math.ceil(total_items / limit)
# 4. 查詢當頁資料
data_query = f"SELECT id, name, email, age FROM {DB_TABLE} {where_clause} ORDER BY id ASC LIMIT %s OFFSET %s"
cursor = conn.cursor(dictionary=True)
cursor.execute(data_query, tuple(params + [limit, offset]))
users = cursor.fetchall()
# 5. 組合回應
meta = {
"total_items": total_items,
"total_pages": total_pages,
"current_page": page,
"page_size": limit
}
return create_response("success", 200, "Users retrieved successfully", {"users": users, "meta": meta})
except Error as e:
return create_response("error", 500, f"An error occurred: {e}")
finally:
if conn.is_connected():
cursor.close()
conn.close()
# [GET] /v1/users/<id> - 取得單一使用者
@app.route('/v1/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
conn = get_db_connection()
if not conn:
return create_response("error", 500, "Database connection failed")
try:
cursor = conn.cursor(dictionary=True)
cursor.execute(f"SELECT id, name, email, age FROM {DB_TABLE} WHERE id = %s", (user_id,))
user = cursor.fetchone()
if user:
return create_response("success", 200, "User found", user)
else:
return create_response("error", 404, "User not found")
except Error as e:
return create_response("error", 500, f"An error occurred: {e}")
finally:
if conn.is_connected():
cursor.close()
conn.close()
# [POST] /v1/users - 新增使用者
@app.route('/v1/users', methods=['POST'])
def create_user():
data = request.get_json()
if not data or not 'name' in data or not 'email' in data or not 'age' in data:
return create_response("error", 400, "Missing required fields: name, email, age")
name, email, age = data['name'], data['email'], data['age']
if not isinstance(name, str) or not isinstance(email, str) or not isinstance(age, int):
return create_response("error", 400, "Invalid data type for fields")
conn = get_db_connection()
if not conn:
return create_response("error", 500, "Database connection failed")
try:
cursor = conn.cursor(dictionary=True)
# 檢查 Email 是否已存在
cursor.execute(f"SELECT id FROM {DB_TABLE} WHERE email = %s", (email,))
if cursor.fetchone():
return create_response("error", 409, "Email already exists")
# 插入新資料
insert_query = f"INSERT INTO {DB_TABLE} (name, email, age) VALUES (%s, %s, %s)"
cursor.execute(insert_query, (name, email, age))
conn.commit()
new_user_id = cursor.lastrowid
# 查詢並回傳新建立的資料
cursor.execute(f"SELECT id, name, email, age FROM {DB_TABLE} WHERE id = %s", (new_user_id,))
new_user = cursor.fetchone()
return create_response("success", 201, "User created successfully", new_user)
except Error as e:
conn.rollback()
return create_response("error", 500, f"An error occurred: {e}")
finally:
if conn.is_connected():
cursor.close()
conn.close()
# [PATCH] /v1/users/<id> - 更新使用者
@app.route('/v1/users/<int:user_id>', methods=['PATCH'])
def update_user(user_id):
data = request.get_json()
if not data:
return create_response("error", 400, "No update data provided")
conn = get_db_connection()
if not conn:
return create_response("error", 500, "Database connection failed")
try:
cursor = conn.cursor(dictionary=True)
# 檢查使用者是否存在
cursor.execute(f"SELECT id FROM {DB_TABLE} WHERE id = %s", (user_id,))
if not cursor.fetchone():
return create_response("error", 404, "User not found")
# 動態建立更新欄位
update_fields = []
params = []
for key in ['name', 'email', 'age']:
if key in data:
update_fields.append(f"{key} = %s")
params.append(data[key])
if not update_fields:
return create_response("error", 400, "No valid fields to update")
params.append(user_id)
update_query = f"UPDATE {DB_TABLE} SET {', '.join(update_fields)} WHERE id = %s"
cursor.execute(update_query, tuple(params))
conn.commit()
# 查詢並回傳更新後的資料
cursor.execute(f"SELECT id, name, email, age FROM {DB_TABLE} WHERE id = %s", (user_id,))
updated_user = cursor.fetchone()
return create_response("success", 200, "User updated successfully", updated_user)
except Error as e:
conn.rollback()
# 處理 email 重複的錯誤
if 'Duplicate entry' in str(e) and 'for key \'email\'' in str(e):
return create_response("error", 409, "Email already exists")
return create_response("error", 500, f"An error occurred: {e}")
finally:
if conn.is_connected():
cursor.close()
conn.close()
# [DELETE] /v1/users/<id> - 刪除使用者
@app.route('/v1/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
conn = get_db_connection()
if not conn:
return create_response("error", 500, "Database connection failed")
try:
cursor = conn.cursor()
# 執行刪除
cursor.execute(f"DELETE FROM {DB_TABLE} WHERE id = %s", (user_id,))
conn.commit()
# 檢查是否有資料被刪除
if cursor.rowcount == 0:
return create_response("error", 404, "User not found")
# 成功刪除,回傳 204 No Content
return '', 204
except Error as e:
conn.rollback()
return create_response("error", 500, f"An error occurred: {e}")
finally:
if conn.is_connected():
cursor.close()
conn.close()
# --- 主程式進入點 ---
if __name__ == '__main__':
# 建議在生產環境中使用 Gunicorn 或 uWSGI
app.run(host='0.0.0.0', port=5000, debug=True)