Initial commit

This commit is contained in:
2025-09-21 19:56:14 +08:00
commit 25f85f3e8c
7 changed files with 936 additions and 0 deletions

683
app.py Normal file
View File

@@ -0,0 +1,683 @@
from flask import Flask, request, jsonify
from flask_swagger_ui import get_swaggerui_blueprint
from flask_cors import CORS
import pymysql
import jwt
from datetime import datetime
import hashlib
import hmac
import base64
from config import Config
app = Flask(__name__)
# 啟用 CORS 支援
CORS(app, resources={
r"/api/*": {
"origins": "*",
"methods": ["GET", "POST", "PUT", "DELETE", "OPTIONS"],
"allow_headers": ["Content-Type", "Authorization"]
}
})
# 使用配置類別
DB_CONFIG = Config.get_db_config()
JWT_SECRET = Config.JWT_SECRET
# Swagger UI 配置
SWAGGER_URL = '/api/docs' # Swagger UI 的 URL
API_URL = '/api/swagger.json' # API 文件的 JSON 位置
# 建立 Swagger UI Blueprint
swaggerui_blueprint = get_swaggerui_blueprint(
SWAGGER_URL,
API_URL,
config={
'app_name': Config.APP_NAME,
'app_version': Config.APP_VERSION,
'description': Config.APP_DESCRIPTION
}
)
# 註冊 Swagger UI Blueprint
app.register_blueprint(swaggerui_blueprint, url_prefix=SWAGGER_URL)
@app.route('/api/swagger.json')
def swagger():
"""
Swagger API 文件 JSON
"""
return jsonify({
"swagger": "2.0",
"info": {
"title": Config.APP_NAME,
"version": Config.APP_VERSION,
"description": Config.APP_DESCRIPTION,
"contact": {
"name": "Panjit IT Department",
"email": "it@panjit.com.tw"
}
},
"host": f"{Config.API_HOST}:{Config.API_PORT}",
"basePath": "/api",
"schemes": ["http", "https"],
"consumes": ["application/json"],
"produces": ["application/json"],
"paths": {
"/login": {
"post": {
"tags": ["Authentication"],
"summary": "用戶登入驗證",
"description": "使用帳號和密碼進行登入驗證",
"parameters": [
{
"name": "body",
"in": "body",
"required": True,
"schema": {
"type": "object",
"properties": {
"account": {
"type": "string",
"description": "用戶帳號loginid 或 email",
"example": "92460"
},
"password": {
"type": "string",
"description": "用戶密碼",
"example": "Panjit92460"
}
},
"required": ["account", "password"]
}
}
],
"responses": {
"200": {
"description": "登入成功",
"schema": {
"type": "object",
"properties": {
"success": {"type": "boolean"},
"message": {"type": "string"},
"user": {
"type": "object",
"properties": {
"id": {"type": "integer"},
"loginid": {"type": "string"},
"username": {"type": "string"},
"email": {"type": "string"}
}
}
}
}
},
"401": {
"description": "登入失敗",
"schema": {
"type": "object",
"properties": {
"success": {"type": "boolean"},
"message": {"type": "string"},
"error_code": {"type": "string"}
}
}
}
}
}
},
"/change-password": {
"post": {
"tags": ["Password Management"],
"summary": "修改用戶密碼",
"description": "修改用戶的登入密碼",
"parameters": [
{
"name": "body",
"in": "body",
"required": True,
"schema": {
"type": "object",
"properties": {
"account": {
"type": "string",
"description": "用戶帳號loginid 或 email",
"example": "92460"
},
"old_password": {
"type": "string",
"description": "舊密碼",
"example": "Panjit92460"
},
"new_password": {
"type": "string",
"description": "新密碼6-50字元需包含字母和數字",
"example": "MyNewPassword123"
}
},
"required": ["account", "old_password", "new_password"]
}
}
],
"responses": {
"200": {
"description": "密碼修改成功",
"schema": {
"type": "object",
"properties": {
"success": {"type": "boolean"},
"message": {"type": "string"}
}
}
},
"400": {
"description": "請求錯誤",
"schema": {
"type": "object",
"properties": {
"success": {"type": "boolean"},
"message": {"type": "string"},
"error_code": {"type": "string"}
}
}
},
"401": {
"description": "舊密碼不正確",
"schema": {
"type": "object",
"properties": {
"success": {"type": "boolean"},
"message": {"type": "string"},
"error_code": {"type": "string"}
}
}
}
}
}
},
"/health": {
"get": {
"tags": ["System"],
"summary": "系統健康檢查",
"description": "檢查 API 服務和資料庫連線狀態",
"responses": {
"200": {
"description": "系統正常",
"schema": {
"type": "object",
"properties": {
"success": {"type": "boolean"},
"message": {"type": "string"},
"database": {"type": "string"},
"timestamp": {"type": "string"}
}
}
}
}
}
},
"/user/{account}": {
"get": {
"tags": ["User Management"],
"summary": "查詢用戶資訊",
"description": "根據帳號查詢用戶基本資訊",
"parameters": [
{
"name": "account",
"in": "path",
"required": True,
"type": "string",
"description": "用戶帳號loginid 或 email",
"example": "92460"
}
],
"responses": {
"200": {
"description": "查詢成功",
"schema": {
"type": "object",
"properties": {
"success": {"type": "boolean"},
"user": {
"type": "object",
"properties": {
"id": {"type": "integer"},
"loginid": {"type": "string"},
"username": {"type": "string"},
"email": {"type": "string"}
}
}
}
}
},
"404": {
"description": "用戶不存在",
"schema": {
"type": "object",
"properties": {
"success": {"type": "boolean"},
"message": {"type": "string"},
"error_code": {"type": "string"}
}
}
}
}
}
}
},
"definitions": {
"Error": {
"type": "object",
"properties": {
"success": {"type": "boolean", "example": False},
"message": {"type": "string"},
"error_code": {"type": "string"}
}
},
"User": {
"type": "object",
"properties": {
"id": {"type": "integer", "example": 1},
"loginid": {"type": "string", "example": "92460"},
"username": {"type": "string", "example": "丁于軒"},
"email": {"type": "string", "example": "sharnading@panjit.com.tw"}
}
}
}
})
def verify_password(input_password, hashed_password):
"""
驗證密碼是否正確
"""
try:
# 解碼 JWT token
decoded = jwt.decode(hashed_password, JWT_SECRET, algorithms=['HS256'])
# 檢查 token 類型
if decoded.get('type') != 'password_hash':
return False
# 比較密碼
stored_password = decoded.get('password', '')
return input_password == stored_password
except jwt.ExpiredSignatureError:
print("JWT token 已過期")
return False
except jwt.InvalidTokenError:
print("無效的 JWT token")
return False
except Exception as e:
print(f"密碼驗證錯誤: {e}")
return False
def hash_password_with_jwt(password):
"""
使用 JWT 將密碼進行雜湊處理
"""
try:
# 建立 JWT payload包含密碼和時間戳
payload = {
'password': password,
'timestamp': datetime.now().isoformat(),
'type': 'password_hash'
}
# 使用 JWT 編碼密碼
token = jwt.encode(payload, JWT_SECRET, algorithm='HS256')
return token
except Exception as e:
print(f"密碼雜湊錯誤: {e}")
return None
def validate_password_strength(password):
"""
驗證密碼強度
"""
if len(password) < 6:
return False, "密碼長度至少需要 6 個字元"
if len(password) > 50:
return False, "密碼長度不能超過 50 個字元"
# 檢查是否包含至少一個字母和一個數字
has_letter = any(c.isalpha() for c in password)
has_digit = any(c.isdigit() for c in password)
if not has_letter:
return False, "密碼必須包含至少一個字母"
if not has_digit:
return False, "密碼必須包含至少一個數字"
return True, "密碼強度符合要求"
def update_user_password(loginid_or_email, old_password, new_password):
"""
更新用戶密碼
"""
try:
connection = pymysql.connect(**DB_CONFIG)
cursor = connection.cursor(pymysql.cursors.DictCursor)
# 先查詢用戶資料
query = """
SELECT id, loginid, username, email, hashed_password
FROM user_accounts
WHERE loginid = %s OR email = %s
"""
cursor.execute(query, (loginid_or_email, loginid_or_email))
user = cursor.fetchone()
if not user:
return False, "用戶不存在"
# 驗證舊密碼
if not verify_password(old_password, user['hashed_password']):
return False, "舊密碼不正確"
# 驗證新密碼強度
is_valid, message = validate_password_strength(new_password)
if not is_valid:
return False, message
# 生成新密碼的雜湊
new_hashed_password = hash_password_with_jwt(new_password)
if not new_hashed_password:
return False, "密碼雜湊失敗"
# 更新密碼
update_query = """
UPDATE user_accounts
SET hashed_password = %s, updated_at = CURRENT_TIMESTAMP
WHERE id = %s
"""
cursor.execute(update_query, (new_hashed_password, user['id']))
connection.commit()
return True, "密碼修改成功"
except Exception as e:
print(f"更新密碼錯誤: {e}")
return False, f"系統錯誤: {str(e)}"
finally:
if 'connection' in locals():
connection.close()
def get_user_by_loginid_or_email(loginid_or_email):
"""
根據 loginid 或 email 查詢用戶資料
"""
try:
connection = pymysql.connect(**DB_CONFIG)
cursor = connection.cursor(pymysql.cursors.DictCursor)
# 查詢用戶資料
query = """
SELECT id, loginid, username, email, hashed_password
FROM user_accounts
WHERE loginid = %s OR email = %s
"""
cursor.execute(query, (loginid_or_email, loginid_or_email))
user = cursor.fetchone()
return user
except Exception as e:
print(f"資料庫查詢錯誤: {e}")
return None
finally:
if 'connection' in locals():
connection.close()
@app.route('/api/login', methods=['POST'])
def login():
"""
登入 API
接收 JSON 格式: {"account": "帳號或email", "password": "密碼"}
"""
try:
# 取得請求資料
data = request.get_json()
if not data:
return jsonify({
'success': False,
'message': '請提供 JSON 格式的資料',
'error_code': 'INVALID_REQUEST'
}), 400
account = data.get('account', '').strip()
password = data.get('password', '').strip()
# 檢查必要欄位
if not account or not password:
return jsonify({
'success': False,
'message': '請提供帳號和密碼',
'error_code': 'MISSING_FIELDS'
}), 400
# 查詢用戶資料
user = get_user_by_loginid_or_email(account)
if not user:
return jsonify({
'success': False,
'message': '無權限,請詢問資訊部同仁',
'error_code': 'USER_NOT_FOUND'
}), 401
# 驗證密碼
if not verify_password(password, user['hashed_password']):
return jsonify({
'success': False,
'message': '無權限,請詢問資訊部同仁',
'error_code': 'INVALID_PASSWORD'
}), 401
# 登入成功
return jsonify({
'success': True,
'message': '登入成功',
'user': {
'id': user['id'],
'loginid': user['loginid'],
'username': user['username'],
'email': user['email']
}
}), 200
except Exception as e:
print(f"登入 API 錯誤: {e}")
return jsonify({
'success': False,
'message': '系統錯誤,請稍後再試',
'error_code': 'SYSTEM_ERROR'
}), 500
@app.route('/api/health', methods=['GET'])
def health_check():
"""
健康檢查 API
"""
try:
# 測試資料庫連線
connection = pymysql.connect(**DB_CONFIG)
cursor = connection.cursor()
cursor.execute("SELECT 1")
cursor.fetchone()
connection.close()
return jsonify({
'success': True,
'message': 'API 服務正常',
'database': '連線正常',
'timestamp': datetime.now().isoformat()
}), 200
except Exception as e:
return jsonify({
'success': False,
'message': 'API 服務異常',
'database': '連線失敗',
'error': str(e),
'timestamp': datetime.now().isoformat()
}), 500
@app.route('/api/user/<account>', methods=['GET'])
def get_user_info(account):
"""
查詢用戶資訊 API不包含密碼
"""
try:
user = get_user_by_loginid_or_email(account)
if not user:
return jsonify({
'success': False,
'message': '用戶不存在',
'error_code': 'USER_NOT_FOUND'
}), 404
# 回傳用戶資訊(不包含密碼)
return jsonify({
'success': True,
'user': {
'id': user['id'],
'loginid': user['loginid'],
'username': user['username'],
'email': user['email']
}
}), 200
except Exception as e:
print(f"查詢用戶資訊錯誤: {e}")
return jsonify({
'success': False,
'message': '系統錯誤,請稍後再試',
'error_code': 'SYSTEM_ERROR'
}), 500
@app.route('/api/change-password', methods=['POST'])
def change_password():
"""
修改密碼 API
接收 JSON 格式: {"account": "帳號或email", "old_password": "舊密碼", "new_password": "新密碼"}
"""
try:
# 取得請求資料
data = request.get_json()
if not data:
return jsonify({
'success': False,
'message': '請提供 JSON 格式的資料',
'error_code': 'INVALID_REQUEST'
}), 400
account = data.get('account', '').strip()
old_password = data.get('old_password', '').strip()
new_password = data.get('new_password', '').strip()
# 檢查必要欄位
if not account or not old_password or not new_password:
return jsonify({
'success': False,
'message': '請提供帳號、舊密碼和新密碼',
'error_code': 'MISSING_FIELDS'
}), 400
# 檢查新密碼不能與舊密碼相同
if old_password == new_password:
return jsonify({
'success': False,
'message': '新密碼不能與舊密碼相同',
'error_code': 'SAME_PASSWORD'
}), 400
# 更新密碼
success, message = update_user_password(account, old_password, new_password)
if success:
return jsonify({
'success': True,
'message': message
}), 200
else:
# 根據錯誤類型返回不同的狀態碼
if "用戶不存在" in message:
return jsonify({
'success': False,
'message': message,
'error_code': 'USER_NOT_FOUND'
}), 404
elif "舊密碼不正確" in message:
return jsonify({
'success': False,
'message': message,
'error_code': 'INVALID_OLD_PASSWORD'
}), 401
elif "密碼長度" in message or "密碼必須" in message:
return jsonify({
'success': False,
'message': message,
'error_code': 'WEAK_PASSWORD'
}), 400
else:
return jsonify({
'success': False,
'message': message,
'error_code': 'UPDATE_FAILED'
}), 500
except Exception as e:
print(f"修改密碼 API 錯誤: {e}")
return jsonify({
'success': False,
'message': '系統錯誤,請稍後再試',
'error_code': 'SYSTEM_ERROR'
}), 500
@app.errorhandler(404)
def not_found(error):
return jsonify({
'success': False,
'message': 'API 端點不存在',
'error_code': 'NOT_FOUND'
}), 404
@app.errorhandler(405)
def method_not_allowed(error):
return jsonify({
'success': False,
'message': '不支援的 HTTP 方法',
'error_code': 'METHOD_NOT_ALLOWED'
}), 405
if __name__ == '__main__':
print(f"啟動 {Config.APP_NAME} v{Config.APP_VERSION}...")
print("API 端點:")
print(" POST /api/login - 登入驗證")
print(" POST /api/change-password - 修改密碼")
print(" GET /api/health - 健康檢查")
print(" GET /api/user/<account> - 查詢用戶資訊")
print(f" GET {SWAGGER_URL} - Swagger API 文件")
print(f"\n服務地址: http://{Config.API_HOST}:{Config.API_PORT}")
print(f"Swagger 文件: http://{Config.API_HOST}:{Config.API_PORT}{SWAGGER_URL}")
print("\n範例請求:")
print("登入驗證:")
print(f'curl -X POST http://{Config.API_HOST}:{Config.API_PORT}/api/login \\')
print(' -H "Content-Type: application/json" \\')
print(' -d \'{"account": "92460", "password": "Panjit92460"}\'')
print("\n修改密碼:")
print(f'curl -X POST http://{Config.API_HOST}:{Config.API_PORT}/api/change-password \\')
print(' -H "Content-Type: application/json" \\')
print(' -d \'{"account": "92460", "old_password": "Panjit92460", "new_password": "MyNewPassword123"}\'')
print("\n服務啟動中...")
app.run(host=Config.API_HOST, port=Config.API_PORT, debug=Config.API_DEBUG)