464 lines
15 KiB
Python
464 lines
15 KiB
Python
from flask import Flask, request, jsonify, send_from_directory, render_template
|
|
from flask_cors import CORS
|
|
import mysql.connector
|
|
from mysql.connector import Error
|
|
import os
|
|
import pandas as pd
|
|
import io
|
|
import csv
|
|
from werkzeug.utils import secure_filename
|
|
|
|
app = Flask(__name__, static_folder='static')
|
|
CORS(app)
|
|
|
|
# 檔案上傳設定
|
|
UPLOAD_FOLDER = 'uploads'
|
|
ALLOWED_EXTENSIONS = {'csv', 'xlsx', 'xls'}
|
|
|
|
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
|
|
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 限制上傳檔案大小為 16MB
|
|
|
|
# 確保上傳資料夾存在
|
|
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
|
|
|
|
# 資料庫連接設定
|
|
db_config = {
|
|
'host': 'mysql.theaken.com',
|
|
'port': 33306,
|
|
'user': 'A008',
|
|
'password': '74knygxwzEms',
|
|
'database': 'db_A008'
|
|
}
|
|
|
|
# 建立資料庫連接
|
|
def get_db_connection():
|
|
try:
|
|
conn = mysql.connector.connect(**db_config)
|
|
return conn
|
|
except Error as e:
|
|
print(f"資料庫連接錯誤: {e}")
|
|
return None
|
|
|
|
# 統一回應格式
|
|
def make_response(status, code, message, data=None):
|
|
response = {
|
|
"status": status,
|
|
"code": code,
|
|
"message": message
|
|
}
|
|
if data is not None:
|
|
response["data"] = data
|
|
return jsonify(response)
|
|
|
|
# 錯誤處理
|
|
@app.errorhandler(404)
|
|
def not_found(error):
|
|
return make_response("error", 404, "Not Found"), 404
|
|
|
|
@app.errorhandler(400)
|
|
def bad_request(error):
|
|
return make_response("error", 400, "Bad Request"), 400
|
|
|
|
@app.errorhandler(500)
|
|
def server_error(error):
|
|
return make_response("error", 500, "Internal Server Error"), 500
|
|
|
|
# 路由: GET /v1/users
|
|
@app.route('/v1/users', methods=['GET'])
|
|
def get_users():
|
|
try:
|
|
# 取得查詢參數
|
|
min_age = request.args.get('min_age', type=int)
|
|
max_age = request.args.get('max_age', type=int)
|
|
page = request.args.get('page', 1, type=int)
|
|
limit = request.args.get('limit', 10, type=int)
|
|
|
|
# 計算偏移量
|
|
offset = (page - 1) * limit
|
|
|
|
conn = get_db_connection()
|
|
if not conn:
|
|
return make_response("error", 500, "Database connection error"), 500
|
|
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
# 建立基本查詢
|
|
query = "SELECT * FROM users"
|
|
count_query = "SELECT COUNT(*) as total FROM users"
|
|
params = []
|
|
where_clauses = []
|
|
|
|
# 加入年齡過濾條件
|
|
if min_age is not None:
|
|
where_clauses.append("age >= %s")
|
|
params.append(min_age)
|
|
|
|
if max_age is not None:
|
|
where_clauses.append("age <= %s")
|
|
params.append(max_age)
|
|
|
|
# 組合 WHERE 子句
|
|
if where_clauses:
|
|
query += " WHERE " + " AND ".join(where_clauses)
|
|
count_query += " WHERE " + " AND ".join(where_clauses)
|
|
|
|
# 加入分頁
|
|
query += " LIMIT %s OFFSET %s"
|
|
params.extend([limit, offset])
|
|
|
|
# 執行查詢
|
|
cursor.execute(query, params)
|
|
users = cursor.fetchall()
|
|
|
|
# 取得總筆數
|
|
cursor.execute(count_query, params[:-2] if params else [])
|
|
total = cursor.fetchone()['total']
|
|
|
|
# 計算總頁數
|
|
total_pages = (total + limit - 1) // limit
|
|
|
|
# 建立 meta 資訊
|
|
meta = {
|
|
"total": total,
|
|
"page": page,
|
|
"limit": limit,
|
|
"total_pages": total_pages
|
|
}
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return make_response("success", 200, "Users retrieved successfully", {"users": users, "meta": meta}), 200
|
|
|
|
except Exception as e:
|
|
print(f"Error: {e}")
|
|
return make_response("error", 500, str(e)), 500
|
|
|
|
# 路由: GET /v1/users/<id>
|
|
@app.route('/v1/users/<int:user_id>', methods=['GET'])
|
|
def get_user(user_id):
|
|
try:
|
|
conn = get_db_connection()
|
|
if not conn:
|
|
return make_response("error", 500, "Database connection error"), 500
|
|
|
|
cursor = conn.cursor(dictionary=True)
|
|
query = "SELECT * FROM users WHERE id = %s"
|
|
cursor.execute(query, (user_id,))
|
|
user = cursor.fetchone()
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
if not user:
|
|
return make_response("error", 404, f"User with id {user_id} not found"), 404
|
|
|
|
return make_response("success", 200, "User retrieved successfully", {"user": user}), 200
|
|
|
|
except Exception as e:
|
|
print(f"Error: {e}")
|
|
return make_response("error", 500, str(e)), 500
|
|
|
|
# 路由: POST /v1/users
|
|
@app.route('/v1/users', methods=['POST'])
|
|
def create_user():
|
|
try:
|
|
data = request.get_json()
|
|
|
|
# 驗證必要欄位
|
|
if not all(key in data for key in ['name', 'email', 'age']):
|
|
return make_response("error", 400, "Missing required fields: name, email, age"), 400
|
|
|
|
# 驗證資料類型
|
|
if not isinstance(data['name'], str) or not data['name'].strip():
|
|
return make_response("error", 400, "Name must be a non-empty string"), 400
|
|
|
|
if not isinstance(data['email'], str) or '@' not in data['email']:
|
|
return make_response("error", 400, "Invalid email format"), 400
|
|
|
|
if not isinstance(data['age'], int) or data['age'] < 0:
|
|
return make_response("error", 400, "Age must be a positive integer"), 400
|
|
|
|
conn = get_db_connection()
|
|
if not conn:
|
|
return make_response("error", 500, "Database connection error"), 500
|
|
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
# 使用參數化查詢避免 SQL 注入
|
|
query = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"
|
|
cursor.execute(query, (data['name'], data['email'], data['age']))
|
|
|
|
# 取得新增的使用者 ID
|
|
user_id = cursor.lastrowid
|
|
conn.commit()
|
|
|
|
# 取得新增的使用者資料
|
|
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
|
|
new_user = cursor.fetchone()
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return make_response("success", 201, "User created successfully", {"user": new_user}), 201
|
|
|
|
except Exception as e:
|
|
print(f"Error: {e}")
|
|
return make_response("error", 500, str(e)), 500
|
|
|
|
# 路由: PATCH /v1/users/<id>
|
|
@app.route('/v1/users/<int:user_id>', methods=['PATCH'])
|
|
def update_user(user_id):
|
|
try:
|
|
data = request.get_json()
|
|
|
|
# 檢查是否有要更新的欄位
|
|
if not any(key in data for key in ['name', 'email', 'age']):
|
|
return make_response("error", 400, "No fields to update"), 400
|
|
|
|
# 驗證資料類型
|
|
if 'name' in data and (not isinstance(data['name'], str) or not data['name'].strip()):
|
|
return make_response("error", 400, "Name must be a non-empty string"), 400
|
|
|
|
if 'email' in data and (not isinstance(data['email'], str) or '@' not in data['email']):
|
|
return make_response("error", 400, "Invalid email format"), 400
|
|
|
|
if 'age' in data and (not isinstance(data['age'], int) or data['age'] < 0):
|
|
return make_response("error", 400, "Age must be a positive integer"), 400
|
|
|
|
conn = get_db_connection()
|
|
if not conn:
|
|
return make_response("error", 500, "Database connection error"), 500
|
|
|
|
cursor = conn.cursor(dictionary=True)
|
|
|
|
# 檢查使用者是否存在
|
|
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
|
|
user = cursor.fetchone()
|
|
|
|
if not user:
|
|
cursor.close()
|
|
conn.close()
|
|
return make_response("error", 404, f"User with id {user_id} not found"), 404
|
|
|
|
# 建立更新查詢
|
|
update_fields = []
|
|
params = []
|
|
|
|
if 'name' in data:
|
|
update_fields.append("name = %s")
|
|
params.append(data['name'])
|
|
|
|
if 'email' in data:
|
|
update_fields.append("email = %s")
|
|
params.append(data['email'])
|
|
|
|
if 'age' in data:
|
|
update_fields.append("age = %s")
|
|
params.append(data['age'])
|
|
|
|
# 加入使用者 ID 到參數列表
|
|
params.append(user_id)
|
|
|
|
# 執行更新查詢
|
|
query = f"UPDATE users SET {', '.join(update_fields)} WHERE id = %s"
|
|
cursor.execute(query, params)
|
|
conn.commit()
|
|
|
|
# 取得更新後的使用者資料
|
|
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
|
|
updated_user = cursor.fetchone()
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return make_response("success", 200, "User updated successfully", {"user": updated_user}), 200
|
|
|
|
except Exception as e:
|
|
print(f"Error: {e}")
|
|
return make_response("error", 500, str(e)), 500
|
|
|
|
# 路由: DELETE /v1/users/<id>
|
|
@app.route('/v1/users/<int:user_id>', methods=['DELETE'])
|
|
def delete_user(user_id):
|
|
try:
|
|
conn = get_db_connection()
|
|
if not conn:
|
|
return make_response("error", 500, "Database connection error"), 500
|
|
|
|
cursor = conn.cursor()
|
|
|
|
# 檢查使用者是否存在
|
|
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
|
|
user = cursor.fetchone()
|
|
|
|
if not user:
|
|
cursor.close()
|
|
conn.close()
|
|
return make_response("error", 404, f"User with id {user_id} not found"), 404
|
|
|
|
# 刪除使用者
|
|
cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
|
|
conn.commit()
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return make_response("success", 204, "User deleted successfully"), 204
|
|
|
|
except Exception as e:
|
|
print(f"Error: {e}")
|
|
return make_response("error", 500, str(e)), 500
|
|
|
|
# 檢查檔案副檔名是否允許
|
|
def allowed_file(filename):
|
|
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
|
|
|
|
# 解析 Excel 檔案
|
|
def parse_excel(file_path):
|
|
try:
|
|
df = pd.read_excel(file_path)
|
|
return df.to_dict('records')
|
|
except Exception as e:
|
|
print(f"Excel 解析錯誤: {e}")
|
|
return None
|
|
|
|
# 解析 CSV 檔案
|
|
def parse_csv(file_path):
|
|
try:
|
|
df = pd.read_csv(file_path)
|
|
return df.to_dict('records')
|
|
except Exception as e:
|
|
print(f"CSV 解析錯誤: {e}")
|
|
return None
|
|
|
|
# 將使用者資料匯入資料庫
|
|
def import_users_to_db(users_data):
|
|
try:
|
|
conn = get_db_connection()
|
|
if not conn:
|
|
return False, "資料庫連接錯誤"
|
|
|
|
cursor = conn.cursor()
|
|
success_count = 0
|
|
error_count = 0
|
|
errors = []
|
|
|
|
for user in users_data:
|
|
try:
|
|
# 確保必要欄位存在
|
|
if 'name' not in user or 'email' not in user or 'age' not in user:
|
|
error_count += 1
|
|
errors.append(f"缺少必要欄位: {user}")
|
|
continue
|
|
|
|
# 驗證資料類型
|
|
if not isinstance(user['name'], str) or not user['name'].strip():
|
|
error_count += 1
|
|
errors.append(f"名稱必須是非空字串: {user}")
|
|
continue
|
|
|
|
if not isinstance(user['email'], str) or '@' not in user['email']:
|
|
error_count += 1
|
|
errors.append(f"無效的電子郵件格式: {user}")
|
|
continue
|
|
|
|
try:
|
|
age = int(user['age'])
|
|
if age < 0:
|
|
error_count += 1
|
|
errors.append(f"年齡必須是正整數: {user}")
|
|
continue
|
|
except (ValueError, TypeError):
|
|
error_count += 1
|
|
errors.append(f"年齡必須是整數: {user}")
|
|
continue
|
|
|
|
# 插入資料
|
|
query = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)"
|
|
cursor.execute(query, (user['name'], user['email'], age))
|
|
success_count += 1
|
|
except Exception as e:
|
|
error_count += 1
|
|
errors.append(f"插入錯誤: {str(e)}, 資料: {user}")
|
|
|
|
conn.commit()
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
return True, {
|
|
"success_count": success_count,
|
|
"error_count": error_count,
|
|
"errors": errors[:10] # 只返回前 10 個錯誤
|
|
}
|
|
except Exception as e:
|
|
print(f"匯入錯誤: {e}")
|
|
return False, str(e)
|
|
|
|
# 主頁路由
|
|
@app.route('/')
|
|
def index():
|
|
return send_from_directory('static', 'index.html')
|
|
|
|
# 靜態文件路由
|
|
@app.route('/<path:path>')
|
|
def static_files(path):
|
|
return send_from_directory('static', path)
|
|
|
|
# 檔案上傳和匯入路由
|
|
@app.route('/v1/users/import', methods=['POST'])
|
|
def import_users():
|
|
try:
|
|
# 檢查是否有檔案
|
|
if 'file' not in request.files:
|
|
return make_response("error", 400, "未找到檔案"), 400
|
|
|
|
file = request.files['file']
|
|
|
|
# 檢查檔案名稱
|
|
if file.filename == '':
|
|
return make_response("error", 400, "未選擇檔案"), 400
|
|
|
|
# 檢查檔案類型
|
|
if not allowed_file(file.filename):
|
|
return make_response("error", 400, f"不支援的檔案類型,僅支援 {', '.join(ALLOWED_EXTENSIONS)}"), 400
|
|
|
|
# 安全地保存檔案
|
|
filename = secure_filename(file.filename)
|
|
file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
|
|
file.save(file_path)
|
|
|
|
# 根據檔案類型解析
|
|
file_ext = filename.rsplit('.', 1)[1].lower()
|
|
if file_ext in ['xlsx', 'xls']:
|
|
users_data = parse_excel(file_path)
|
|
elif file_ext == 'csv':
|
|
users_data = parse_csv(file_path)
|
|
else:
|
|
# 刪除檔案
|
|
os.remove(file_path)
|
|
return make_response("error", 400, "不支援的檔案類型"), 400
|
|
|
|
# 檢查解析結果
|
|
if not users_data or len(users_data) == 0:
|
|
# 刪除檔案
|
|
os.remove(file_path)
|
|
return make_response("error", 400, "檔案解析失敗或沒有資料"), 400
|
|
|
|
# 匯入資料庫
|
|
success, result = import_users_to_db(users_data)
|
|
|
|
# 刪除檔案
|
|
os.remove(file_path)
|
|
|
|
if not success:
|
|
return make_response("error", 500, f"匯入失敗: {result}"), 500
|
|
|
|
return make_response("success", 200, "使用者資料匯入成功", result), 200
|
|
|
|
except Exception as e:
|
|
print(f"Error: {e}")
|
|
return make_response("error", 500, str(e)), 500
|
|
|
|
if __name__ == '__main__':
|
|
app.run(debug=True, host='0.0.0.0', port=5000) |