"""REST API for managing users and pizzas, backed by MySQL.

Endpoints (all under ``/v1``):

* ``GET    /users``, ``GET /pizzas``         -- filtered, paginated listings
* ``GET    /users/<id>``, ``/pizzas/<id>``   -- single record
* ``POST   /users``, ``/pizzas``             -- create
* ``PATCH  /users/<id>``, ``/pizzas/<id>``   -- partial update
* ``DELETE /users/<id>``, ``/pizzas/<id>``   -- delete

Every JSON response uses the envelope built by ``create_response``:
``{"status", "code", "message"[, "data"]}``.
"""
import logging
import math
import os
from contextlib import contextmanager
from datetime import datetime
from functools import wraps

import mysql.connector
from flask import Flask, jsonify, request
from flask_cors import CORS

logger = logging.getLogger(__name__)

app = Flask(__name__)
CORS(app)  # Enable CORS so a locally served front end can call the API.

# Database connection settings.
# SECURITY(review): the defaults below are the credentials that were
# hard-coded in this file. They are kept only so existing deployments keep
# working; set the DB_* environment variables, rotate the password and then
# remove the literal defaults from source control.
DB_CONFIG = {
    'host': os.environ.get('DB_HOST', 'mysql.theaken.com'),
    'port': int(os.environ.get('DB_PORT', '33306')),
    'user': os.environ.get('DB_USER', 'A019'),
    'password': os.environ.get('DB_PASSWORD', '9wvKEkxBzVca'),
    'database': os.environ.get('DB_NAME', 'db_A019'),
}

# Sizes accepted for a pizza's ``size`` field.
VALID_PIZZA_SIZES = ('S', 'M', 'L')


class _DatabaseUnavailable(Exception):
    """Raised when a database connection cannot be established."""


def get_db_connection():
    """Open a new MySQL connection using ``DB_CONFIG``.

    Returns:
        The connection object, or ``None`` when the connection attempt
        raises ``mysql.connector.Error`` (the error is logged).
    """
    try:
        return mysql.connector.connect(**DB_CONFIG)
    except mysql.connector.Error as err:
        logger.error("資料庫連線錯誤: %s", err)
        return None


def create_response(status, code, message, data=None):
    """Build the uniform response envelope.

    Args:
        status: Status label, e.g. ``"success"`` or ``"error"``.
        code: Numeric code mirrored in the body.
        message: Human-readable message.
        data: Optional payload; the ``"data"`` key is omitted when ``None``.

    Returns:
        A dict with ``status``, ``code``, ``message`` and optionally ``data``.
    """
    response = {
        "status": status,
        "code": code,
        "message": message,
    }
    if data is not None:
        response["data"] = data
    return response


def create_error_response(code, message):
    """Build an error envelope (``status`` is always ``"error"``)."""
    return create_response("error", code, message)


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

def _success(code, message, data=None):
    """Return a ``(json_response, http_status)`` pair for a success."""
    return jsonify(create_response("success", code, message, data)), code


def _error(code, message):
    """Return a ``(json_response, http_status)`` pair for an error."""
    return jsonify(create_error_response(code, message)), code


@contextmanager
def _db_cursor(dictionary=True):
    """Yield ``(connection, cursor)`` and always close both afterwards.

    Args:
        dictionary: Passed to ``connection.cursor``; when true, rows are
            returned as dicts keyed by column name.

    Raises:
        _DatabaseUnavailable: If ``get_db_connection`` returns ``None``.
    """
    conn = get_db_connection()
    if conn is None:
        raise _DatabaseUnavailable()
    try:
        cursor = conn.cursor(dictionary=dictionary)
        try:
            yield conn, cursor
        finally:
            cursor.close()
    finally:
        # Closing without commit discards any uncommitted work, so a failure
        # midway through a view never leaves a half-applied change.
        conn.close()


def _api_errors(view):
    """Decorator translating exceptions from a view into 500 envelopes.

    Exception details are logged server-side rather than returned to the
    client, so database errors are not disclosed in responses.
    """
    @wraps(view)
    def wrapper(*args, **kwargs):
        try:
            return view(*args, **kwargs)
        except _DatabaseUnavailable:
            return _error(500, "資料庫連線失敗")
        except Exception:  # Top-level boundary: log and report generically.
            logger.exception("Unhandled error in %s", view.__name__)
            return _error(500, "伺服器錯誤")
    return wrapper


def _json_object():
    """Return the request body as a dict, or ``None`` if it is not one.

    ``silent=True`` makes a missing or malformed JSON body yield ``None``
    instead of raising, so callers can answer with a 400.
    """
    body = request.get_json(silent=True)
    return body if isinstance(body, dict) else None


def _pagination_args():
    """Read ``page`` and ``limit`` from the query string.

    Returns:
        ``(page, limit)`` when both are >= 1, otherwise ``None``. Values that
        are absent or not integers fall back to 1 and 10 respectively.
    """
    page = request.args.get('page', 1, type=int)
    limit = request.args.get('limit', 10, type=int)
    if page < 1 or limit < 1:
        return None
    return page, limit


def _list_rows(table, where_clauses, filter_params, page, limit):
    """Fetch one page of ``table`` plus pagination metadata.

    Args:
        table: Table name. Must be a trusted constant: it is interpolated
            into the SQL text, unlike the filter values which are bound.
        where_clauses: SQL fragments with ``%s`` placeholders, AND-ed together.
        filter_params: Values for the placeholders in ``where_clauses``.
        page: 1-based page number.
        limit: Page size (>= 1).

    Returns:
        ``(rows, meta)`` where ``meta`` has ``total``, ``page``, ``limit``
        and ``total_pages``.
    """
    where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
    offset = (page - 1) * limit
    with _db_cursor() as (_conn, cursor):
        # ORDER BY makes LIMIT/OFFSET paging deterministic across requests.
        cursor.execute(
            f"SELECT * FROM {table}{where_sql} ORDER BY id LIMIT %s OFFSET %s",
            [*filter_params, limit, offset],
        )
        rows = cursor.fetchall()
        cursor.execute(
            f"SELECT COUNT(*) as total FROM {table}{where_sql}",
            list(filter_params),
        )
        total = cursor.fetchone()['total']
    meta = {
        "total": total,
        "page": page,
        "limit": limit,
        "total_pages": (total + limit - 1) // limit,  # Ceiling division.
    }
    return rows, meta


def _validate_price(raw):
    """Convert ``raw`` to a positive, finite float.

    Returns:
        ``(price, None)`` on success or ``(None, message)`` on failure.
    """
    # bool is accepted by float(); reject it explicitly.
    if isinstance(raw, bool):
        return None, "price 必須是有效的數字"
    try:
        price = float(raw)
    except (ValueError, TypeError):
        return None, "price 必須是有效的數字"
    # float() also accepts "nan" / "inf", which would pass a `<= 0` test.
    if not math.isfinite(price):
        return None, "price 必須是有效的數字"
    if price <= 0:
        return None, "price 必須大於 0"
    return price, None


def _validate_pizza_fields(data):
    """Validate whichever of ``name``, ``price``, ``size`` appear in ``data``.

    Returns:
        ``(fields, None)`` with the validated values (price as float), or
        ``(None, message)`` describing the first problem found.
    """
    fields = {}
    if 'name' in data:
        if not isinstance(data['name'], str):
            return None, "name 必須是字串"
        fields['name'] = data['name']
    if 'price' in data:
        price, problem = _validate_price(data['price'])
        if problem:
            return None, problem
        fields['price'] = price
    if 'size' in data:
        size = data['size']
        if not isinstance(size, str) or size not in VALID_PIZZA_SIZES:
            return None, "size 必須是 S、M 或 L"
        fields['size'] = size
    return fields, None


def _validate_user_fields(data):
    """Validate whichever of ``name``, ``email``, ``age`` appear in ``data``.

    Returns:
        ``(fields, None)`` with the validated values, or ``(None, message)``
        describing the first problem found.
    """
    if 'name' in data and not isinstance(data['name'], str):
        return None, "name 必須是字串"
    if 'email' in data and not isinstance(data['email'], str):
        return None, "email 必須是字串"
    if 'age' in data:
        age = data['age']
        # bool is a subclass of int; True/False are not valid ages.
        if isinstance(age, bool) or not isinstance(age, int):
            return None, "age 必須是整數"
    # Deliberately minimal email check: only requires an '@'.
    if 'email' in data and '@' not in data['email']:
        return None, "email 格式不正確"
    fields = {key: data[key] for key in ('name', 'email', 'age') if key in data}
    return fields, None


def _missing_field(data, required):
    """Return the first name in ``required`` absent from ``data``, else None."""
    return next((field for field in required if field not in data), None)


# ---------------------------------------------------------------------------
# Users
# ---------------------------------------------------------------------------

@app.route('/v1/users', methods=['GET'])
@_api_errors
def get_users():
    """List users, optionally filtered by ``min_age`` / ``max_age``.

    Query parameters: ``min_age``, ``max_age``, ``page`` (default 1),
    ``limit`` (default 10). Responds 400 when ``page`` or ``limit`` < 1.
    """
    paging = _pagination_args()
    if paging is None:
        return _error(400, "page 和 limit 必須是正整數")
    page, limit = paging

    where_clauses = []
    params = []
    min_age = request.args.get('min_age', type=int)
    max_age = request.args.get('max_age', type=int)
    if min_age is not None:
        where_clauses.append("age >= %s")
        params.append(min_age)
    if max_age is not None:
        where_clauses.append("age <= %s")
        params.append(max_age)

    users, meta = _list_rows("users", where_clauses, params, page, limit)
    return _success(200, "成功獲取使用者列表", {"users": users, "meta": meta})


@app.route('/v1/users/<int:user_id>', methods=['GET'])
@_api_errors
def get_user(user_id):
    """Return the user with primary key ``user_id``, or 404 if absent."""
    with _db_cursor() as (_conn, cursor):
        cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        user = cursor.fetchone()
    if not user:
        return _error(404, f"找不到 ID 為 {user_id} 的使用者")
    return _success(200, "成功獲取使用者資料", {"user": user})


@app.route('/v1/users', methods=['POST'])
@_api_errors
def create_user():
    """Create a user from a JSON body with ``name``, ``email`` and ``age``.

    Responds 400 on a non-object body, a missing field or an invalid value;
    201 with the stored row on success.
    """
    data = _json_object()
    if data is None:
        return _error(400, "請求內容必須是 JSON 物件")

    missing = _missing_field(data, ('name', 'email', 'age'))
    if missing is not None:
        return _error(400, f"缺少必要欄位: {missing}")

    fields, problem = _validate_user_fields(data)
    if problem:
        return _error(400, problem)

    with _db_cursor() as (conn, cursor):
        # Parameterized query: values are bound, never interpolated.
        cursor.execute(
            "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)",
            (fields['name'], fields['email'], fields['age']),
        )
        user_id = cursor.lastrowid
        conn.commit()
        # Re-read so the response reflects what the database stored.
        cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        new_user = cursor.fetchone()
    return _success(201, "成功新增使用者", {"user": new_user})


@app.route('/v1/users/<int:user_id>', methods=['PATCH'])
@_api_errors
def update_user(user_id):
    """Partially update a user; accepts any of ``name``, ``email``, ``age``.

    Responds 400 on an empty/invalid body or when no updatable field is
    present, 404 when the user does not exist, 200 with the updated row.
    """
    data = _json_object()
    if not data:
        return _error(400, "沒有提供要更新的資料")

    fields, problem = _validate_user_fields(data)
    if problem:
        return _error(400, problem)
    if not fields:
        return _error(400, "沒有提供有效的更新欄位")

    with _db_cursor() as (conn, cursor):
        cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        if not cursor.fetchone():
            return _error(404, f"找不到 ID 為 {user_id} 的使用者")

        # Column names come from the fixed whitelist in _validate_user_fields.
        assignments = ", ".join(f"{column} = %s" for column in fields)
        cursor.execute(
            "UPDATE users SET " + assignments + " WHERE id = %s",
            [*fields.values(), user_id],
        )
        conn.commit()

        cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        updated_user = cursor.fetchone()
    return _success(200, "成功更新使用者資料", {"user": updated_user})


@app.route('/v1/users/<int:user_id>', methods=['DELETE'])
@_api_errors
def delete_user(user_id):
    """Delete a user; 404 if no such row, otherwise 204 with an empty body."""
    with _db_cursor(dictionary=False) as (conn, cursor):
        cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
        deleted = cursor.rowcount
        conn.commit()
    if not deleted:
        return _error(404, f"找不到 ID 為 {user_id} 的使用者")
    # A 204 response must not carry content, so no JSON envelope is sent.
    return "", 204


# ---------------------------------------------------------------------------
# Pizzas
# ---------------------------------------------------------------------------

@app.route('/v1/pizzas', methods=['GET'])
@_api_errors
def get_pizzas():
    """List pizzas, optionally filtered by price range and size.

    Query parameters: ``min_price``, ``max_price``, ``size``, ``page``
    (default 1), ``limit`` (default 10). Responds 400 when ``page`` or
    ``limit`` < 1.
    """
    paging = _pagination_args()
    if paging is None:
        return _error(400, "page 和 limit 必須是正整數")
    page, limit = paging

    where_clauses = []
    params = []
    min_price = request.args.get('min_price', type=float)
    max_price = request.args.get('max_price', type=float)
    size = request.args.get('size')
    if min_price is not None:
        where_clauses.append("price >= %s")
        params.append(min_price)
    if max_price is not None:
        where_clauses.append("price <= %s")
        params.append(max_price)
    if size is not None:
        where_clauses.append("size = %s")
        params.append(size)

    pizzas, meta = _list_rows("pizzas", where_clauses, params, page, limit)
    return _success(200, "成功獲取披薩列表", {"pizzas": pizzas, "meta": meta})


@app.route('/v1/pizzas/<int:pizza_id>', methods=['GET'])
@_api_errors
def get_pizza_by_id(pizza_id):
    """Return the pizza with primary key ``pizza_id``, or 404 if absent."""
    with _db_cursor() as (_conn, cursor):
        cursor.execute("SELECT * FROM pizzas WHERE id = %s", (pizza_id,))
        pizza = cursor.fetchone()
    if not pizza:
        return _error(404, f"找不到 ID 為 {pizza_id} 的披薩")
    return _success(200, "成功獲取披薩資料", {"pizza": pizza})


def get_pizza(pizza_id):
    """Unrouted alias of ``get_pizza_by_id``, kept for existing callers."""
    return get_pizza_by_id(pizza_id)


@app.route('/v1/pizzas', methods=['POST'])
@_api_errors
def create_pizza():
    """Create a pizza from a JSON body with ``name``, ``price`` and ``size``.

    ``price`` must convert to a positive finite number and ``size`` must be
    one of ``VALID_PIZZA_SIZES``. Responds 400 on invalid input, 201 with
    the stored row on success.
    """
    data = _json_object()
    if data is None:
        return _error(400, "請求內容必須是 JSON 物件")

    missing = _missing_field(data, ('name', 'price', 'size'))
    if missing is not None:
        return _error(400, f"缺少必要欄位: {missing}")

    fields, problem = _validate_pizza_fields(data)
    if problem:
        return _error(400, problem)

    now = datetime.now()  # Same timestamp for created_at and updated_at.
    with _db_cursor() as (conn, cursor):
        cursor.execute(
            "INSERT INTO pizzas (name, price, size, created_at, updated_at) "
            "VALUES (%s, %s, %s, %s, %s)",
            (fields['name'], fields['price'], fields['size'], now, now),
        )
        pizza_id = cursor.lastrowid
        conn.commit()
        # Re-read so the response reflects what the database stored.
        cursor.execute("SELECT * FROM pizzas WHERE id = %s", (pizza_id,))
        new_pizza = cursor.fetchone()
    return _success(201, "成功新增披薩", {"pizza": new_pizza})


@app.route('/v1/pizzas/<int:pizza_id>', methods=['PATCH'])
@_api_errors
def update_pizza(pizza_id):
    """Partially update a pizza; accepts any of ``name``, ``price``, ``size``.

    ``updated_at`` is always refreshed. Responds 400 when no updatable field
    is supplied or a value is invalid, 404 when the pizza does not exist,
    200 with the updated row on success.
    """
    data = _json_object()
    if not data or not any(key in data for key in ('name', 'price', 'size')):
        return _error(400, "至少需要提供一個欄位: name, price, size")

    fields, problem = _validate_pizza_fields(data)
    if problem:
        return _error(400, problem)
    fields['updated_at'] = datetime.now()

    with _db_cursor() as (conn, cursor):
        cursor.execute("SELECT * FROM pizzas WHERE id = %s", (pizza_id,))
        if not cursor.fetchone():
            return _error(404, f"找不到 ID 為 {pizza_id} 的披薩")

        # Column names come from the fixed whitelist in _validate_pizza_fields.
        assignments = ", ".join(f"{column} = %s" for column in fields)
        cursor.execute(
            f"UPDATE pizzas SET {assignments} WHERE id = %s",
            [*fields.values(), pizza_id],
        )
        conn.commit()

        cursor.execute("SELECT * FROM pizzas WHERE id = %s", (pizza_id,))
        updated_pizza = cursor.fetchone()
    return _success(200, "成功更新披薩", {"pizza": updated_pizza})


@app.route('/v1/pizzas/<int:pizza_id>', methods=['DELETE'])
@_api_errors
def delete_pizza(pizza_id):
    """Delete a pizza; 404 if no such row, otherwise 204 with an empty body."""
    with _db_cursor(dictionary=False) as (conn, cursor):
        cursor.execute("DELETE FROM pizzas WHERE id = %s", (pizza_id,))
        deleted = cursor.rowcount
        conn.commit()
    if not deleted:
        return _error(404, f"找不到 ID 為 {pizza_id} 的披薩")
    # A 204 response must not carry content, so no JSON envelope is sent.
    return "", 204


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------

def _ensure_tables():
    """Create the ``users`` and ``pizzas`` tables if they do not exist.

    Does nothing (beyond logging a warning) when the database is unreachable.
    """
    try:
        with _db_cursor(dictionary=False) as (conn, cursor):
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    name VARCHAR(100) NOT NULL,
                    email VARCHAR(100) NOT NULL,
                    age INT NOT NULL
                )
            """)
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS pizzas (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    name VARCHAR(50) NOT NULL,
                    price DECIMAL(10, 2) NOT NULL,
                    size VARCHAR(2) NOT NULL,
                    created_at DATETIME NOT NULL,
                    updated_at DATETIME NOT NULL
                )
            """)
            conn.commit()
    except _DatabaseUnavailable:
        logger.warning("Database unavailable; skipping table creation")


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    _ensure_tables()
    # SECURITY(review): debug mode enables the interactive debugger and must
    # never be reachable from untrusted networks. It stays on by default to
    # match the previous behavior; set FLASK_DEBUG=0 to turn it off.
    app.run(debug=os.environ.get('FLASK_DEBUG', '1') != '0')