from flask import Flask, request, jsonify, send_from_directory, render_template from flask_cors import CORS import mysql.connector from mysql.connector import Error import os import pandas as pd import io import csv from werkzeug.utils import secure_filename app = Flask(__name__, static_folder='static') CORS(app) # 檔案上傳設定 UPLOAD_FOLDER = 'uploads' ALLOWED_EXTENSIONS = {'csv', 'xlsx', 'xls'} app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 限制上傳檔案大小為 16MB # 確保上傳資料夾存在 os.makedirs(UPLOAD_FOLDER, exist_ok=True) # 資料庫連接設定 db_config = { 'host': 'mysql.theaken.com', 'port': 33306, 'user': 'A008', 'password': '74knygxwzEms', 'database': 'db_A008' } # 建立資料庫連接 def get_db_connection(): try: conn = mysql.connector.connect(**db_config) return conn except Error as e: print(f"資料庫連接錯誤: {e}") return None # 統一回應格式 def make_response(status, code, message, data=None): response = { "status": status, "code": code, "message": message } if data is not None: response["data"] = data return jsonify(response) # 錯誤處理 @app.errorhandler(404) def not_found(error): return make_response("error", 404, "Not Found"), 404 @app.errorhandler(400) def bad_request(error): return make_response("error", 400, "Bad Request"), 400 @app.errorhandler(500) def server_error(error): return make_response("error", 500, "Internal Server Error"), 500 # 路由: GET /v1/users @app.route('/v1/users', methods=['GET']) def get_users(): try: # 取得查詢參數 min_age = request.args.get('min_age', type=int) max_age = request.args.get('max_age', type=int) page = request.args.get('page', 1, type=int) limit = request.args.get('limit', 10, type=int) # 計算偏移量 offset = (page - 1) * limit conn = get_db_connection() if not conn: return make_response("error", 500, "Database connection error"), 500 cursor = conn.cursor(dictionary=True) # 建立基本查詢 query = "SELECT * FROM users" count_query = "SELECT COUNT(*) as total FROM users" params = [] where_clauses = [] # 加入年齡過濾條件 if min_age is not None: where_clauses.append("age >= %s") params.append(min_age) if max_age is not None: where_clauses.append("age <= %s") params.append(max_age) # 組合 WHERE 子句 if where_clauses: query += " WHERE " + " AND ".join(where_clauses) count_query += " WHERE " + " AND ".join(where_clauses) # 加入分頁 query += " LIMIT %s OFFSET %s" params.extend([limit, offset]) # 執行查詢 cursor.execute(query, params) users = cursor.fetchall() # 取得總筆數 cursor.execute(count_query, params[:-2] if params else []) total = cursor.fetchone()['total'] # 計算總頁數 total_pages = (total + limit - 1) // limit # 建立 meta 資訊 meta = { "total": total, "page": page, "limit": limit, "total_pages": total_pages } cursor.close() conn.close() return make_response("success", 200, "Users retrieved successfully", {"users": users, "meta": meta}), 200 except Exception as e: print(f"Error: {e}") return make_response("error", 500, str(e)), 500 # 路由: GET /v1/users/ @app.route('/v1/users/', methods=['GET']) def get_user(user_id): try: conn = get_db_connection() if not conn: return make_response("error", 500, "Database connection error"), 500 cursor = conn.cursor(dictionary=True) query = "SELECT * FROM users WHERE id = %s" cursor.execute(query, (user_id,)) user = cursor.fetchone() cursor.close() conn.close() if not user: return make_response("error", 404, f"User with id {user_id} not found"), 404 return make_response("success", 200, "User retrieved successfully", {"user": user}), 200 except Exception as e: print(f"Error: {e}") return make_response("error", 500, str(e)), 500 # 路由: POST /v1/users @app.route('/v1/users', methods=['POST']) def create_user(): try: data = request.get_json() # 驗證必要欄位 if not all(key in data for key in ['name', 'email', 'age']): return make_response("error", 400, "Missing required fields: name, email, age"), 400 # 驗證資料類型 if not isinstance(data['name'], str) or not data['name'].strip(): return make_response("error", 400, "Name must be a non-empty string"), 400 if not isinstance(data['email'], str) or '@' not in data['email']: return make_response("error", 400, "Invalid email format"), 400 if not isinstance(data['age'], int) or data['age'] < 0: return make_response("error", 400, "Age must be a positive integer"), 400 conn = get_db_connection() if not conn: return make_response("error", 500, "Database connection error"), 500 cursor = conn.cursor(dictionary=True) # 使用參數化查詢避免 SQL 注入 query = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)" cursor.execute(query, (data['name'], data['email'], data['age'])) # 取得新增的使用者 ID user_id = cursor.lastrowid conn.commit() # 取得新增的使用者資料 cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,)) new_user = cursor.fetchone() cursor.close() conn.close() return make_response("success", 201, "User created successfully", {"user": new_user}), 201 except Exception as e: print(f"Error: {e}") return make_response("error", 500, str(e)), 500 # 路由: PATCH /v1/users/ @app.route('/v1/users/', methods=['PATCH']) def update_user(user_id): try: data = request.get_json() # 檢查是否有要更新的欄位 if not any(key in data for key in ['name', 'email', 'age']): return make_response("error", 400, "No fields to update"), 400 # 驗證資料類型 if 'name' in data and (not isinstance(data['name'], str) or not data['name'].strip()): return make_response("error", 400, "Name must be a non-empty string"), 400 if 'email' in data and (not isinstance(data['email'], str) or '@' not in data['email']): return make_response("error", 400, "Invalid email format"), 400 if 'age' in data and (not isinstance(data['age'], int) or data['age'] < 0): return make_response("error", 400, "Age must be a positive integer"), 400 conn = get_db_connection() if not conn: return make_response("error", 500, "Database connection error"), 500 cursor = conn.cursor(dictionary=True) # 檢查使用者是否存在 cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,)) user = cursor.fetchone() if not user: cursor.close() conn.close() return make_response("error", 404, f"User with id {user_id} not found"), 404 # 建立更新查詢 update_fields = [] params = [] if 'name' in data: update_fields.append("name = %s") params.append(data['name']) if 'email' in data: update_fields.append("email = %s") params.append(data['email']) if 'age' in data: update_fields.append("age = %s") params.append(data['age']) # 加入使用者 ID 到參數列表 params.append(user_id) # 執行更新查詢 query = f"UPDATE users SET {', '.join(update_fields)} WHERE id = %s" cursor.execute(query, params) conn.commit() # 取得更新後的使用者資料 cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,)) updated_user = cursor.fetchone() cursor.close() conn.close() return make_response("success", 200, "User updated successfully", {"user": updated_user}), 200 except Exception as e: print(f"Error: {e}") return make_response("error", 500, str(e)), 500 # 路由: DELETE /v1/users/ @app.route('/v1/users/', methods=['DELETE']) def delete_user(user_id): try: conn = get_db_connection() if not conn: return make_response("error", 500, "Database connection error"), 500 cursor = conn.cursor() # 檢查使用者是否存在 cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,)) user = cursor.fetchone() if not user: cursor.close() conn.close() return make_response("error", 404, f"User with id {user_id} not found"), 404 # 刪除使用者 cursor.execute("DELETE FROM users WHERE id = %s", (user_id,)) conn.commit() cursor.close() conn.close() return make_response("success", 204, "User deleted successfully"), 204 except Exception as e: print(f"Error: {e}") return make_response("error", 500, str(e)), 500 # 檢查檔案副檔名是否允許 def allowed_file(filename): return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS # 解析 Excel 檔案 def parse_excel(file_path): try: df = pd.read_excel(file_path) return df.to_dict('records') except Exception as e: print(f"Excel 解析錯誤: {e}") return None # 解析 CSV 檔案 def parse_csv(file_path): try: df = pd.read_csv(file_path) return df.to_dict('records') except Exception as e: print(f"CSV 解析錯誤: {e}") return None # 將使用者資料匯入資料庫 def import_users_to_db(users_data): try: conn = get_db_connection() if not conn: return False, "資料庫連接錯誤" cursor = conn.cursor() success_count = 0 error_count = 0 errors = [] for user in users_data: try: # 確保必要欄位存在 if 'name' not in user or 'email' not in user or 'age' not in user: error_count += 1 errors.append(f"缺少必要欄位: {user}") continue # 驗證資料類型 if not isinstance(user['name'], str) or not user['name'].strip(): error_count += 1 errors.append(f"名稱必須是非空字串: {user}") continue if not isinstance(user['email'], str) or '@' not in user['email']: error_count += 1 errors.append(f"無效的電子郵件格式: {user}") continue try: age = int(user['age']) if age < 0: error_count += 1 errors.append(f"年齡必須是正整數: {user}") continue except (ValueError, TypeError): error_count += 1 errors.append(f"年齡必須是整數: {user}") continue # 插入資料 query = "INSERT INTO users (name, email, age) VALUES (%s, %s, %s)" cursor.execute(query, (user['name'], user['email'], age)) success_count += 1 except Exception as e: error_count += 1 errors.append(f"插入錯誤: {str(e)}, 資料: {user}") conn.commit() cursor.close() conn.close() return True, { "success_count": success_count, "error_count": error_count, "errors": errors[:10] # 只返回前 10 個錯誤 } except Exception as e: print(f"匯入錯誤: {e}") return False, str(e) # 主頁路由 @app.route('/') def index(): return send_from_directory('static', 'index.html') # 靜態文件路由 @app.route('/') def static_files(path): return send_from_directory('static', path) # 檔案上傳和匯入路由 @app.route('/v1/users/import', methods=['POST']) def import_users(): try: # 檢查是否有檔案 if 'file' not in request.files: return make_response("error", 400, "未找到檔案"), 400 file = request.files['file'] # 檢查檔案名稱 if file.filename == '': return make_response("error", 400, "未選擇檔案"), 400 # 檢查檔案類型 if not allowed_file(file.filename): return make_response("error", 400, f"不支援的檔案類型,僅支援 {', '.join(ALLOWED_EXTENSIONS)}"), 400 # 安全地保存檔案 filename = secure_filename(file.filename) file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) file.save(file_path) # 根據檔案類型解析 file_ext = filename.rsplit('.', 1)[1].lower() if file_ext in ['xlsx', 'xls']: users_data = parse_excel(file_path) elif file_ext == 'csv': users_data = parse_csv(file_path) else: # 刪除檔案 os.remove(file_path) return make_response("error", 400, "不支援的檔案類型"), 400 # 檢查解析結果 if not users_data or len(users_data) == 0: # 刪除檔案 os.remove(file_path) return make_response("error", 400, "檔案解析失敗或沒有資料"), 400 # 匯入資料庫 success, result = import_users_to_db(users_data) # 刪除檔案 os.remove(file_path) if not success: return make_response("error", 500, f"匯入失敗: {result}"), 500 return make_response("success", 200, "使用者資料匯入成功", result), 200 except Exception as e: print(f"Error: {e}") return make_response("error", 500, str(e)), 500 if __name__ == '__main__': app.run(debug=True, host='0.0.0.0', port=5000)