Files
pj_password/app.py
2025-09-21 19:56:14 +08:00

684 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, request, jsonify
from flask_swagger_ui import get_swaggerui_blueprint
from flask_cors import CORS
import pymysql
import jwt
from datetime import datetime
import hashlib
import hmac
import base64
from config import Config
app = Flask(__name__)

# Enable CORS for every route under /api/: any origin, the listed HTTP
# methods, and the Content-Type / Authorization request headers.
CORS(
    app,
    resources={
        r"/api/*": dict(
            origins="*",
            methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
            allow_headers=["Content-Type", "Authorization"],
        ),
    },
)

# Runtime settings come from the project's Config class.
DB_CONFIG = Config.get_db_config()
JWT_SECRET = Config.JWT_SECRET

# Swagger UI locations.
SWAGGER_URL = '/api/docs'  # where the interactive Swagger UI is mounted
API_URL = '/api/swagger.json'  # where the UI fetches the JSON API description

# Build the Swagger UI blueprint, labelled with the application metadata,
# and mount it at SWAGGER_URL.
swaggerui_blueprint = get_swaggerui_blueprint(
    SWAGGER_URL,
    API_URL,
    config=dict(
        app_name=Config.APP_NAME,
        app_version=Config.APP_VERSION,
        description=Config.APP_DESCRIPTION,
    ),
)
app.register_blueprint(swaggerui_blueprint, url_prefix=SWAGGER_URL)
@app.route('/api/swagger.json')
def swagger():
    """Serve the Swagger 2.0 description of this API as JSON.

    The document describes the four endpoints registered in this module
    (``/login``, ``/change-password``, ``/health`` and ``/user/{account}``)
    under the ``/api`` base path.

    Compared with the previous hand-written literal, the repeated schema
    fragments are built once and reused, and the error responses that the
    handlers in this module actually return (400/500 for login, 404/500 for
    change-password, 500 for health and user lookup) are now documented.

    Returns:
        A Flask JSON response containing the Swagger document.
    """
    boolean = {"type": "boolean"}
    string = {"type": "string"}
    integer = {"type": "integer"}

    def object_schema(properties, required=None):
        """Return an ``object`` schema, optionally listing required keys."""
        schema = {"type": "object", "properties": properties}
        if required is not None:
            schema["required"] = required
        return schema

    def response(description, schema):
        """Return one entry of a Swagger ``responses`` mapping."""
        return {"description": description, "schema": schema}

    def json_body(properties, required):
        """Return the single ``body`` parameter of a JSON POST endpoint."""
        return {
            "name": "body",
            "in": "body",
            "required": True,
            "schema": object_schema(properties, required),
        }

    # Schema fragments shared by several endpoints.
    user_schema = object_schema({
        "id": integer,
        "loginid": string,
        "username": string,
        "email": string,
    })
    error_schema = object_schema({
        "success": boolean,
        "message": string,
        "error_code": string,
    })
    account_field = {
        "type": "string",
        "description": "用戶帳號loginid 或 email",
        "example": "92460",
    }

    paths = {
        "/login": {
            "post": {
                "tags": ["Authentication"],
                "summary": "用戶登入驗證",
                "description": "使用帳號和密碼進行登入驗證",
                "parameters": [
                    json_body(
                        {
                            "account": account_field,
                            "password": {
                                "type": "string",
                                "description": "用戶密碼",
                                "example": "Panjit92460",
                            },
                        },
                        ["account", "password"],
                    ),
                ],
                "responses": {
                    "200": response("登入成功", object_schema({
                        "success": boolean,
                        "message": string,
                        "user": user_schema,
                    })),
                    # Returned for a missing/invalid JSON body or empty fields.
                    "400": response("請求錯誤", error_schema),
                    "401": response("登入失敗", error_schema),
                    "500": response("系統錯誤", error_schema),
                },
            },
        },
        "/change-password": {
            "post": {
                "tags": ["Password Management"],
                "summary": "修改用戶密碼",
                "description": "修改用戶的登入密碼",
                "parameters": [
                    json_body(
                        {
                            "account": account_field,
                            "old_password": {
                                "type": "string",
                                "description": "舊密碼",
                                "example": "Panjit92460",
                            },
                            "new_password": {
                                "type": "string",
                                "description": "新密碼6-50字元需包含字母和數字",
                                "example": "MyNewPassword123",
                            },
                        },
                        ["account", "old_password", "new_password"],
                    ),
                ],
                "responses": {
                    "200": response("密碼修改成功", object_schema({
                        "success": boolean,
                        "message": string,
                    })),
                    "400": response("請求錯誤", error_schema),
                    "401": response("舊密碼不正確", error_schema),
                    "404": response("用戶不存在", error_schema),
                    "500": response("系統錯誤", error_schema),
                },
            },
        },
        "/health": {
            "get": {
                "tags": ["System"],
                "summary": "系統健康檢查",
                "description": "檢查 API 服務和資料庫連線狀態",
                "responses": {
                    "200": response("系統正常", object_schema({
                        "success": boolean,
                        "message": string,
                        "database": string,
                        "timestamp": string,
                    })),
                    # The failure payload carries an extra "error" field.
                    "500": response("API 服務異常", object_schema({
                        "success": boolean,
                        "message": string,
                        "database": string,
                        "error": string,
                        "timestamp": string,
                    })),
                },
            },
        },
        "/user/{account}": {
            "get": {
                "tags": ["User Management"],
                "summary": "查詢用戶資訊",
                "description": "根據帳號查詢用戶基本資訊",
                "parameters": [
                    {
                        "name": "account",
                        "in": "path",
                        "required": True,
                        "type": "string",
                        "description": "用戶帳號loginid 或 email",
                        "example": "92460",
                    },
                ],
                "responses": {
                    "200": response("查詢成功", object_schema({
                        "success": boolean,
                        "user": user_schema,
                    })),
                    "404": response("用戶不存在", error_schema),
                    "500": response("系統錯誤", error_schema),
                },
            },
        },
    }

    # Named models with example values, shown in the Swagger UI "Models" list.
    definitions = {
        "Error": object_schema({
            "success": {"type": "boolean", "example": False},
            "message": string,
            "error_code": string,
        }),
        "User": object_schema({
            "id": {"type": "integer", "example": 1},
            "loginid": {"type": "string", "example": "92460"},
            "username": {"type": "string", "example": "丁于軒"},
            "email": {"type": "string", "example": "sharnading@panjit.com.tw"},
        }),
    }

    return jsonify({
        "swagger": "2.0",
        "info": {
            "title": Config.APP_NAME,
            "version": Config.APP_VERSION,
            "description": Config.APP_DESCRIPTION,
            "contact": {
                "name": "Panjit IT Department",
                "email": "it@panjit.com.tw",
            },
        },
        "host": f"{Config.API_HOST}:{Config.API_PORT}",
        "basePath": "/api",
        "schemes": ["http", "https"],
        "consumes": ["application/json"],
        "produces": ["application/json"],
        "paths": paths,
        "definitions": definitions,
    })
def verify_password(input_password, hashed_password):
    """Check a candidate password against a stored credential token.

    The stored value is a JWT signed with ``JWT_SECRET`` (HS256). Its payload
    must carry ``type == 'password_hash'`` and the reference password in the
    ``password`` claim.

    Args:
        input_password: Password supplied by the caller.
        hashed_password: The stored JWT string for the account.

    Returns:
        True when the token is valid and the passwords match, otherwise
        False. This function never raises; every failure is reported on
        stdout and mapped to False.
    """
    # NOTE(review): the reference password is read straight out of the token
    # payload, so the stored value is an encoding rather than a one-way hash.
    # Consider migrating to a real password-hashing scheme.
    try:
        # Verify the signature and decode the token.
        decoded = jwt.decode(hashed_password, JWT_SECRET, algorithms=['HS256'])

        # Only tokens minted for password storage are acceptable.
        if decoded.get('type') != 'password_hash':
            return False

        stored_password = decoded.get('password', '')
        if not isinstance(input_password, str) or not isinstance(stored_password, str):
            return False

        # Constant-time comparison so the response time does not reveal how
        # many leading characters matched. Bytes are used because
        # compare_digest only accepts ASCII-only str operands.
        return hmac.compare_digest(
            input_password.encode('utf-8'),
            stored_password.encode('utf-8'),
        )
    except jwt.ExpiredSignatureError:
        print("JWT token 已過期")
        return False
    except jwt.InvalidTokenError:
        print("無效的 JWT token")
        return False
    except Exception as e:
        print(f"密碼驗證錯誤: {e}")
        return False
def hash_password_with_jwt(password):
    """Wrap a password in a signed JWT suitable for storage.

    The payload holds the password itself, the current local time in ISO
    format and the marker ``type == 'password_hash'``; it is signed with
    ``JWT_SECRET`` using HS256.

    Args:
        password: The password to encode.

    Returns:
        The encoded token, or None if encoding fails (the error is printed).
    """
    # NOTE(review): the password travels inside the token payload, so this
    # is an encoding, not a one-way hash - worth confirming this is intended.
    try:
        return jwt.encode(
            {
                'password': password,
                'timestamp': datetime.now().isoformat(),
                'type': 'password_hash',
            },
            JWT_SECRET,
            algorithm='HS256',
        )
    except Exception as exc:
        print(f"密碼雜湊錯誤: {exc}")
        return None
def validate_password_strength(password):
    """Check a password against the length and character-class rules.

    Rules, evaluated in this order: at least 6 characters, at most 50
    characters, at least one alphabetic character, at least one digit.

    Args:
        password: The candidate password string.

    Returns:
        A ``(ok, message)`` tuple. ``ok`` is True only when every rule
        passes; ``message`` names the first rule that failed, or confirms
        success.
    """
    size = len(password)
    if size < 6:
        return False, "密碼長度至少需要 6 個字元"
    if size > 50:
        return False, "密碼長度不能超過 50 個字元"

    # Single pass over the characters, recording which classes were seen.
    seen_letter = False
    seen_digit = False
    for ch in password:
        if ch.isalpha():
            seen_letter = True
        elif ch.isdigit():
            seen_digit = True
        if seen_letter and seen_digit:
            break

    # A missing letter is reported before a missing digit.
    if not seen_letter:
        return False, "密碼必須包含至少一個字母"
    if not seen_digit:
        return False, "密碼必須包含至少一個數字"
    return True, "密碼強度符合要求"
def update_user_password(loginid_or_email, old_password, new_password):
    """Change a user's stored password after verifying the current one.

    Steps, in order: look the account up by login id or e-mail, verify the
    old password, validate the strength of the new password, encode the new
    password, then write it back together with a fresh ``updated_at``.

    Args:
        loginid_or_email: Login id or e-mail address identifying the account.
        old_password: The password currently on file.
        new_password: The replacement password.

    Returns:
        A ``(success, message)`` tuple. On failure ``message`` explains why
        (unknown user, wrong old password, weak new password, encoding
        failure, or a generic system error).
    """
    connection = None
    try:
        connection = pymysql.connect(**DB_CONFIG)
        with connection.cursor(pymysql.cursors.DictCursor) as cursor:
            # Fetch the account record, including the stored credential.
            query = """
                SELECT id, loginid, username, email, hashed_password
                FROM user_accounts
                WHERE loginid = %s OR email = %s
            """
            cursor.execute(query, (loginid_or_email, loginid_or_email))
            user = cursor.fetchone()
            if not user:
                return False, "用戶不存在"

            # The caller must prove knowledge of the current password.
            if not verify_password(old_password, user['hashed_password']):
                return False, "舊密碼不正確"

            # Enforce the password policy on the replacement.
            is_valid, message = validate_password_strength(new_password)
            if not is_valid:
                return False, message

            new_hashed_password = hash_password_with_jwt(new_password)
            if not new_hashed_password:
                return False, "密碼雜湊失敗"

            update_query = """
                UPDATE user_accounts
                SET hashed_password = %s, updated_at = CURRENT_TIMESTAMP
                WHERE id = %s
            """
            cursor.execute(update_query, (new_hashed_password, user['id']))
        connection.commit()
        return True, "密碼修改成功"
    except Exception as e:
        # Log the detail server-side only. The returned message is relayed to
        # the HTTP client, so it must not expose driver/database internals.
        print(f"更新密碼錯誤: {e}")
        return False, "系統錯誤,請稍後再試"
    finally:
        if connection is not None:
            connection.close()
def get_user_by_loginid_or_email(loginid_or_email):
    """Fetch one account row whose login id or e-mail matches the argument.

    Args:
        loginid_or_email: Value compared against both the ``loginid`` and
            ``email`` columns of ``user_accounts``.

    Returns:
        The first matching row as a dict (id, loginid, username, email,
        hashed_password), or None when nothing matches. Database errors are
        printed and also reported as None.
    """
    sql = (
        "\n"
        "SELECT id, loginid, username, email, hashed_password\n"
        "FROM user_accounts\n"
        "WHERE loginid = %s OR email = %s\n"
    )
    db = None
    try:
        db = pymysql.connect(**DB_CONFIG)
        reader = db.cursor(pymysql.cursors.DictCursor)
        # The same value is bound to both placeholders.
        reader.execute(sql, (loginid_or_email, loginid_or_email))
        return reader.fetchone()
    except Exception as exc:
        print(f"資料庫查詢錯誤: {exc}")
        return None
    finally:
        # Close only if the connection was actually established.
        if db is not None:
            db.close()
@app.route('/api/login', methods=['POST'])
def login():
    """Authenticate a user.

    Expects a JSON object: ``{"account": "<loginid or email>",
    "password": "<password>"}``. Both values are stripped of surrounding
    whitespace before use.

    Returns:
        200 with the user's public fields on success;
        400 ``INVALID_REQUEST`` when the body is not a non-empty JSON object;
        400 ``MISSING_FIELDS`` when either field is absent, empty or not a
        string;
        401 when the account is unknown or the password is wrong;
        500 ``SYSTEM_ERROR`` on any unexpected failure.
    """
    try:
        # silent=True makes malformed JSON or a wrong Content-Type yield None
        # instead of raising, so those requests get the 400 below rather than
        # being swallowed by the generic handler and reported as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or not data:
            return jsonify({
                'success': False,
                'message': '請提供 JSON 格式的資料',
                'error_code': 'INVALID_REQUEST'
            }), 400

        # Non-string values (null, numbers, ...) are treated as missing so
        # that .strip() cannot raise.
        account = data.get('account', '')
        password = data.get('password', '')
        account = account.strip() if isinstance(account, str) else ''
        password = password.strip() if isinstance(password, str) else ''

        if not account or not password:
            return jsonify({
                'success': False,
                'message': '請提供帳號和密碼',
                'error_code': 'MISSING_FIELDS'
            }), 400

        user = get_user_by_loginid_or_email(account)
        if not user:
            return jsonify({
                'success': False,
                'message': '無權限,請詢問資訊部同仁',
                'error_code': 'USER_NOT_FOUND'
            }), 401

        # NOTE(review): the message is the same for both 401 cases but the
        # error_code still tells the two apart; confirm that is intended.
        if not verify_password(password, user['hashed_password']):
            return jsonify({
                'success': False,
                'message': '無權限,請詢問資訊部同仁',
                'error_code': 'INVALID_PASSWORD'
            }), 401

        # Success: return only the public fields, never the stored credential.
        return jsonify({
            'success': True,
            'message': '登入成功',
            'user': {
                'id': user['id'],
                'loginid': user['loginid'],
                'username': user['username'],
                'email': user['email']
            }
        }), 200
    except Exception as e:
        print(f"登入 API 錯誤: {e}")
        return jsonify({
            'success': False,
            'message': '系統錯誤,請稍後再試',
            'error_code': 'SYSTEM_ERROR'
        }), 500
@app.route('/api/health', methods=['GET'])
def health_check():
    """Report whether the API can reach its database.

    Opens a connection with ``DB_CONFIG`` and runs ``SELECT 1``.

    Returns:
        200 with a "healthy" payload when the query succeeds; 500 with a
        failure payload (including the error text) otherwise. Both payloads
        carry the current local time as an ISO timestamp.
    """
    try:
        connection = pymysql.connect(**DB_CONFIG)
        try:
            with connection.cursor() as cursor:
                cursor.execute("SELECT 1")
                cursor.fetchone()
        finally:
            # Always release the connection, even when the probe query fails.
            connection.close()
        return jsonify({
            'success': True,
            'message': 'API 服務正常',
            'database': '連線正常',
            'timestamp': datetime.now().isoformat()
        }), 200
    except Exception as e:
        # NOTE(review): the raw exception text is returned to the caller;
        # confirm this endpoint is not exposed to untrusted clients.
        return jsonify({
            'success': False,
            'message': 'API 服務異常',
            'database': '連線失敗',
            'error': str(e),
            'timestamp': datetime.now().isoformat()
        }), 500
@app.route('/api/user/<account>', methods=['GET'])
def get_user_info(account):
    """Return the public profile of one account (no credential data).

    Args:
        account: Login id or e-mail taken from the URL path.

    Returns:
        200 with id, loginid, username and email when the account exists;
        404 ``USER_NOT_FOUND`` when it does not;
        500 ``SYSTEM_ERROR`` on any unexpected failure.
    """
    try:
        record = get_user_by_loginid_or_email(account)
        if record:
            # Copy only the public columns; the stored credential is omitted.
            public_columns = ('id', 'loginid', 'username', 'email')
            profile = {column: record[column] for column in public_columns}
            return jsonify({'success': True, 'user': profile}), 200

        not_found_body = {
            'success': False,
            'message': '用戶不存在',
            'error_code': 'USER_NOT_FOUND',
        }
        return jsonify(not_found_body), 404
    except Exception as exc:
        print(f"查詢用戶資訊錯誤: {exc}")
        failure_body = {
            'success': False,
            'message': '系統錯誤,請稍後再試',
            'error_code': 'SYSTEM_ERROR',
        }
        return jsonify(failure_body), 500
@app.route('/api/change-password', methods=['POST'])
def change_password():
    """Change a user's password.

    Expects a JSON object: ``{"account": "<loginid or email>",
    "old_password": "<current>", "new_password": "<replacement>"}``. All
    three values are stripped of surrounding whitespace before use.

    Returns:
        200 on success;
        400 ``INVALID_REQUEST`` when the body is not a non-empty JSON object;
        400 ``MISSING_FIELDS`` when a field is absent, empty or not a string;
        400 ``SAME_PASSWORD`` when old and new passwords are equal;
        400 ``WEAK_PASSWORD``, 401 ``INVALID_OLD_PASSWORD``,
        404 ``USER_NOT_FOUND`` or 500 ``UPDATE_FAILED`` depending on the
        message reported by ``update_user_password``;
        500 ``SYSTEM_ERROR`` on any unexpected failure.
    """
    try:
        # silent=True makes malformed JSON or a wrong Content-Type yield None
        # instead of raising, so those requests get the 400 below rather than
        # being swallowed by the generic handler and reported as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or not data:
            return jsonify({
                'success': False,
                'message': '請提供 JSON 格式的資料',
                'error_code': 'INVALID_REQUEST'
            }), 400

        def text_field(key):
            """Return the stripped string at *key*, or '' if absent/non-string."""
            value = data.get(key, '')
            return value.strip() if isinstance(value, str) else ''

        account = text_field('account')
        old_password = text_field('old_password')
        new_password = text_field('new_password')

        if not account or not old_password or not new_password:
            return jsonify({
                'success': False,
                'message': '請提供帳號、舊密碼和新密碼',
                'error_code': 'MISSING_FIELDS'
            }), 400

        # Reject a no-op change before touching the database.
        if old_password == new_password:
            return jsonify({
                'success': False,
                'message': '新密碼不能與舊密碼相同',
                'error_code': 'SAME_PASSWORD'
            }), 400

        success, message = update_user_password(account, old_password, new_password)
        if success:
            return jsonify({
                'success': True,
                'message': message
            }), 200

        # update_user_password reports failures only as text, so the error
        # code and HTTP status are derived from marker substrings. Rules are
        # checked in order; the first match wins.
        failure_rules = (
            (("用戶不存在",), 'USER_NOT_FOUND', 404),
            (("舊密碼不正確",), 'INVALID_OLD_PASSWORD', 401),
            (("密碼長度", "密碼必須"), 'WEAK_PASSWORD', 400),
        )
        error_code, status = 'UPDATE_FAILED', 500
        for markers, code, http_status in failure_rules:
            if any(marker in message for marker in markers):
                error_code, status = code, http_status
                break

        return jsonify({
            'success': False,
            'message': message,
            'error_code': error_code
        }), status
    except Exception as e:
        print(f"修改密碼 API 錯誤: {e}")
        return jsonify({
            'success': False,
            'message': '系統錯誤,請稍後再試',
            'error_code': 'SYSTEM_ERROR'
        }), 500
@app.errorhandler(404)
def not_found(error):
    """Answer unknown routes with the API's JSON error envelope (HTTP 404)."""
    body = dict(
        success=False,
        message='API 端點不存在',
        error_code='NOT_FOUND',
    )
    return jsonify(body), 404
@app.errorhandler(405)
def method_not_allowed(error):
    """Answer unsupported HTTP methods with the JSON error envelope (HTTP 405)."""
    body = dict(
        success=False,
        message='不支援的 HTTP 方法',
        error_code='METHOD_NOT_ALLOWED',
    )
    return jsonify(body), 405
if __name__ == '__main__':
    # Startup banner: endpoint list, service URLs and two sample curl calls,
    # printed one line at a time before the development server starts.
    base_url = f"http://{Config.API_HOST}:{Config.API_PORT}"
    json_header = ' -H "Content-Type: application/json" \\'
    banner = [
        f"啟動 {Config.APP_NAME} v{Config.APP_VERSION}...",
        "API 端點:",
        " POST /api/login - 登入驗證",
        " POST /api/change-password - 修改密碼",
        " GET /api/health - 健康檢查",
        " GET /api/user/<account> - 查詢用戶資訊",
        f" GET {SWAGGER_URL} - Swagger API 文件",
        f"\n服務地址: {base_url}",
        f"Swagger 文件: {base_url}{SWAGGER_URL}",
        "\n範例請求:",
        "登入驗證:",
        f'curl -X POST {base_url}/api/login \\',
        json_header,
        ' -d \'{"account": "92460", "password": "Panjit92460"}\'',
        "\n修改密碼:",
        f'curl -X POST {base_url}/api/change-password \\',
        json_header,
        ' -d \'{"account": "92460", "old_password": "Panjit92460", "new_password": "MyNewPassword123"}\'',
        "\n服務啟動中...",
    ]
    for line in banner:
        print(line)

    # Host, port and debug flag all come from Config.
    app.run(host=Config.API_HOST, port=Config.API_PORT, debug=Config.API_DEBUG)