Initial commit

This commit is contained in:
2025-09-08 16:20:32 +08:00
commit 812dc64e22
6 changed files with 326 additions and 0 deletions

BIN
Data/pizza.xlsx Normal file

Binary file not shown.

193
flask_pizza_api.py Normal file
View File

@@ -0,0 +1,193 @@
from flask import Flask, request, jsonify
from flask_cors import CORS
import mysql.connector
from mysql.connector import errorcode
app = Flask(__name__)
CORS(app)
# 資料庫連線資訊
DB_HOST = "mysql.theaken.com"
DB_PORT = 33306
DB_NAME = "db_A018"
DB_USER = "A018"
DB_PASSWORD = "4MQYkJRYtyLE"
# 統一回應格式
def success_response(data=None, code=200, message="Success"):
return jsonify({"status": "success", "code": code, "message": message, "data": data}), code
def error_response(code, message):
return jsonify({"status": "error", "code": code, "message": message}), code
# 建立資料庫連線
def get_db_connection():
return mysql.connector.connect(
host=DB_HOST,
port=DB_PORT,
user=DB_USER,
password=DB_PASSWORD,
database=DB_NAME
)
@app.route("/v1/pizza", methods=["GET"])
def get_pizzas():
try:
min_price = request.args.get("min_price")
max_price = request.args.get("max_price")
page = int(request.args.get("page", 1))
limit = int(request.args.get("limit", 10))
offset = (page - 1) * limit
conn = get_db_connection()
cursor = conn.cursor(dictionary=True)
query = "SELECT * FROM pizza WHERE 1=1"
params = []
if min_price:
query += " AND price >= %s"
params.append(min_price)
if max_price:
query += " AND price <= %s"
params.append(max_price)
query += " LIMIT %s OFFSET %s"
params.extend([limit, offset])
cursor.execute(query, params)
pizzas = cursor.fetchall()
cursor.execute("SELECT COUNT(*) as total FROM pizza")
total = cursor.fetchone()["total"]
meta = {"page": page, "limit": limit, "total": total}
return success_response({"pizzas": pizzas, "meta": meta})
except Exception as e:
return error_response(500, str(e))
finally:
if 'cursor' in locals():
cursor.close()
if 'conn' in locals():
conn.close()
@app.route("/v1/pizza/<int:id>", methods=["GET"])
def get_pizza(id):
try:
conn = get_db_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM pizza WHERE id = %s", (id,))
pizza = cursor.fetchone()
if not pizza:
return error_response(404, "Pizza not found")
return success_response(pizza)
except Exception as e:
return error_response(500, str(e))
finally:
if 'cursor' in locals():
cursor.close()
if 'conn' in locals():
conn.close()
@app.route("/v1/pizza", methods=["POST"])
def create_pizza():
try:
data = request.get_json()
name = data.get("name")
size = data.get("size")
price = data.get("price")
if not all([name, size, price]):
return error_response(400, "Missing required fields")
conn = get_db_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute(
"INSERT INTO pizza (name, size, price) VALUES (%s, %s, %s)",
(name, size, price)
)
conn.commit()
new_id = cursor.lastrowid
cursor.execute("SELECT * FROM pizza WHERE id = %s", (new_id,))
new_pizza = cursor.fetchone()
return success_response(new_pizza, 201, "Pizza created successfully")
except Exception as e:
return error_response(500, str(e))
finally:
if 'cursor' in locals():
cursor.close()
if 'conn' in locals():
conn.close()
@app.route("/v1/pizza/<int:id>", methods=["PATCH"])
def update_pizza(id):
try:
data = request.get_json()
fields = []
params = []
for key in ["name", "size", "price"]:
if key in data:
fields.append(f"{key} = %s")
params.append(data[key])
if not fields:
return error_response(400, "No fields to update")
params.append(id)
conn = get_db_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute(f"UPDATE pizza SET {', '.join(fields)} WHERE id = %s", params)
conn.commit()
cursor.execute("SELECT * FROM pizza WHERE id = %s", (id,))
updated_pizza = cursor.fetchone()
if not updated_pizza:
return error_response(404, "Pizza not found")
return success_response(updated_pizza, 200, "Pizza updated successfully")
except Exception as e:
return error_response(500, str(e))
finally:
if 'cursor' in locals():
cursor.close()
if 'conn' in locals():
conn.close()
@app.route("/v1/pizza/<int:id>", methods=["DELETE"])
def delete_pizza(id):
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM pizza WHERE id = %s", (id,))
conn.commit()
if cursor.rowcount == 0:
return error_response(404, "Pizza not found")
return success_response(None, 204, "Pizza deleted successfully")
except Exception as e:
return error_response(500, str(e))
finally:
if 'cursor' in locals():
cursor.close()
if 'conn' in locals():
conn.close()
if __name__ == "__main__":
app.run(debug=True)

63
insert_pizza_data.py Normal file
View File

@@ -0,0 +1,63 @@
import pandas as pd
import mysql.connector
from mysql.connector import errorcode
# 資料庫連線資訊
DB_HOST = "mysql.theaken.com"
DB_PORT = 33306
DB_NAME = "db_A018"
DB_USER = "A018"
DB_PASSWORD = "4MQYkJRYtyLE"
# Excel 檔案路徑
EXCEL_FILE = "c:\\AI_Program\\C0908\\Data\\pizza.xlsx"
def insert_data_to_db():
try:
# 讀取 Excel 檔案
data = pd.read_excel(EXCEL_FILE)
# 檢查欄位名稱是否正確
required_columns = ['name', 'size', 'price']
for column in required_columns:
if column not in data.columns:
raise ValueError(f"Excel 檔案缺少必要欄位: {column}")
# 建立資料庫連線
conn = mysql.connector.connect(
host=DB_HOST,
port=DB_PORT,
user=DB_USER,
password=DB_PASSWORD,
database=DB_NAME
)
cursor = conn.cursor()
# 插入資料到 pizza 資料表ID 自動生成
for index, row in data.iterrows():
insert_query = (
"INSERT INTO pizza (name, size, price) "
"VALUES (%s, %s, %s)"
)
cursor.execute(insert_query, (row['name'], row['size'], row['price']))
conn.commit()
print("資料插入成功!")
except mysql.connector.Error as err:
if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
print("使用者名稱或密碼錯誤")
elif err.errno == errorcode.ER_BAD_DB_ERROR:
print("資料庫不存在")
else:
print(err)
except Exception as e:
print(f"處理資料時發生錯誤: {e}")
finally:
if 'cursor' in locals():
cursor.close()
if 'conn' in locals():
conn.close()
if __name__ == "__main__":
insert_data_to_db()

17
read_pizza_excel.py Normal file
View File

@@ -0,0 +1,17 @@
import pandas as pd
# 讀取 Excel 檔案
file_path = "c:\\AI_Program\\C0908\\Data\\pizza.xlsx"
def read_excel():
try:
# 使用 pandas 讀取 Excel
data = pd.read_excel(file_path)
print("資料內容:")
print(data.head())
return data
except Exception as e:
print(f"讀取 Excel 檔案時發生錯誤: {e}")
if __name__ == "__main__":
read_excel()

53
test_db_connection.py Normal file
View File

@@ -0,0 +1,53 @@
import mysql.connector
from mysql.connector import errorcode
# 資料庫連線資訊
DB_HOST = "mysql.theaken.com"
DB_PORT = 33306
DB_NAME = "db_A018"
DB_USER = "A018"
DB_PASSWORD = "4MQYkJRYtyLE"
def create_table():
try:
# 建立資料庫連線
conn = mysql.connector.connect(
host=DB_HOST,
port=DB_PORT,
user=DB_USER,
password=DB_PASSWORD,
database=DB_NAME
)
cursor = conn.cursor()
# 如果資料表已存在,先刪除
cursor.execute("DROP TABLE IF EXISTS pizza")
# 建立資料表
create_table_query = (
"CREATE TABLE pizza ("
" id INT AUTO_INCREMENT PRIMARY KEY,"
" name VARCHAR(50),"
" size VARCHAR(50),"
" price INT"
")"
)
cursor.execute(create_table_query)
print("資料表建立成功!")
except mysql.connector.Error as err:
if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
print("使用者名稱或密碼錯誤")
elif err.errno == errorcode.ER_BAD_DB_ERROR:
print("資料庫不存在")
else:
print(err)
finally:
if 'cursor' in locals():
cursor.close()
if 'conn' in locals():
conn.close()
if __name__ == "__main__":
create_table()

0
新文字文件.txt Normal file
View File