from flask import Flask, request, jsonify from flask_swagger_ui import get_swaggerui_blueprint from flask_cors import CORS import pymysql import jwt from datetime import datetime import hashlib import hmac import base64 from config import Config app = Flask(__name__) # 啟用 CORS 支援 CORS(app, resources={ r"/api/*": { "origins": "*", "methods": ["GET", "POST", "PUT", "DELETE", "OPTIONS"], "allow_headers": ["Content-Type", "Authorization"] } }) # 使用配置類別 DB_CONFIG = Config.get_db_config() JWT_SECRET = Config.JWT_SECRET # Swagger UI 配置 SWAGGER_URL = '/api/docs' # Swagger UI 的 URL API_URL = '/api/swagger.json' # API 文件的 JSON 位置 # 建立 Swagger UI Blueprint swaggerui_blueprint = get_swaggerui_blueprint( SWAGGER_URL, API_URL, config={ 'app_name': Config.APP_NAME, 'app_version': Config.APP_VERSION, 'description': Config.APP_DESCRIPTION } ) # 註冊 Swagger UI Blueprint app.register_blueprint(swaggerui_blueprint, url_prefix=SWAGGER_URL) @app.route('/api/swagger.json') def swagger(): """ Swagger API 文件 JSON """ return jsonify({ "swagger": "2.0", "info": { "title": Config.APP_NAME, "version": Config.APP_VERSION, "description": Config.APP_DESCRIPTION, "contact": { "name": "Panjit IT Department", "email": "it@panjit.com.tw" } }, "host": f"{Config.API_HOST}:{Config.API_PORT}", "basePath": "/api", "schemes": ["http", "https"], "consumes": ["application/json"], "produces": ["application/json"], "paths": { "/login": { "post": { "tags": ["Authentication"], "summary": "用戶登入驗證", "description": "使用帳號和密碼進行登入驗證", "parameters": [ { "name": "body", "in": "body", "required": True, "schema": { "type": "object", "properties": { "account": { "type": "string", "description": "用戶帳號(loginid 或 email)", "example": "92460" }, "password": { "type": "string", "description": "用戶密碼", "example": "Panjit92460" } }, "required": ["account", "password"] } } ], "responses": { "200": { "description": "登入成功", "schema": { "type": "object", "properties": { "success": {"type": "boolean"}, "message": {"type": "string"}, "user": { "type": "object", "properties": { "id": {"type": "integer"}, "loginid": {"type": "string"}, "username": {"type": "string"}, "email": {"type": "string"} } } } } }, "401": { "description": "登入失敗", "schema": { "type": "object", "properties": { "success": {"type": "boolean"}, "message": {"type": "string"}, "error_code": {"type": "string"} } } } } } }, "/change-password": { "post": { "tags": ["Password Management"], "summary": "修改用戶密碼", "description": "修改用戶的登入密碼", "parameters": [ { "name": "body", "in": "body", "required": True, "schema": { "type": "object", "properties": { "account": { "type": "string", "description": "用戶帳號(loginid 或 email)", "example": "92460" }, "old_password": { "type": "string", "description": "舊密碼", "example": "Panjit92460" }, "new_password": { "type": "string", "description": "新密碼(6-50字元,需包含字母和數字)", "example": "MyNewPassword123" } }, "required": ["account", "old_password", "new_password"] } } ], "responses": { "200": { "description": "密碼修改成功", "schema": { "type": "object", "properties": { "success": {"type": "boolean"}, "message": {"type": "string"} } } }, "400": { "description": "請求錯誤", "schema": { "type": "object", "properties": { "success": {"type": "boolean"}, "message": {"type": "string"}, "error_code": {"type": "string"} } } }, "401": { "description": "舊密碼不正確", "schema": { "type": "object", "properties": { "success": {"type": "boolean"}, "message": {"type": "string"}, "error_code": {"type": "string"} } } } } } }, "/health": { "get": { "tags": ["System"], "summary": "系統健康檢查", "description": "檢查 API 服務和資料庫連線狀態", "responses": { "200": { "description": "系統正常", "schema": { "type": "object", "properties": { "success": {"type": "boolean"}, "message": {"type": "string"}, "database": {"type": "string"}, "timestamp": {"type": "string"} } } } } } }, "/user/{account}": { "get": { "tags": ["User Management"], "summary": "查詢用戶資訊", "description": "根據帳號查詢用戶基本資訊", "parameters": [ { "name": "account", "in": "path", "required": True, "type": "string", "description": "用戶帳號(loginid 或 email)", "example": "92460" } ], "responses": { "200": { "description": "查詢成功", "schema": { "type": "object", "properties": { "success": {"type": "boolean"}, "user": { "type": "object", "properties": { "id": {"type": "integer"}, "loginid": {"type": "string"}, "username": {"type": "string"}, "email": {"type": "string"} } } } } }, "404": { "description": "用戶不存在", "schema": { "type": "object", "properties": { "success": {"type": "boolean"}, "message": {"type": "string"}, "error_code": {"type": "string"} } } } } } } }, "definitions": { "Error": { "type": "object", "properties": { "success": {"type": "boolean", "example": False}, "message": {"type": "string"}, "error_code": {"type": "string"} } }, "User": { "type": "object", "properties": { "id": {"type": "integer", "example": 1}, "loginid": {"type": "string", "example": "92460"}, "username": {"type": "string", "example": "丁于軒"}, "email": {"type": "string", "example": "sharnading@panjit.com.tw"} } } } }) def verify_password(input_password, hashed_password): """ 驗證密碼是否正確 """ try: # 解碼 JWT token decoded = jwt.decode(hashed_password, JWT_SECRET, algorithms=['HS256']) # 檢查 token 類型 if decoded.get('type') != 'password_hash': return False # 比較密碼 stored_password = decoded.get('password', '') return input_password == stored_password except jwt.ExpiredSignatureError: print("JWT token 已過期") return False except jwt.InvalidTokenError: print("無效的 JWT token") return False except Exception as e: print(f"密碼驗證錯誤: {e}") return False def hash_password_with_jwt(password): """ 使用 JWT 將密碼進行雜湊處理 """ try: # 建立 JWT payload,包含密碼和時間戳 payload = { 'password': password, 'timestamp': datetime.now().isoformat(), 'type': 'password_hash' } # 使用 JWT 編碼密碼 token = jwt.encode(payload, JWT_SECRET, algorithm='HS256') return token except Exception as e: print(f"密碼雜湊錯誤: {e}") return None def validate_password_strength(password): """ 驗證密碼強度 """ if len(password) < 6: return False, "密碼長度至少需要 6 個字元" if len(password) > 50: return False, "密碼長度不能超過 50 個字元" # 檢查是否包含至少一個字母和一個數字 has_letter = any(c.isalpha() for c in password) has_digit = any(c.isdigit() for c in password) if not has_letter: return False, "密碼必須包含至少一個字母" if not has_digit: return False, "密碼必須包含至少一個數字" return True, "密碼強度符合要求" def update_user_password(loginid_or_email, old_password, new_password): """ 更新用戶密碼 """ try: connection = pymysql.connect(**DB_CONFIG) cursor = connection.cursor(pymysql.cursors.DictCursor) # 先查詢用戶資料 query = """ SELECT id, loginid, username, email, hashed_password FROM user_accounts WHERE loginid = %s OR email = %s """ cursor.execute(query, (loginid_or_email, loginid_or_email)) user = cursor.fetchone() if not user: return False, "用戶不存在" # 驗證舊密碼 if not verify_password(old_password, user['hashed_password']): return False, "舊密碼不正確" # 驗證新密碼強度 is_valid, message = validate_password_strength(new_password) if not is_valid: return False, message # 生成新密碼的雜湊 new_hashed_password = hash_password_with_jwt(new_password) if not new_hashed_password: return False, "密碼雜湊失敗" # 更新密碼 update_query = """ UPDATE user_accounts SET hashed_password = %s, updated_at = CURRENT_TIMESTAMP WHERE id = %s """ cursor.execute(update_query, (new_hashed_password, user['id'])) connection.commit() return True, "密碼修改成功" except Exception as e: print(f"更新密碼錯誤: {e}") return False, f"系統錯誤: {str(e)}" finally: if 'connection' in locals(): connection.close() def get_user_by_loginid_or_email(loginid_or_email): """ 根據 loginid 或 email 查詢用戶資料 """ try: connection = pymysql.connect(**DB_CONFIG) cursor = connection.cursor(pymysql.cursors.DictCursor) # 查詢用戶資料 query = """ SELECT id, loginid, username, email, hashed_password FROM user_accounts WHERE loginid = %s OR email = %s """ cursor.execute(query, (loginid_or_email, loginid_or_email)) user = cursor.fetchone() return user except Exception as e: print(f"資料庫查詢錯誤: {e}") return None finally: if 'connection' in locals(): connection.close() @app.route('/api/login', methods=['POST']) def login(): """ 登入 API 接收 JSON 格式: {"account": "帳號或email", "password": "密碼"} """ try: # 取得請求資料 data = request.get_json() if not data: return jsonify({ 'success': False, 'message': '請提供 JSON 格式的資料', 'error_code': 'INVALID_REQUEST' }), 400 account = data.get('account', '').strip() password = data.get('password', '').strip() # 檢查必要欄位 if not account or not password: return jsonify({ 'success': False, 'message': '請提供帳號和密碼', 'error_code': 'MISSING_FIELDS' }), 400 # 查詢用戶資料 user = get_user_by_loginid_or_email(account) if not user: return jsonify({ 'success': False, 'message': '無權限,請詢問資訊部同仁', 'error_code': 'USER_NOT_FOUND' }), 401 # 驗證密碼 if not verify_password(password, user['hashed_password']): return jsonify({ 'success': False, 'message': '無權限,請詢問資訊部同仁', 'error_code': 'INVALID_PASSWORD' }), 401 # 登入成功 return jsonify({ 'success': True, 'message': '登入成功', 'user': { 'id': user['id'], 'loginid': user['loginid'], 'username': user['username'], 'email': user['email'] } }), 200 except Exception as e: print(f"登入 API 錯誤: {e}") return jsonify({ 'success': False, 'message': '系統錯誤,請稍後再試', 'error_code': 'SYSTEM_ERROR' }), 500 @app.route('/api/health', methods=['GET']) def health_check(): """ 健康檢查 API """ try: # 測試資料庫連線 connection = pymysql.connect(**DB_CONFIG) cursor = connection.cursor() cursor.execute("SELECT 1") cursor.fetchone() connection.close() return jsonify({ 'success': True, 'message': 'API 服務正常', 'database': '連線正常', 'timestamp': datetime.now().isoformat() }), 200 except Exception as e: return jsonify({ 'success': False, 'message': 'API 服務異常', 'database': '連線失敗', 'error': str(e), 'timestamp': datetime.now().isoformat() }), 500 @app.route('/api/user/', methods=['GET']) def get_user_info(account): """ 查詢用戶資訊 API(不包含密碼) """ try: user = get_user_by_loginid_or_email(account) if not user: return jsonify({ 'success': False, 'message': '用戶不存在', 'error_code': 'USER_NOT_FOUND' }), 404 # 回傳用戶資訊(不包含密碼) return jsonify({ 'success': True, 'user': { 'id': user['id'], 'loginid': user['loginid'], 'username': user['username'], 'email': user['email'] } }), 200 except Exception as e: print(f"查詢用戶資訊錯誤: {e}") return jsonify({ 'success': False, 'message': '系統錯誤,請稍後再試', 'error_code': 'SYSTEM_ERROR' }), 500 @app.route('/api/change-password', methods=['POST']) def change_password(): """ 修改密碼 API 接收 JSON 格式: {"account": "帳號或email", "old_password": "舊密碼", "new_password": "新密碼"} """ try: # 取得請求資料 data = request.get_json() if not data: return jsonify({ 'success': False, 'message': '請提供 JSON 格式的資料', 'error_code': 'INVALID_REQUEST' }), 400 account = data.get('account', '').strip() old_password = data.get('old_password', '').strip() new_password = data.get('new_password', '').strip() # 檢查必要欄位 if not account or not old_password or not new_password: return jsonify({ 'success': False, 'message': '請提供帳號、舊密碼和新密碼', 'error_code': 'MISSING_FIELDS' }), 400 # 檢查新密碼不能與舊密碼相同 if old_password == new_password: return jsonify({ 'success': False, 'message': '新密碼不能與舊密碼相同', 'error_code': 'SAME_PASSWORD' }), 400 # 更新密碼 success, message = update_user_password(account, old_password, new_password) if success: return jsonify({ 'success': True, 'message': message }), 200 else: # 根據錯誤類型返回不同的狀態碼 if "用戶不存在" in message: return jsonify({ 'success': False, 'message': message, 'error_code': 'USER_NOT_FOUND' }), 404 elif "舊密碼不正確" in message: return jsonify({ 'success': False, 'message': message, 'error_code': 'INVALID_OLD_PASSWORD' }), 401 elif "密碼長度" in message or "密碼必須" in message: return jsonify({ 'success': False, 'message': message, 'error_code': 'WEAK_PASSWORD' }), 400 else: return jsonify({ 'success': False, 'message': message, 'error_code': 'UPDATE_FAILED' }), 500 except Exception as e: print(f"修改密碼 API 錯誤: {e}") return jsonify({ 'success': False, 'message': '系統錯誤,請稍後再試', 'error_code': 'SYSTEM_ERROR' }), 500 @app.errorhandler(404) def not_found(error): return jsonify({ 'success': False, 'message': 'API 端點不存在', 'error_code': 'NOT_FOUND' }), 404 @app.errorhandler(405) def method_not_allowed(error): return jsonify({ 'success': False, 'message': '不支援的 HTTP 方法', 'error_code': 'METHOD_NOT_ALLOWED' }), 405 if __name__ == '__main__': print(f"啟動 {Config.APP_NAME} v{Config.APP_VERSION}...") print("API 端點:") print(" POST /api/login - 登入驗證") print(" POST /api/change-password - 修改密碼") print(" GET /api/health - 健康檢查") print(" GET /api/user/ - 查詢用戶資訊") print(f" GET {SWAGGER_URL} - Swagger API 文件") print(f"\n服務地址: http://{Config.API_HOST}:{Config.API_PORT}") print(f"Swagger 文件: http://{Config.API_HOST}:{Config.API_PORT}{SWAGGER_URL}") print("\n範例請求:") print("登入驗證:") print(f'curl -X POST http://{Config.API_HOST}:{Config.API_PORT}/api/login \\') print(' -H "Content-Type: application/json" \\') print(' -d \'{"account": "92460", "password": "Panjit92460"}\'') print("\n修改密碼:") print(f'curl -X POST http://{Config.API_HOST}:{Config.API_PORT}/api/change-password \\') print(' -H "Content-Type: application/json" \\') print(' -d \'{"account": "92460", "old_password": "Panjit92460", "new_password": "MyNewPassword123"}\'') print("\n服務啟動中...") app.run(host=Config.API_HOST, port=Config.API_PORT, debug=Config.API_DEBUG)