684 lines
24 KiB
Python
684 lines
24 KiB
Python
from flask import Flask, request, jsonify
|
||
from flask_swagger_ui import get_swaggerui_blueprint
|
||
from flask_cors import CORS
|
||
import pymysql
|
||
import jwt
|
||
from datetime import datetime
|
||
import hashlib
|
||
import hmac
|
||
import base64
|
||
from config import Config
|
||
|
||
app = Flask(__name__)
|
||
|
||
# 啟用 CORS 支援
|
||
CORS(app, resources={
|
||
r"/api/*": {
|
||
"origins": "*",
|
||
"methods": ["GET", "POST", "PUT", "DELETE", "OPTIONS"],
|
||
"allow_headers": ["Content-Type", "Authorization"]
|
||
}
|
||
})
|
||
|
||
# 使用配置類別
|
||
DB_CONFIG = Config.get_db_config()
|
||
JWT_SECRET = Config.JWT_SECRET
|
||
|
||
# Swagger UI 配置
|
||
SWAGGER_URL = '/api/docs' # Swagger UI 的 URL
|
||
API_URL = '/api/swagger.json' # API 文件的 JSON 位置
|
||
|
||
# 建立 Swagger UI Blueprint
|
||
swaggerui_blueprint = get_swaggerui_blueprint(
|
||
SWAGGER_URL,
|
||
API_URL,
|
||
config={
|
||
'app_name': Config.APP_NAME,
|
||
'app_version': Config.APP_VERSION,
|
||
'description': Config.APP_DESCRIPTION
|
||
}
|
||
)
|
||
|
||
# 註冊 Swagger UI Blueprint
|
||
app.register_blueprint(swaggerui_blueprint, url_prefix=SWAGGER_URL)
|
||
|
||
@app.route('/api/swagger.json')
|
||
def swagger():
|
||
"""
|
||
Swagger API 文件 JSON
|
||
"""
|
||
return jsonify({
|
||
"swagger": "2.0",
|
||
"info": {
|
||
"title": Config.APP_NAME,
|
||
"version": Config.APP_VERSION,
|
||
"description": Config.APP_DESCRIPTION,
|
||
"contact": {
|
||
"name": "Panjit IT Department",
|
||
"email": "it@panjit.com.tw"
|
||
}
|
||
},
|
||
"host": f"{Config.API_HOST}:{Config.API_PORT}",
|
||
"basePath": "/api",
|
||
"schemes": ["http", "https"],
|
||
"consumes": ["application/json"],
|
||
"produces": ["application/json"],
|
||
"paths": {
|
||
"/login": {
|
||
"post": {
|
||
"tags": ["Authentication"],
|
||
"summary": "用戶登入驗證",
|
||
"description": "使用帳號和密碼進行登入驗證",
|
||
"parameters": [
|
||
{
|
||
"name": "body",
|
||
"in": "body",
|
||
"required": True,
|
||
"schema": {
|
||
"type": "object",
|
||
"properties": {
|
||
"account": {
|
||
"type": "string",
|
||
"description": "用戶帳號(loginid 或 email)",
|
||
"example": "92460"
|
||
},
|
||
"password": {
|
||
"type": "string",
|
||
"description": "用戶密碼",
|
||
"example": "Panjit92460"
|
||
}
|
||
},
|
||
"required": ["account", "password"]
|
||
}
|
||
}
|
||
],
|
||
"responses": {
|
||
"200": {
|
||
"description": "登入成功",
|
||
"schema": {
|
||
"type": "object",
|
||
"properties": {
|
||
"success": {"type": "boolean"},
|
||
"message": {"type": "string"},
|
||
"user": {
|
||
"type": "object",
|
||
"properties": {
|
||
"id": {"type": "integer"},
|
||
"loginid": {"type": "string"},
|
||
"username": {"type": "string"},
|
||
"email": {"type": "string"}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
},
|
||
"401": {
|
||
"description": "登入失敗",
|
||
"schema": {
|
||
"type": "object",
|
||
"properties": {
|
||
"success": {"type": "boolean"},
|
||
"message": {"type": "string"},
|
||
"error_code": {"type": "string"}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
},
|
||
"/change-password": {
|
||
"post": {
|
||
"tags": ["Password Management"],
|
||
"summary": "修改用戶密碼",
|
||
"description": "修改用戶的登入密碼",
|
||
"parameters": [
|
||
{
|
||
"name": "body",
|
||
"in": "body",
|
||
"required": True,
|
||
"schema": {
|
||
"type": "object",
|
||
"properties": {
|
||
"account": {
|
||
"type": "string",
|
||
"description": "用戶帳號(loginid 或 email)",
|
||
"example": "92460"
|
||
},
|
||
"old_password": {
|
||
"type": "string",
|
||
"description": "舊密碼",
|
||
"example": "Panjit92460"
|
||
},
|
||
"new_password": {
|
||
"type": "string",
|
||
"description": "新密碼(6-50字元,需包含字母和數字)",
|
||
"example": "MyNewPassword123"
|
||
}
|
||
},
|
||
"required": ["account", "old_password", "new_password"]
|
||
}
|
||
}
|
||
],
|
||
"responses": {
|
||
"200": {
|
||
"description": "密碼修改成功",
|
||
"schema": {
|
||
"type": "object",
|
||
"properties": {
|
||
"success": {"type": "boolean"},
|
||
"message": {"type": "string"}
|
||
}
|
||
}
|
||
},
|
||
"400": {
|
||
"description": "請求錯誤",
|
||
"schema": {
|
||
"type": "object",
|
||
"properties": {
|
||
"success": {"type": "boolean"},
|
||
"message": {"type": "string"},
|
||
"error_code": {"type": "string"}
|
||
}
|
||
}
|
||
},
|
||
"401": {
|
||
"description": "舊密碼不正確",
|
||
"schema": {
|
||
"type": "object",
|
||
"properties": {
|
||
"success": {"type": "boolean"},
|
||
"message": {"type": "string"},
|
||
"error_code": {"type": "string"}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
},
|
||
"/health": {
|
||
"get": {
|
||
"tags": ["System"],
|
||
"summary": "系統健康檢查",
|
||
"description": "檢查 API 服務和資料庫連線狀態",
|
||
"responses": {
|
||
"200": {
|
||
"description": "系統正常",
|
||
"schema": {
|
||
"type": "object",
|
||
"properties": {
|
||
"success": {"type": "boolean"},
|
||
"message": {"type": "string"},
|
||
"database": {"type": "string"},
|
||
"timestamp": {"type": "string"}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
},
|
||
"/user/{account}": {
|
||
"get": {
|
||
"tags": ["User Management"],
|
||
"summary": "查詢用戶資訊",
|
||
"description": "根據帳號查詢用戶基本資訊",
|
||
"parameters": [
|
||
{
|
||
"name": "account",
|
||
"in": "path",
|
||
"required": True,
|
||
"type": "string",
|
||
"description": "用戶帳號(loginid 或 email)",
|
||
"example": "92460"
|
||
}
|
||
],
|
||
"responses": {
|
||
"200": {
|
||
"description": "查詢成功",
|
||
"schema": {
|
||
"type": "object",
|
||
"properties": {
|
||
"success": {"type": "boolean"},
|
||
"user": {
|
||
"type": "object",
|
||
"properties": {
|
||
"id": {"type": "integer"},
|
||
"loginid": {"type": "string"},
|
||
"username": {"type": "string"},
|
||
"email": {"type": "string"}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
},
|
||
"404": {
|
||
"description": "用戶不存在",
|
||
"schema": {
|
||
"type": "object",
|
||
"properties": {
|
||
"success": {"type": "boolean"},
|
||
"message": {"type": "string"},
|
||
"error_code": {"type": "string"}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
},
|
||
"definitions": {
|
||
"Error": {
|
||
"type": "object",
|
||
"properties": {
|
||
"success": {"type": "boolean", "example": False},
|
||
"message": {"type": "string"},
|
||
"error_code": {"type": "string"}
|
||
}
|
||
},
|
||
"User": {
|
||
"type": "object",
|
||
"properties": {
|
||
"id": {"type": "integer", "example": 1},
|
||
"loginid": {"type": "string", "example": "92460"},
|
||
"username": {"type": "string", "example": "丁于軒"},
|
||
"email": {"type": "string", "example": "sharnading@panjit.com.tw"}
|
||
}
|
||
}
|
||
}
|
||
})
|
||
|
||
def verify_password(input_password, hashed_password):
|
||
"""
|
||
驗證密碼是否正確
|
||
"""
|
||
try:
|
||
# 解碼 JWT token
|
||
decoded = jwt.decode(hashed_password, JWT_SECRET, algorithms=['HS256'])
|
||
|
||
# 檢查 token 類型
|
||
if decoded.get('type') != 'password_hash':
|
||
return False
|
||
|
||
# 比較密碼
|
||
stored_password = decoded.get('password', '')
|
||
return input_password == stored_password
|
||
|
||
except jwt.ExpiredSignatureError:
|
||
print("JWT token 已過期")
|
||
return False
|
||
except jwt.InvalidTokenError:
|
||
print("無效的 JWT token")
|
||
return False
|
||
except Exception as e:
|
||
print(f"密碼驗證錯誤: {e}")
|
||
return False
|
||
|
||
def hash_password_with_jwt(password):
|
||
"""
|
||
使用 JWT 將密碼進行雜湊處理
|
||
"""
|
||
try:
|
||
# 建立 JWT payload,包含密碼和時間戳
|
||
payload = {
|
||
'password': password,
|
||
'timestamp': datetime.now().isoformat(),
|
||
'type': 'password_hash'
|
||
}
|
||
|
||
# 使用 JWT 編碼密碼
|
||
token = jwt.encode(payload, JWT_SECRET, algorithm='HS256')
|
||
return token
|
||
except Exception as e:
|
||
print(f"密碼雜湊錯誤: {e}")
|
||
return None
|
||
|
||
def validate_password_strength(password):
|
||
"""
|
||
驗證密碼強度
|
||
"""
|
||
if len(password) < 6:
|
||
return False, "密碼長度至少需要 6 個字元"
|
||
|
||
if len(password) > 50:
|
||
return False, "密碼長度不能超過 50 個字元"
|
||
|
||
# 檢查是否包含至少一個字母和一個數字
|
||
has_letter = any(c.isalpha() for c in password)
|
||
has_digit = any(c.isdigit() for c in password)
|
||
|
||
if not has_letter:
|
||
return False, "密碼必須包含至少一個字母"
|
||
|
||
if not has_digit:
|
||
return False, "密碼必須包含至少一個數字"
|
||
|
||
return True, "密碼強度符合要求"
|
||
|
||
def update_user_password(loginid_or_email, old_password, new_password):
|
||
"""
|
||
更新用戶密碼
|
||
"""
|
||
try:
|
||
connection = pymysql.connect(**DB_CONFIG)
|
||
cursor = connection.cursor(pymysql.cursors.DictCursor)
|
||
|
||
# 先查詢用戶資料
|
||
query = """
|
||
SELECT id, loginid, username, email, hashed_password
|
||
FROM user_accounts
|
||
WHERE loginid = %s OR email = %s
|
||
"""
|
||
|
||
cursor.execute(query, (loginid_or_email, loginid_or_email))
|
||
user = cursor.fetchone()
|
||
|
||
if not user:
|
||
return False, "用戶不存在"
|
||
|
||
# 驗證舊密碼
|
||
if not verify_password(old_password, user['hashed_password']):
|
||
return False, "舊密碼不正確"
|
||
|
||
# 驗證新密碼強度
|
||
is_valid, message = validate_password_strength(new_password)
|
||
if not is_valid:
|
||
return False, message
|
||
|
||
# 生成新密碼的雜湊
|
||
new_hashed_password = hash_password_with_jwt(new_password)
|
||
if not new_hashed_password:
|
||
return False, "密碼雜湊失敗"
|
||
|
||
# 更新密碼
|
||
update_query = """
|
||
UPDATE user_accounts
|
||
SET hashed_password = %s, updated_at = CURRENT_TIMESTAMP
|
||
WHERE id = %s
|
||
"""
|
||
|
||
cursor.execute(update_query, (new_hashed_password, user['id']))
|
||
connection.commit()
|
||
|
||
return True, "密碼修改成功"
|
||
|
||
except Exception as e:
|
||
print(f"更新密碼錯誤: {e}")
|
||
return False, f"系統錯誤: {str(e)}"
|
||
finally:
|
||
if 'connection' in locals():
|
||
connection.close()
|
||
|
||
def get_user_by_loginid_or_email(loginid_or_email):
|
||
"""
|
||
根據 loginid 或 email 查詢用戶資料
|
||
"""
|
||
try:
|
||
connection = pymysql.connect(**DB_CONFIG)
|
||
cursor = connection.cursor(pymysql.cursors.DictCursor)
|
||
|
||
# 查詢用戶資料
|
||
query = """
|
||
SELECT id, loginid, username, email, hashed_password
|
||
FROM user_accounts
|
||
WHERE loginid = %s OR email = %s
|
||
"""
|
||
|
||
cursor.execute(query, (loginid_or_email, loginid_or_email))
|
||
user = cursor.fetchone()
|
||
|
||
return user
|
||
|
||
except Exception as e:
|
||
print(f"資料庫查詢錯誤: {e}")
|
||
return None
|
||
finally:
|
||
if 'connection' in locals():
|
||
connection.close()
|
||
|
||
@app.route('/api/login', methods=['POST'])
|
||
def login():
|
||
"""
|
||
登入 API
|
||
接收 JSON 格式: {"account": "帳號或email", "password": "密碼"}
|
||
"""
|
||
try:
|
||
# 取得請求資料
|
||
data = request.get_json()
|
||
|
||
if not data:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '請提供 JSON 格式的資料',
|
||
'error_code': 'INVALID_REQUEST'
|
||
}), 400
|
||
|
||
account = data.get('account', '').strip()
|
||
password = data.get('password', '').strip()
|
||
|
||
# 檢查必要欄位
|
||
if not account or not password:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '請提供帳號和密碼',
|
||
'error_code': 'MISSING_FIELDS'
|
||
}), 400
|
||
|
||
# 查詢用戶資料
|
||
user = get_user_by_loginid_or_email(account)
|
||
|
||
if not user:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '無權限,請詢問資訊部同仁',
|
||
'error_code': 'USER_NOT_FOUND'
|
||
}), 401
|
||
|
||
# 驗證密碼
|
||
if not verify_password(password, user['hashed_password']):
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '無權限,請詢問資訊部同仁',
|
||
'error_code': 'INVALID_PASSWORD'
|
||
}), 401
|
||
|
||
# 登入成功
|
||
return jsonify({
|
||
'success': True,
|
||
'message': '登入成功',
|
||
'user': {
|
||
'id': user['id'],
|
||
'loginid': user['loginid'],
|
||
'username': user['username'],
|
||
'email': user['email']
|
||
}
|
||
}), 200
|
||
|
||
except Exception as e:
|
||
print(f"登入 API 錯誤: {e}")
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '系統錯誤,請稍後再試',
|
||
'error_code': 'SYSTEM_ERROR'
|
||
}), 500
|
||
|
||
@app.route('/api/health', methods=['GET'])
|
||
def health_check():
|
||
"""
|
||
健康檢查 API
|
||
"""
|
||
try:
|
||
# 測試資料庫連線
|
||
connection = pymysql.connect(**DB_CONFIG)
|
||
cursor = connection.cursor()
|
||
cursor.execute("SELECT 1")
|
||
cursor.fetchone()
|
||
connection.close()
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'message': 'API 服務正常',
|
||
'database': '連線正常',
|
||
'timestamp': datetime.now().isoformat()
|
||
}), 200
|
||
|
||
except Exception as e:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': 'API 服務異常',
|
||
'database': '連線失敗',
|
||
'error': str(e),
|
||
'timestamp': datetime.now().isoformat()
|
||
}), 500
|
||
|
||
@app.route('/api/user/<account>', methods=['GET'])
|
||
def get_user_info(account):
|
||
"""
|
||
查詢用戶資訊 API(不包含密碼)
|
||
"""
|
||
try:
|
||
user = get_user_by_loginid_or_email(account)
|
||
|
||
if not user:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '用戶不存在',
|
||
'error_code': 'USER_NOT_FOUND'
|
||
}), 404
|
||
|
||
# 回傳用戶資訊(不包含密碼)
|
||
return jsonify({
|
||
'success': True,
|
||
'user': {
|
||
'id': user['id'],
|
||
'loginid': user['loginid'],
|
||
'username': user['username'],
|
||
'email': user['email']
|
||
}
|
||
}), 200
|
||
|
||
except Exception as e:
|
||
print(f"查詢用戶資訊錯誤: {e}")
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '系統錯誤,請稍後再試',
|
||
'error_code': 'SYSTEM_ERROR'
|
||
}), 500
|
||
|
||
@app.route('/api/change-password', methods=['POST'])
|
||
def change_password():
|
||
"""
|
||
修改密碼 API
|
||
接收 JSON 格式: {"account": "帳號或email", "old_password": "舊密碼", "new_password": "新密碼"}
|
||
"""
|
||
try:
|
||
# 取得請求資料
|
||
data = request.get_json()
|
||
|
||
if not data:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '請提供 JSON 格式的資料',
|
||
'error_code': 'INVALID_REQUEST'
|
||
}), 400
|
||
|
||
account = data.get('account', '').strip()
|
||
old_password = data.get('old_password', '').strip()
|
||
new_password = data.get('new_password', '').strip()
|
||
|
||
# 檢查必要欄位
|
||
if not account or not old_password or not new_password:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '請提供帳號、舊密碼和新密碼',
|
||
'error_code': 'MISSING_FIELDS'
|
||
}), 400
|
||
|
||
# 檢查新密碼不能與舊密碼相同
|
||
if old_password == new_password:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '新密碼不能與舊密碼相同',
|
||
'error_code': 'SAME_PASSWORD'
|
||
}), 400
|
||
|
||
# 更新密碼
|
||
success, message = update_user_password(account, old_password, new_password)
|
||
|
||
if success:
|
||
return jsonify({
|
||
'success': True,
|
||
'message': message
|
||
}), 200
|
||
else:
|
||
# 根據錯誤類型返回不同的狀態碼
|
||
if "用戶不存在" in message:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': message,
|
||
'error_code': 'USER_NOT_FOUND'
|
||
}), 404
|
||
elif "舊密碼不正確" in message:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': message,
|
||
'error_code': 'INVALID_OLD_PASSWORD'
|
||
}), 401
|
||
elif "密碼長度" in message or "密碼必須" in message:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': message,
|
||
'error_code': 'WEAK_PASSWORD'
|
||
}), 400
|
||
else:
|
||
return jsonify({
|
||
'success': False,
|
||
'message': message,
|
||
'error_code': 'UPDATE_FAILED'
|
||
}), 500
|
||
|
||
except Exception as e:
|
||
print(f"修改密碼 API 錯誤: {e}")
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '系統錯誤,請稍後再試',
|
||
'error_code': 'SYSTEM_ERROR'
|
||
}), 500
|
||
|
||
@app.errorhandler(404)
|
||
def not_found(error):
|
||
return jsonify({
|
||
'success': False,
|
||
'message': 'API 端點不存在',
|
||
'error_code': 'NOT_FOUND'
|
||
}), 404
|
||
|
||
@app.errorhandler(405)
|
||
def method_not_allowed(error):
|
||
return jsonify({
|
||
'success': False,
|
||
'message': '不支援的 HTTP 方法',
|
||
'error_code': 'METHOD_NOT_ALLOWED'
|
||
}), 405
|
||
|
||
if __name__ == '__main__':
|
||
print(f"啟動 {Config.APP_NAME} v{Config.APP_VERSION}...")
|
||
print("API 端點:")
|
||
print(" POST /api/login - 登入驗證")
|
||
print(" POST /api/change-password - 修改密碼")
|
||
print(" GET /api/health - 健康檢查")
|
||
print(" GET /api/user/<account> - 查詢用戶資訊")
|
||
print(f" GET {SWAGGER_URL} - Swagger API 文件")
|
||
print(f"\n服務地址: http://{Config.API_HOST}:{Config.API_PORT}")
|
||
print(f"Swagger 文件: http://{Config.API_HOST}:{Config.API_PORT}{SWAGGER_URL}")
|
||
print("\n範例請求:")
|
||
print("登入驗證:")
|
||
print(f'curl -X POST http://{Config.API_HOST}:{Config.API_PORT}/api/login \\')
|
||
print(' -H "Content-Type: application/json" \\')
|
||
print(' -d \'{"account": "92460", "password": "Panjit92460"}\'')
|
||
print("\n修改密碼:")
|
||
print(f'curl -X POST http://{Config.API_HOST}:{Config.API_PORT}/api/change-password \\')
|
||
print(' -H "Content-Type: application/json" \\')
|
||
print(' -d \'{"account": "92460", "old_password": "Panjit92460", "new_password": "MyNewPassword123"}\'')
|
||
print("\n服務啟動中...")
|
||
|
||
app.run(host=Config.API_HOST, port=Config.API_PORT, debug=Config.API_DEBUG)
|