"""Flask REST API exposing CRUD endpoints for the ``users`` MySQL table."""

import math
import os

import mysql.connector
from flask import Flask, jsonify, request
from flask_cors import CORS
from mysql.connector import Error

# --- Configuration: environment variables with hard-coded fallbacks ---
# NOTE(security): the fallbacks below embed credentials in source control.
# They are kept so existing setups keep working, but they should be rotated
# and supplied only through the environment.
DB_HOST = os.getenv("DB_HOST", "mysql.theaken.com")
DB_PORT = int(os.getenv("DB_PORT", 33306))
DB_NAME = os.getenv("DB_NAME", "db_A002")
DB_USER = os.getenv("DB_USER", "A002")
DB_PASSWORD = os.getenv("DB_PASSWORD", "5YqyroX7o22y")
DB_TABLE = "users"

# MySQL server error number raised on a unique-key violation (ER_DUP_ENTRY).
_ER_DUP_ENTRY = 1062

# --- Flask app initialisation ---
app = Flask(__name__)
# Enable CORS for every origin; this should be restricted in production.
CORS(app, resources={r"/v1/*": {"origins": "*"}})


# --- Database helpers ---
def get_db_connection():
    """Open a MySQL connection using the module-level settings.

    Returns:
        The connection object, or ``None`` when connecting fails (the error
        is printed to stdout).
    """
    try:
        conn = mysql.connector.connect(
            host=DB_HOST,
            port=DB_PORT,
            database=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD,
        )
        return conn
    except Error as e:
        print(f"資料庫連線失敗: {e}")
        return None


def _close_resources(conn, cursor=None):
    """Close *cursor* (when one was opened) and *conn* if still connected."""
    if conn.is_connected():
        # The cursor may be None when conn.cursor() itself raised.
        if cursor is not None:
            cursor.close()
        conn.close()


def _is_duplicate_email_error(error):
    """Return True when *error* is a unique-key violation mentioning email.

    The error number is checked instead of the exact message text because the
    key name in the message may be reported with a table prefix.
    """
    return getattr(error, "errno", None) == _ER_DUP_ENTRY and "email" in str(error)


# --- Payload validation ---
def _is_valid_age(value):
    """Return True for a real integer; ``bool`` is rejected although it subclasses ``int``."""
    return isinstance(value, int) and not isinstance(value, bool)


# Per-field type checks shared by the create and update endpoints.
_USER_FIELD_VALIDATORS = {
    "name": lambda value: isinstance(value, str),
    "email": lambda value: isinstance(value, str),
    "age": _is_valid_age,
}


# --- Uniform response format ---
def create_response(status, code, message, data=None):
    """Build the standard JSON envelope.

    Args:
        status: Status label placed in the body (e.g. ``"success"``).
        code: HTTP status code; also echoed in the body.
        message: Human-readable message.
        data: Optional payload, added under ``"data"`` when not ``None``.

    Returns:
        A ``(response, status_code)`` tuple suitable for a Flask view.
    """
    response = {"status": status, "code": code, "message": message}
    if data is not None:
        response["data"] = data
    return jsonify(response), code


# --- Error handlers ---
@app.errorhandler(404)
def not_found_error(error):
    """Return the JSON envelope for unknown routes."""
    return create_response("error", 404, "Resource not found")


@app.errorhandler(500)
def internal_error(error):
    """Return the JSON envelope for unhandled server errors."""
    return create_response("error", 500, "Internal server error")


# --- API endpoints ---

# [GET] /v1/users - list users (supports age filtering and pagination)
@app.route("/v1/users", methods=["GET"])
def get_users():
    """List users, optionally filtered by ``min_age``/``max_age`` and paginated.

    Query parameters: ``min_age``, ``max_age``, ``page`` (default 1) and
    ``limit`` (default 10). Responds with 400 when ``page`` or ``limit`` is
    below 1, and 500 on database errors.
    """
    # 1. Read the query parameters.
    min_age = request.args.get("min_age", type=int)
    max_age = request.args.get("max_age", type=int)
    page = request.args.get("page", 1, type=int)
    limit = request.args.get("limit", 10, type=int)

    # A zero limit would divide by zero below, and negative values would
    # produce an invalid LIMIT/OFFSET clause.
    if page < 1 or limit < 1:
        return create_response("error", 400, "page and limit must be positive integers")
    offset = (page - 1) * limit

    conn = get_db_connection()
    if not conn:
        return create_response("error", 500, "Database connection failed")

    cursor = None
    try:
        # 2. Build the WHERE clause dynamically; values stay parameterised.
        query_conditions = []
        params = []
        if min_age is not None:
            query_conditions.append("age >= %s")
            params.append(min_age)
        if max_age is not None:
            query_conditions.append("age <= %s")
            params.append(max_age)

        where_clause = " AND ".join(query_conditions)
        if where_clause:
            where_clause = "WHERE " + where_clause

        # One dictionary cursor serves both queries so nothing is left unclosed.
        cursor = conn.cursor(dictionary=True)

        # 3. Count the matching rows to compute the number of pages.
        count_query = f"SELECT COUNT(*) AS total_items FROM {DB_TABLE} {where_clause}"
        cursor.execute(count_query, tuple(params))
        total_items = cursor.fetchone()["total_items"]
        total_pages = math.ceil(total_items / limit)

        # 4. Fetch the rows of the requested page.
        data_query = (
            f"SELECT id, name, email, age FROM {DB_TABLE} {where_clause} "
            "ORDER BY id ASC LIMIT %s OFFSET %s"
        )
        cursor.execute(data_query, tuple(params + [limit, offset]))
        users = cursor.fetchall()

        # 5. Assemble the response.
        meta = {
            "total_items": total_items,
            "total_pages": total_pages,
            "current_page": page,
            "page_size": limit,
        }
        return create_response(
            "success", 200, "Users retrieved successfully", {"users": users, "meta": meta}
        )
    except Error as e:
        return create_response("error", 500, f"An error occurred: {e}")
    finally:
        _close_resources(conn, cursor)


# [GET] /v1/users/<user_id> - fetch a single user
# The <int:user_id> URL variable supplies the view's user_id argument.
@app.route("/v1/users/<int:user_id>", methods=["GET"])
def get_user(user_id):
    """Return the user with the given id, or 404 when no such row exists."""
    conn = get_db_connection()
    if not conn:
        return create_response("error", 500, "Database connection failed")

    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute(
            f"SELECT id, name, email, age FROM {DB_TABLE} WHERE id = %s", (user_id,)
        )
        user = cursor.fetchone()
        if user:
            return create_response("success", 200, "User found", user)
        return create_response("error", 404, "User not found")
    except Error as e:
        return create_response("error", 500, f"An error occurred: {e}")
    finally:
        _close_resources(conn, cursor)


# [POST] /v1/users - create a user
@app.route("/v1/users", methods=["POST"])
def create_user():
    """Create a user from a JSON body containing ``name``, ``email`` and ``age``.

    Responds with 400 for a missing/invalid body, 409 when the email already
    exists, 201 with the new row on success, and 500 on other database errors.
    """
    # silent=True yields None for a malformed or non-JSON body, so the client
    # receives the JSON envelope below rather than Flask's default error page.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or any(
        field not in data for field in _USER_FIELD_VALIDATORS
    ):
        return create_response("error", 400, "Missing required fields: name, email, age")

    if not all(check(data[field]) for field, check in _USER_FIELD_VALIDATORS.items()):
        return create_response("error", 400, "Invalid data type for fields")
    name, email, age = data["name"], data["email"], data["age"]

    conn = get_db_connection()
    if not conn:
        return create_response("error", 500, "Database connection failed")

    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)

        # Reject the request early when the email is already taken.
        cursor.execute(f"SELECT id FROM {DB_TABLE} WHERE email = %s", (email,))
        if cursor.fetchone():
            return create_response("error", 409, "Email already exists")

        # Insert the new row.
        insert_query = f"INSERT INTO {DB_TABLE} (name, email, age) VALUES (%s, %s, %s)"
        cursor.execute(insert_query, (name, email, age))
        conn.commit()
        new_user_id = cursor.lastrowid

        # Read the stored row back and return it.
        cursor.execute(
            f"SELECT id, name, email, age FROM {DB_TABLE} WHERE id = %s", (new_user_id,)
        )
        new_user = cursor.fetchone()
        return create_response("success", 201, "User created successfully", new_user)
    except Error as e:
        conn.rollback()
        # A concurrent insert can slip in between the check and the INSERT.
        if _is_duplicate_email_error(e):
            return create_response("error", 409, "Email already exists")
        return create_response("error", 500, f"An error occurred: {e}")
    finally:
        _close_resources(conn, cursor)


# [PATCH] /v1/users/<user_id> - update a user
# The <int:user_id> URL variable supplies the view's user_id argument.
@app.route("/v1/users/<int:user_id>", methods=["PATCH"])
def update_user(user_id):
    """Partially update a user with any of ``name``, ``email`` and ``age``.

    Responds with 400 for an empty/invalid body, 404 when the user does not
    exist, 409 on a duplicate email, 200 with the updated row on success, and
    500 on other database errors.
    """
    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        return create_response("error", 400, "No update data provided")

    conn = get_db_connection()
    if not conn:
        return create_response("error", 500, "Database connection failed")

    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)

        # Make sure the user exists before attempting the update.
        cursor.execute(f"SELECT id FROM {DB_TABLE} WHERE id = %s", (user_id,))
        if not cursor.fetchone():
            return create_response("error", 404, "User not found")

        # Collect the updatable fields present in the body. Column names come
        # from the fixed validator table, never from the request.
        update_fields = []
        params = []
        for key, check in _USER_FIELD_VALIDATORS.items():
            if key not in data:
                continue
            if not check(data[key]):
                return create_response("error", 400, "Invalid data type for fields")
            update_fields.append(f"{key} = %s")
            params.append(data[key])

        if not update_fields:
            return create_response("error", 400, "No valid fields to update")

        params.append(user_id)
        update_query = f"UPDATE {DB_TABLE} SET {', '.join(update_fields)} WHERE id = %s"
        cursor.execute(update_query, tuple(params))
        conn.commit()

        # Read the updated row back and return it.
        cursor.execute(
            f"SELECT id, name, email, age FROM {DB_TABLE} WHERE id = %s", (user_id,)
        )
        updated_user = cursor.fetchone()
        return create_response("success", 200, "User updated successfully", updated_user)
    except Error as e:
        conn.rollback()
        # Map a duplicate-email violation to 409.
        if _is_duplicate_email_error(e):
            return create_response("error", 409, "Email already exists")
        return create_response("error", 500, f"An error occurred: {e}")
    finally:
        _close_resources(conn, cursor)


# [DELETE] /v1/users/<user_id> - delete a user
# The <int:user_id> URL variable supplies the view's user_id argument.
@app.route("/v1/users/<int:user_id>", methods=["DELETE"])
def delete_user(user_id):
    """Delete the user with the given id; 204 on success, 404 when absent."""
    conn = get_db_connection()
    if not conn:
        return create_response("error", 500, "Database connection failed")

    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute(f"DELETE FROM {DB_TABLE} WHERE id = %s", (user_id,))
        conn.commit()

        # No affected row means the id did not exist.
        if cursor.rowcount == 0:
            return create_response("error", 404, "User not found")

        # Deleted successfully: 204 No Content carries no body.
        return "", 204
    except Error as e:
        conn.rollback()
        return create_response("error", 500, f"An error occurred: {e}")
    finally:
        _close_resources(conn, cursor)


# --- Script entry point ---
if __name__ == "__main__":
    # Use Gunicorn or uWSGI in production. Debug mode is opt-in through
    # FLASK_DEBUG because the interactive debugger must not be exposed on a
    # publicly bound host.
    debug_enabled = os.getenv("FLASK_DEBUG", "").lower() in ("1", "true", "yes")
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)