Files
wen_0908/app.py
2025-09-08 17:06:58 +08:00

464 lines
15 KiB
Python

from flask import Flask, request, jsonify, send_from_directory, render_template
from flask_cors import CORS
import mysql.connector
from mysql.connector import Error
import os
import pandas as pd
import io
import csv
from werkzeug.utils import secure_filename
app = Flask(__name__, static_folder='static')
CORS(app)
# 檔案上傳設定
UPLOAD_FOLDER = 'uploads'
ALLOWED_EXTENSIONS = {'csv', 'xlsx', 'xls'}
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 限制上傳檔案大小為 16MB
# 確保上傳資料夾存在
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# 資料庫連接設定
db_config = {
'host': 'mysql.theaken.com',
'port': 33306,
'user': 'A008',
'password': '74knygxwzEms',
'database': 'db_A008'
}
# 建立資料庫連接
def get_db_connection():
try:
conn = mysql.connector.connect(**db_config)
return conn
except Error as e:
print(f"資料庫連接錯誤: {e}")
return None
# 統一回應格式
def make_response(status, code, message, data=None):
response = {
"status": status,
"code": code,
"message": message
}
if data is not None:
response["data"] = data
return jsonify(response)
# 錯誤處理
@app.errorhandler(404)
def not_found(error):
return make_response("error", 404, "Not Found"), 404
@app.errorhandler(400)
def bad_request(error):
return make_response("error", 400, "Bad Request"), 400
@app.errorhandler(500)
def server_error(error):
return make_response("error", 500, "Internal Server Error"), 500
# 路由: GET /v1/users
@app.route('/v1/users', methods=['GET'])
def get_users():
try:
# 取得查詢參數
min_age = request.args.get('min_age', type=int)
max_age = request.args.get('max_age', type=int)
page = request.args.get('page', 1, type=int)
limit = request.args.get('limit', 10, type=int)
# 計算偏移量
offset = (page - 1) * limit
conn = get_db_connection()
if not conn:
return make_response("error", 500, "Database connection error"), 500
cursor = conn.cursor(dictionary=True)
# 建立基本查詢
query = "SELECT * FROM users"
count_query = "SELECT COUNT(*) as total FROM users"
params = []
where_clauses = []
# 加入年齡過濾條件
if min_age is not None:
where_clauses.append("age >= %s")
params.append(min_age)
if max_age is not None:
where_clauses.append("age <= %s")
params.append(max_age)
# 組合 WHERE 子句
if where_clauses:
query += " WHERE " + " AND ".join(where_clauses)
count_query += " WHERE " + " AND ".join(where_clauses)
# 加入分頁
query += " LIMIT %s OFFSET %s"
params.extend([limit, offset])
# 執行查詢
cursor.execute(query, params)
users = cursor.fetchall()
# 取得總筆數
cursor.execute(count_query, params[:-2] if params else [])
total = cursor.fetchone()['total']
# 計算總頁數
total_pages = (total + limit - 1) // limit
# 建立 meta 資訊
meta = {
"total": total,
"page": page,
"limit": limit,
"total_pages": total_pages
}
cursor.close()
conn.close()
return make_response("success", 200, "Users retrieved successfully", {"users": users, "meta": meta}), 200
except Exception as e:
print(f"Error: {e}")
return make_response("error", 500, str(e)), 500
# 路由: GET /v1/users/<id>
@app.route('/v1/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
try:
conn = get_db_connection()
if not conn:
return make_response("error", 500, "Database connection error"), 500
cursor = conn.cursor(dictionary=True)
query = "SELECT * FROM users WHERE id = %s"
cursor.execute(query, (user_id,))
user = cursor.fetchone()
cursor.close()
conn.close()
if not user:
return make_response("error", 404, f"User with id {user_id} not found"), 404
return make_response("success", 200, "User retrieved successfully", {"user": user}), 200
except Exception as e:
print(f"Error: {e}")
return make_response("error", 500, str(e)), 500
# 路由: POST /v1/users
@app.route('/v1/users', methods=['POST'])
def create_user():
try:
data = request.get_json()
# 驗證必要欄位
if not all(key in data for key in ['name', 'email', 'age']):
return make_response("error", 400, "Missing required fields: name, email, age"), 400
# 驗證資料類型
if not isinstance(data['name'], str) or not data['name'].strip():
return make_response("error", 400, "Name must be a non-empty string"), 400
if not isinstance(data['email'], str) or '@' not in data['email']:
return make_response("error", 400, "Invalid email format"), 400
if not isinstance(data['age'], int) or data['age'] < 0:
return make_response("error", 400, "Age must be a positive integer"), 400
conn = get_db_connection()
if not conn:
return make_response("error", 500, "Database connection error"), 500
cursor = conn.cursor(dictionary=True)
# 使用參數化查詢避免 SQL 注入
query = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"
cursor.execute(query, (data['name'], data['email'], data['age']))
# 取得新增的使用者 ID
user_id = cursor.lastrowid
conn.commit()
# 取得新增的使用者資料
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
new_user = cursor.fetchone()
cursor.close()
conn.close()
return make_response("success", 201, "User created successfully", {"user": new_user}), 201
except Exception as e:
print(f"Error: {e}")
return make_response("error", 500, str(e)), 500
# 路由: PATCH /v1/users/<id>
@app.route('/v1/users/<int:user_id>', methods=['PATCH'])
def update_user(user_id):
try:
data = request.get_json()
# 檢查是否有要更新的欄位
if not any(key in data for key in ['name', 'email', 'age']):
return make_response("error", 400, "No fields to update"), 400
# 驗證資料類型
if 'name' in data and (not isinstance(data['name'], str) or not data['name'].strip()):
return make_response("error", 400, "Name must be a non-empty string"), 400
if 'email' in data and (not isinstance(data['email'], str) or '@' not in data['email']):
return make_response("error", 400, "Invalid email format"), 400
if 'age' in data and (not isinstance(data['age'], int) or data['age'] < 0):
return make_response("error", 400, "Age must be a positive integer"), 400
conn = get_db_connection()
if not conn:
return make_response("error", 500, "Database connection error"), 500
cursor = conn.cursor(dictionary=True)
# 檢查使用者是否存在
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
user = cursor.fetchone()
if not user:
cursor.close()
conn.close()
return make_response("error", 404, f"User with id {user_id} not found"), 404
# 建立更新查詢
update_fields = []
params = []
if 'name' in data:
update_fields.append("name = %s")
params.append(data['name'])
if 'email' in data:
update_fields.append("email = %s")
params.append(data['email'])
if 'age' in data:
update_fields.append("age = %s")
params.append(data['age'])
# 加入使用者 ID 到參數列表
params.append(user_id)
# 執行更新查詢
query = f"UPDATE users SET {', '.join(update_fields)} WHERE id = %s"
cursor.execute(query, params)
conn.commit()
# 取得更新後的使用者資料
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
updated_user = cursor.fetchone()
cursor.close()
conn.close()
return make_response("success", 200, "User updated successfully", {"user": updated_user}), 200
except Exception as e:
print(f"Error: {e}")
return make_response("error", 500, str(e)), 500
# 路由: DELETE /v1/users/<id>
@app.route('/v1/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
try:
conn = get_db_connection()
if not conn:
return make_response("error", 500, "Database connection error"), 500
cursor = conn.cursor()
# 檢查使用者是否存在
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
user = cursor.fetchone()
if not user:
cursor.close()
conn.close()
return make_response("error", 404, f"User with id {user_id} not found"), 404
# 刪除使用者
cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
conn.commit()
cursor.close()
conn.close()
return make_response("success", 204, "User deleted successfully"), 204
except Exception as e:
print(f"Error: {e}")
return make_response("error", 500, str(e)), 500
# 檢查檔案副檔名是否允許
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
# 解析 Excel 檔案
def parse_excel(file_path):
try:
df = pd.read_excel(file_path)
return df.to_dict('records')
except Exception as e:
print(f"Excel 解析錯誤: {e}")
return None
# 解析 CSV 檔案
def parse_csv(file_path):
try:
df = pd.read_csv(file_path)
return df.to_dict('records')
except Exception as e:
print(f"CSV 解析錯誤: {e}")
return None
# 將使用者資料匯入資料庫
def import_users_to_db(users_data):
try:
conn = get_db_connection()
if not conn:
return False, "資料庫連接錯誤"
cursor = conn.cursor()
success_count = 0
error_count = 0
errors = []
for user in users_data:
try:
# 確保必要欄位存在
if 'name' not in user or 'email' not in user or 'age' not in user:
error_count += 1
errors.append(f"缺少必要欄位: {user}")
continue
# 驗證資料類型
if not isinstance(user['name'], str) or not user['name'].strip():
error_count += 1
errors.append(f"名稱必須是非空字串: {user}")
continue
if not isinstance(user['email'], str) or '@' not in user['email']:
error_count += 1
errors.append(f"無效的電子郵件格式: {user}")
continue
try:
age = int(user['age'])
if age < 0:
error_count += 1
errors.append(f"年齡必須是正整數: {user}")
continue
except (ValueError, TypeError):
error_count += 1
errors.append(f"年齡必須是整數: {user}")
continue
# 插入資料
query = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"
cursor.execute(query, (user['name'], user['email'], age))
success_count += 1
except Exception as e:
error_count += 1
errors.append(f"插入錯誤: {str(e)}, 資料: {user}")
conn.commit()
cursor.close()
conn.close()
return True, {
"success_count": success_count,
"error_count": error_count,
"errors": errors[:10] # 只返回前 10 個錯誤
}
except Exception as e:
print(f"匯入錯誤: {e}")
return False, str(e)
# 主頁路由
@app.route('/')
def index():
return send_from_directory('static', 'index.html')
# 靜態文件路由
@app.route('/<path:path>')
def static_files(path):
return send_from_directory('static', path)
# 檔案上傳和匯入路由
@app.route('/v1/users/import', methods=['POST'])
def import_users():
try:
# 檢查是否有檔案
if 'file' not in request.files:
return make_response("error", 400, "未找到檔案"), 400
file = request.files['file']
# 檢查檔案名稱
if file.filename == '':
return make_response("error", 400, "未選擇檔案"), 400
# 檢查檔案類型
if not allowed_file(file.filename):
return make_response("error", 400, f"不支援的檔案類型,僅支援 {', '.join(ALLOWED_EXTENSIONS)}"), 400
# 安全地保存檔案
filename = secure_filename(file.filename)
file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(file_path)
# 根據檔案類型解析
file_ext = filename.rsplit('.', 1)[1].lower()
if file_ext in ['xlsx', 'xls']:
users_data = parse_excel(file_path)
elif file_ext == 'csv':
users_data = parse_csv(file_path)
else:
# 刪除檔案
os.remove(file_path)
return make_response("error", 400, "不支援的檔案類型"), 400
# 檢查解析結果
if not users_data or len(users_data) == 0:
# 刪除檔案
os.remove(file_path)
return make_response("error", 400, "檔案解析失敗或沒有資料"), 400
# 匯入資料庫
success, result = import_users_to_db(users_data)
# 刪除檔案
os.remove(file_path)
if not success:
return make_response("error", 500, f"匯入失敗: {result}"), 500
return make_response("success", 200, "使用者資料匯入成功", result), 200
except Exception as e:
print(f"Error: {e}")
return make_response("error", 500, str(e)), 500
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)