Files
0908-1/app.py
2025-09-08 16:24:14 +08:00

264 lines
9.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import math
from flask import Flask, jsonify, request
from flask_cors import CORS
import mysql.connector
from mysql.connector import Error
# --- 環境變數或直接設定 ---
# 建議使用環境變數來管理敏感資訊,但為了方便直接執行,這裡先寫死
# Database settings: each value is read from the environment, falling back to
# the literal default when the variable is not set.
# NOTE(review): the fallback values (including the password) are committed in
# source; supply real credentials through the environment instead.
DB_HOST = os.environ.get("DB_HOST", "mysql.theaken.com")
DB_PORT = int(os.environ.get("DB_PORT", "33306"))
DB_NAME = os.environ.get("DB_NAME", "db_A002")
DB_USER = os.environ.get("DB_USER", "A002")
DB_PASSWORD = os.environ.get("DB_PASSWORD", "5YqyroX7o22y")
# Table that every query in this module reads from and writes to.
DB_TABLE = "users"
# --- Flask App 初始化 ---
app = Flask(__name__)
# CORS is enabled for every origin on the /v1/* routes; a production
# deployment should restrict this to known origins.
CORS(
    app,
    resources={
        r"/v1/*": {"origins": "*"},
    },
)
# --- 資料庫連線 ---
def get_db_connection():
    """Open a new MySQL connection from the module-level DB_* settings.

    Returns:
        The connection object returned by ``mysql.connector.connect``, or
        ``None`` when connecting raises ``mysql.connector.Error`` (the
        failure is printed to stdout).
    """
    connect_kwargs = {
        "host": DB_HOST,
        "port": DB_PORT,
        "database": DB_NAME,
        "user": DB_USER,
        "password": DB_PASSWORD,
    }
    try:
        return mysql.connector.connect(**connect_kwargs)
    except Error as exc:
        print(f"資料庫連線失敗: {exc}")
    return None
# --- 統一回應格式 ---
def create_response(status, code, message, data=None):
    """Build the JSON envelope shared by every endpoint.

    Args:
        status: Value stored under the "status" key.
        code: HTTP status code; stored under "code" and also returned as the
            response status.
        message: Value stored under the "message" key.
        data: Optional payload; the "data" key is omitted when this is None.

    Returns:
        A ``(jsonify(...), code)`` tuple suitable for returning from a view.
    """
    payload = {"status": status, "code": code, "message": message}
    if data is None:
        return jsonify(payload), code
    payload["data"] = data
    return jsonify(payload), code
# --- 錯誤處理 ---
@app.errorhandler(404)
def not_found_error(error):
    """Answer HTTP 404 errors with the standard JSON error envelope."""
    return create_response(
        status="error",
        code=404,
        message="Resource not found",
    )
@app.errorhandler(500)
def internal_error(error):
    """Answer HTTP 500 errors with the standard JSON error envelope."""
    return create_response(
        status="error",
        code=500,
        message="Internal server error",
    )
# --- API Endpoints ---
# [GET] /v1/users - 取得使用者列表(支援篩選與分頁)
@app.route('/v1/users', methods=['GET'])
def get_users():
    """List users, optionally filtered by age, one page at a time.

    Query parameters:
        min_age: Optional int; keeps rows with ``age >= min_age``.
        max_age: Optional int; keeps rows with ``age <= max_age``.
        page: Page number, default 1; must be >= 1.
        limit: Page size, default 10; must be >= 1.

    Returns:
        200 with ``{"users": [...], "meta": {...}}`` on success,
        400 when ``page`` or ``limit`` is below 1,
        500 when the connection or a query fails.
    """
    page = request.args.get('page', 1, type=int)
    limit = request.args.get('limit', 10, type=int)
    # Reject bad pagination up front: limit == 0 would divide by zero when
    # computing total_pages, and negative values yield an invalid
    # LIMIT/OFFSET clause.
    if page < 1 or limit < 1:
        return create_response("error", 400, "page and limit must be positive integers")

    conn = get_db_connection()
    if not conn:
        return create_response("error", 500, "Database connection failed")

    cursor = None
    try:
        min_age = request.args.get('min_age', type=int)
        max_age = request.args.get('max_age', type=int)
        offset = (page - 1) * limit

        # Build the WHERE clause from fixed fragments; user-supplied values
        # only ever travel as bound parameters.
        query_conditions = []
        params = []
        if min_age is not None:
            query_conditions.append("age >= %s")
            params.append(min_age)
        if max_age is not None:
            query_conditions.append("age <= %s")
            params.append(max_age)
        where_clause = " AND ".join(query_conditions)
        if where_clause:
            where_clause = "WHERE " + where_clause

        # A single dictionary cursor serves both queries, so no cursor is
        # left unclosed.
        cursor = conn.cursor(dictionary=True)

        # Total row count for the pagination metadata.
        count_query = f"SELECT COUNT(*) AS total FROM {DB_TABLE} {where_clause}"
        cursor.execute(count_query, tuple(params))
        total_items = cursor.fetchone()["total"]
        total_pages = math.ceil(total_items / limit)

        # Rows for the requested page.
        data_query = (
            f"SELECT id, name, email, age FROM {DB_TABLE} {where_clause} "
            "ORDER BY id ASC LIMIT %s OFFSET %s"
        )
        cursor.execute(data_query, tuple(params + [limit, offset]))
        users = cursor.fetchall()

        meta = {
            "total_items": total_items,
            "total_pages": total_pages,
            "current_page": page,
            "page_size": limit,
        }
        return create_response(
            "success", 200, "Users retrieved successfully",
            {"users": users, "meta": meta},
        )
    except Error as e:
        return create_response("error", 500, f"An error occurred: {e}")
    finally:
        if conn.is_connected():
            # cursor stays None if creating it failed.
            if cursor is not None:
                cursor.close()
            conn.close()
# [GET] /v1/users/<id> - 取得單一使用者
@app.route('/v1/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
    """Fetch a single user by primary key.

    Args:
        user_id: Integer id taken from the URL.

    Returns:
        200 with the user's id, name, email and age when a row exists,
        404 when no row matches, 500 when the connection or query fails.
    """
    conn = get_db_connection()
    if not conn:
        return create_response("error", 500, "Database connection failed")
    try:
        cursor = conn.cursor(dictionary=True)
        lookup_sql = f"SELECT id, name, email, age FROM {DB_TABLE} WHERE id = %s"
        cursor.execute(lookup_sql, (user_id,))
        row = cursor.fetchone()
        if not row:
            return create_response("error", 404, "User not found")
        return create_response("success", 200, "User found", row)
    except Error as exc:
        return create_response("error", 500, f"An error occurred: {exc}")
    finally:
        if conn.is_connected():
            cursor.close()
            conn.close()
# [POST] /v1/users - 新增使用者
@app.route('/v1/users', methods=['POST'])
def create_user():
    """Create a user from a JSON body holding ``name``, ``email`` and ``age``.

    Returns:
        201 with the newly stored row on success,
        400 when the body is not a JSON object, a required field is missing,
            or a field has the wrong type (name/email must be strings, age an
            integer and not a boolean),
        409 when the email is already taken,
        500 when the connection or a query fails.
    """
    # silent=True yields None for a missing or malformed JSON body, so the
    # client gets the JSON error envelope below rather than a framework
    # error page.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Lists and strings also support ``in`` and would otherwise slip
        # through to the subscript below and raise TypeError.
        return create_response("error", 400, "Request body must be a JSON object")
    if any(field not in data for field in ('name', 'email', 'age')):
        return create_response("error", 400, "Missing required fields: name, email, age")

    name, email, age = data['name'], data['email'], data['age']
    # bool is a subclass of int, so JSON true/false must be excluded explicitly.
    if (
        not isinstance(name, str)
        or not isinstance(email, str)
        or isinstance(age, bool)
        or not isinstance(age, int)
    ):
        return create_response("error", 400, "Invalid data type for fields")

    conn = get_db_connection()
    if not conn:
        return create_response("error", 500, "Database connection failed")

    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)

        # Friendly early check for an existing email.
        cursor.execute(f"SELECT id FROM {DB_TABLE} WHERE email = %s", (email,))
        if cursor.fetchone():
            return create_response("error", 409, "Email already exists")

        insert_query = f"INSERT INTO {DB_TABLE} (name, email, age) VALUES (%s, %s, %s)"
        cursor.execute(insert_query, (name, email, age))
        conn.commit()
        new_user_id = cursor.lastrowid

        # Read the row back so the response reflects what was stored.
        cursor.execute(
            f"SELECT id, name, email, age FROM {DB_TABLE} WHERE id = %s",
            (new_user_id,),
        )
        new_user = cursor.fetchone()
        return create_response("success", 201, "User created successfully", new_user)
    except Error as e:
        conn.rollback()
        # Another request may insert the same email between the check above
        # and the INSERT; MySQL reports that as error 1062 (ER_DUP_ENTRY).
        if getattr(e, "errno", None) == 1062 and 'email' in str(e):
            return create_response("error", 409, "Email already exists")
        return create_response("error", 500, f"An error occurred: {e}")
    finally:
        if conn.is_connected():
            # cursor stays None if creating it failed.
            if cursor is not None:
                cursor.close()
            conn.close()
# [PATCH] /v1/users/<id> - 更新使用者
@app.route('/v1/users/<int:user_id>', methods=['PATCH'])
def update_user(user_id):
    """Partially update a user from a JSON body.

    Only the ``name``, ``email`` and ``age`` keys are applied; other keys are
    ignored.

    Args:
        user_id: Integer id taken from the URL.

    Returns:
        200 with the updated row on success,
        400 when the body is empty, is not a JSON object, contains none of
            the updatable fields, or a field has the wrong type,
        404 when the user does not exist,
        409 when the new email is already taken,
        500 when the connection or a query fails.
    """
    # silent=True yields None for a missing or malformed JSON body, so the
    # client gets the JSON error envelope below rather than a framework
    # error page.
    data = request.get_json(silent=True)
    if not data:
        return create_response("error", 400, "No update data provided")
    if not isinstance(data, dict):
        # A JSON string such as "name" would pass the ``in`` test below and
        # then fail on subscripting.
        return create_response("error", 400, "Request body must be a JSON object")

    conn = get_db_connection()
    if not conn:
        return create_response("error", 500, "Database connection failed")

    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)

        # Confirm the target exists before validating or writing anything.
        cursor.execute(f"SELECT id FROM {DB_TABLE} WHERE id = %s", (user_id,))
        if not cursor.fetchone():
            return create_response("error", 404, "User not found")

        # Column names come from this fixed list only; request values are
        # bound as parameters. Types mirror the checks in create_user.
        update_fields = []
        params = []
        for key, expected_type in (('name', str), ('email', str), ('age', int)):
            if key not in data:
                continue
            value = data[key]
            # bool is a subclass of int, so reject it explicitly.
            if isinstance(value, bool) or not isinstance(value, expected_type):
                return create_response("error", 400, "Invalid data type for fields")
            update_fields.append(f"{key} = %s")
            params.append(value)
        if not update_fields:
            return create_response("error", 400, "No valid fields to update")

        params.append(user_id)
        update_query = f"UPDATE {DB_TABLE} SET {', '.join(update_fields)} WHERE id = %s"
        cursor.execute(update_query, tuple(params))
        conn.commit()

        # Read the row back so the response reflects what was stored.
        cursor.execute(
            f"SELECT id, name, email, age FROM {DB_TABLE} WHERE id = %s",
            (user_id,),
        )
        updated_user = cursor.fetchone()
        return create_response("success", 200, "User updated successfully", updated_user)
    except Error as e:
        conn.rollback()
        # Duplicate key is MySQL error 1062 (ER_DUP_ENTRY). The message names
        # the key as 'email' on older servers and 'table.email' on newer
        # ones, so match on the error number plus the column name rather
        # than on one exact wording.
        if getattr(e, "errno", None) == 1062 and 'email' in str(e):
            return create_response("error", 409, "Email already exists")
        return create_response("error", 500, f"An error occurred: {e}")
    finally:
        if conn.is_connected():
            # cursor stays None if creating it failed.
            if cursor is not None:
                cursor.close()
            conn.close()
# [DELETE] /v1/users/<id> - 刪除使用者
@app.route('/v1/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
    """Delete a user by primary key.

    Args:
        user_id: Integer id taken from the URL.

    Returns:
        An empty body with status 204 when a row was deleted, 404 when no
        row matched, 500 when the connection or query fails.
    """
    conn = get_db_connection()
    if not conn:
        return create_response("error", 500, "Database connection failed")
    try:
        cursor = conn.cursor()
        delete_sql = f"DELETE FROM {DB_TABLE} WHERE id = %s"
        cursor.execute(delete_sql, (user_id,))
        conn.commit()
        # A non-zero rowcount means the DELETE removed a row.
        if cursor.rowcount != 0:
            return '', 204
        return create_response("error", 404, "User not found")
    except Error as exc:
        conn.rollback()
        return create_response("error", 500, f"An error occurred: {exc}")
    finally:
        if conn.is_connected():
            cursor.close()
            conn.close()
# --- 主程式進入點 ---
if __name__ == '__main__':
    # Development entry point; use a WSGI server such as Gunicorn or uWSGI in
    # production.
    # Debug mode turns on the Werkzeug interactive debugger, which lets anyone
    # who can reach the port execute code. Because the server binds to
    # 0.0.0.0, debug is off unless FLASK_DEBUG is set to a truthy value.
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled)