commit 100f9a2ede5d8ce9960a641e28daf8804187b832 Author: Allen Date: Mon Sep 8 16:24:14 2025 +0800 Initial commit diff --git a/app.py b/app.py new file mode 100644 index 0000000..e860321 --- /dev/null +++ b/app.py @@ -0,0 +1,263 @@ + + +import os +import math +from flask import Flask, jsonify, request +from flask_cors import CORS +import mysql.connector +from mysql.connector import Error + +# --- 環境變數或直接設定 --- +# 建議使用環境變數來管理敏感資訊,但為了方便直接執行,這裡先寫死 +DB_HOST = os.getenv("DB_HOST", "mysql.theaken.com") +DB_PORT = int(os.getenv("DB_PORT", 33306)) +DB_NAME = os.getenv("DB_NAME", "db_A002") +DB_USER = os.getenv("DB_USER", "A002") +DB_PASSWORD = os.getenv("DB_PASSWORD", "5YqyroX7o22y") +DB_TABLE = "users" + +# --- Flask App 初始化 --- +app = Flask(__name__) +# 啟用 CORS,允許所有來源的請求,在生產環境中應更嚴格限制 +CORS(app, resources={r"/v1/*": {"origins": "*"}}) + +# --- 資料庫連線 --- +def get_db_connection(): + """建立並回傳資料庫連線""" + try: + conn = mysql.connector.connect( + host=DB_HOST, + port=DB_PORT, + database=DB_NAME, + user=DB_USER, + password=DB_PASSWORD + ) + return conn + except Error as e: + print(f"資料庫連線失敗: {e}") + return None + +# --- 統一回應格式 --- +def create_response(status, code, message, data=None): + """建立標準化的 JSON 回應""" + response = {"status": status, "code": code, "message": message} + if data is not None: + response["data"] = data + return jsonify(response), code + +# --- 錯誤處理 --- +@app.errorhandler(404) +def not_found_error(error): + return create_response("error", 404, "Resource not found") + +@app.errorhandler(500) +def internal_error(error): + return create_response("error", 500, "Internal server error") + +# --- API Endpoints --- + +# [GET] /v1/users - 取得使用者列表(支援篩選與分頁) +@app.route('/v1/users', methods=['GET']) +def get_users(): + conn = get_db_connection() + if not conn: + return create_response("error", 500, "Database connection failed") + + try: + # 1. 取得查詢參數 + min_age = request.args.get('min_age', type=int) + max_age = request.args.get('max_age', type=int) + page = request.args.get('page', 1, type=int) + limit = request.args.get('limit', 10, type=int) + offset = (page - 1) * limit + + # 2. 動態建立查詢條件 + query_conditions = [] + params = [] + if min_age is not None: + query_conditions.append("age >= %s") + params.append(min_age) + if max_age is not None: + query_conditions.append("age <= %s") + params.append(max_age) + + where_clause = " AND ".join(query_conditions) + if where_clause: + where_clause = "WHERE " + where_clause + + # 3. 查詢總數以計算分頁 + cursor = conn.cursor() + count_query = f"SELECT COUNT(*) FROM {DB_TABLE} {where_clause}" + cursor.execute(count_query, tuple(params)) + total_items = cursor.fetchone()[0] + total_pages = math.ceil(total_items / limit) + + # 4. 查詢當頁資料 + data_query = f"SELECT id, name, email, age FROM {DB_TABLE} {where_clause} ORDER BY id ASC LIMIT %s OFFSET %s" + cursor = conn.cursor(dictionary=True) + cursor.execute(data_query, tuple(params + [limit, offset])) + users = cursor.fetchall() + + # 5. 組合回應 + meta = { + "total_items": total_items, + "total_pages": total_pages, + "current_page": page, + "page_size": limit + } + return create_response("success", 200, "Users retrieved successfully", {"users": users, "meta": meta}) + + except Error as e: + return create_response("error", 500, f"An error occurred: {e}") + finally: + if conn.is_connected(): + cursor.close() + conn.close() + +# [GET] /v1/users/ - 取得單一使用者 +@app.route('/v1/users/', methods=['GET']) +def get_user(user_id): + conn = get_db_connection() + if not conn: + return create_response("error", 500, "Database connection failed") + + try: + cursor = conn.cursor(dictionary=True) + cursor.execute(f"SELECT id, name, email, age FROM {DB_TABLE} WHERE id = %s", (user_id,)) + user = cursor.fetchone() + if user: + return create_response("success", 200, "User found", user) + else: + return create_response("error", 404, "User not found") + except Error as e: + return create_response("error", 500, f"An error occurred: {e}") + finally: + if conn.is_connected(): + cursor.close() + conn.close() + +# [POST] /v1/users - 新增使用者 +@app.route('/v1/users', methods=['POST']) +def create_user(): + data = request.get_json() + if not data or not 'name' in data or not 'email' in data or not 'age' in data: + return create_response("error", 400, "Missing required fields: name, email, age") + + name, email, age = data['name'], data['email'], data['age'] + if not isinstance(name, str) or not isinstance(email, str) or not isinstance(age, int): + return create_response("error", 400, "Invalid data type for fields") + + conn = get_db_connection() + if not conn: + return create_response("error", 500, "Database connection failed") + + try: + cursor = conn.cursor(dictionary=True) + # 檢查 Email 是否已存在 + cursor.execute(f"SELECT id FROM {DB_TABLE} WHERE email = %s", (email,)) + if cursor.fetchone(): + return create_response("error", 409, "Email already exists") + + # 插入新資料 + insert_query = f"INSERT INTO {DB_TABLE} (name, email, age) VALUES (%s, %s, %s)" + cursor.execute(insert_query, (name, email, age)) + conn.commit() + + new_user_id = cursor.lastrowid + + # 查詢並回傳新建立的資料 + cursor.execute(f"SELECT id, name, email, age FROM {DB_TABLE} WHERE id = %s", (new_user_id,)) + new_user = cursor.fetchone() + + return create_response("success", 201, "User created successfully", new_user) + except Error as e: + conn.rollback() + return create_response("error", 500, f"An error occurred: {e}") + finally: + if conn.is_connected(): + cursor.close() + conn.close() + +# [PATCH] /v1/users/ - 更新使用者 +@app.route('/v1/users/', methods=['PATCH']) +def update_user(user_id): + data = request.get_json() + if not data: + return create_response("error", 400, "No update data provided") + + conn = get_db_connection() + if not conn: + return create_response("error", 500, "Database connection failed") + + try: + cursor = conn.cursor(dictionary=True) + # 檢查使用者是否存在 + cursor.execute(f"SELECT id FROM {DB_TABLE} WHERE id = %s", (user_id,)) + if not cursor.fetchone(): + return create_response("error", 404, "User not found") + + # 動態建立更新欄位 + update_fields = [] + params = [] + for key in ['name', 'email', 'age']: + if key in data: + update_fields.append(f"{key} = %s") + params.append(data[key]) + + if not update_fields: + return create_response("error", 400, "No valid fields to update") + + params.append(user_id) + update_query = f"UPDATE {DB_TABLE} SET {', '.join(update_fields)} WHERE id = %s" + + cursor.execute(update_query, tuple(params)) + conn.commit() + + # 查詢並回傳更新後的資料 + cursor.execute(f"SELECT id, name, email, age FROM {DB_TABLE} WHERE id = %s", (user_id,)) + updated_user = cursor.fetchone() + + return create_response("success", 200, "User updated successfully", updated_user) + except Error as e: + conn.rollback() + # 處理 email 重複的錯誤 + if 'Duplicate entry' in str(e) and 'for key \'email\'' in str(e): + return create_response("error", 409, "Email already exists") + return create_response("error", 500, f"An error occurred: {e}") + finally: + if conn.is_connected(): + cursor.close() + conn.close() + +# [DELETE] /v1/users/ - 刪除使用者 +@app.route('/v1/users/', methods=['DELETE']) +def delete_user(user_id): + conn = get_db_connection() + if not conn: + return create_response("error", 500, "Database connection failed") + + try: + cursor = conn.cursor() + # 執行刪除 + cursor.execute(f"DELETE FROM {DB_TABLE} WHERE id = %s", (user_id,)) + conn.commit() + + # 檢查是否有資料被刪除 + if cursor.rowcount == 0: + return create_response("error", 404, "User not found") + + # 成功刪除,回傳 204 No Content + return '', 204 + except Error as e: + conn.rollback() + return create_response("error", 500, f"An error occurred: {e}") + finally: + if conn.is_connected(): + cursor.close() + conn.close() + + +# --- 主程式進入點 --- +if __name__ == '__main__': + # 建議在生產環境中使用 Gunicorn 或 uWSGI + app.run(host='0.0.0.0', port=5000, debug=True)