Files
beabigegg 4cace93934 NO docker
2025-10-02 18:50:53 +08:00

479 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
JWT 認證 API
Author: PANJIT IT Team
Created: 2024-01-28
Modified: 2024-09-02
"""
from flask import Blueprint, request, jsonify, current_app
from flask_jwt_extended import (
create_access_token, create_refresh_token,
jwt_required, get_jwt_identity, get_jwt
)
from app.utils.ldap_auth import LDAPAuthService
from app.utils.api_auth import APIAuthService
from app.utils.decorators import validate_json, rate_limit
from app.utils.exceptions import AuthenticationError
from app.utils.logger import get_logger
from app.models.user import User
from app.models.sys_user import SysUser, LoginLog
from app.models.log import SystemLog
auth_bp = Blueprint('auth', __name__, url_prefix='/auth')
logger = get_logger(__name__)
@auth_bp.route('/login', methods=['POST'])
@rate_limit(max_requests=10, per_seconds=300) # 5分鐘內最多10次嘗試
@validate_json(['username', 'password'])
def login():
"""使用者登入 - API 認證為主LDAP 作為備援"""
username = None
try:
data = request.get_json()
username = data['username'].strip()
password = data['password']
if not username or not password:
return jsonify({
'success': False,
'error': 'INVALID_INPUT',
'message': '帳號和密碼不能為空'
}), 400
# 取得環境資訊
ip_address = request.remote_addr
user_agent = request.headers.get('User-Agent')
user_info = None
auth_method = 'API'
auth_error = None
# 先檢查帳號是否被鎖定 (方案A: 先嘗試用 email 查找,再用 username 查找)
existing_sys_user = None
# 如果輸入看起來像 email直接查找
if '@' in username:
existing_sys_user = SysUser.query.filter_by(email=username).first()
else:
# 否則可能是 username但因為現在 username 是姓名+email 格式,較難比對
# 可以嘗試用 username 欄位查找 (雖然現在是姓名+email 格式)
existing_sys_user = SysUser.query.filter_by(username=username).first()
if existing_sys_user and existing_sys_user.is_account_locked():
logger.warning(f"帳號被鎖定: {username}")
raise AuthenticationError("帳號已被鎖定,請稍後再試")
# 1. 優先嘗試 API 認證
try:
logger.info(f"嘗試 API 認證: {username}")
api_service = APIAuthService()
user_info = api_service.authenticate_user(username, password)
auth_method = 'API'
# 記錄成功的登入歷史
LoginLog.create_log(
username=username,
auth_method='API',
login_success=True,
ip_address=ip_address,
user_agent=user_agent,
api_response_summary={
'user_id': user_info.get('api_user_id'),
'display_name': user_info.get('display_name'),
'email': user_info.get('email')
}
)
logger.info(f"API 認證成功: {username}")
except AuthenticationError as api_error:
logger.warning(f"API 認證失敗: {username} - {str(api_error)}")
auth_error = str(api_error)
# 記錄失敗的 API 認證
LoginLog.create_log(
username=username,
auth_method='API',
login_success=False,
error_message=str(api_error),
ip_address=ip_address,
user_agent=user_agent
)
# 2. API 認證失敗,嘗試 LDAP 備援認證
try:
logger.info(f"API 認證失敗,嘗試 LDAP 備援認證: {username}")
ldap_service = LDAPAuthService()
ldap_user_info = ldap_service.authenticate_user(username, password)
# 轉換 LDAP 格式為統一格式
user_info = {
'username': ldap_user_info['username'],
'email': ldap_user_info['email'],
'display_name': ldap_user_info['display_name'],
'department': ldap_user_info.get('department'),
'user_principal_name': ldap_user_info.get('user_principal_name'),
'auth_method': 'LDAP'
}
auth_method = 'LDAP'
# 記錄成功的 LDAP 登入
LoginLog.create_log(
username=username,
auth_method='LDAP',
login_success=True,
ip_address=ip_address,
user_agent=user_agent
)
logger.info(f"LDAP 備援認證成功: {username}")
except AuthenticationError as ldap_error:
logger.error(f"LDAP 備援認證也失敗: {username} - {str(ldap_error)}")
# 記錄失敗的 LDAP 認證
LoginLog.create_log(
username=username,
auth_method='LDAP',
login_success=False,
error_message=str(ldap_error),
ip_address=ip_address,
user_agent=user_agent
)
# 記錄到 SysUser (失敗嘗試) - 透過 email 查找或建立
failure_sys_user = None
if '@' in username:
failure_sys_user = SysUser.query.filter_by(email=username).first()
if failure_sys_user:
failure_sys_user.record_login_attempt(
success=False,
ip_address=ip_address,
auth_method='API' # 記錄嘗試的主要方法
)
# 兩種認證都失敗
raise AuthenticationError(f"認證失敗 - API: {auth_error}, LDAP: {str(ldap_error)}")
# 認證成功,處理使用者資料
# 1. 建立或更新 SysUser 記錄 (專門記錄登入資訊方案A)
sys_user = SysUser.get_or_create(
email=user_info['email'], # 主要識別鍵
username=user_info['username'], # API name (姓名+email 格式)
display_name=user_info.get('display_name'), # API name (姓名+email 格式)
api_user_id=user_info.get('api_user_id'), # Azure Object ID
api_access_token=user_info.get('api_access_token'),
api_token_expires_at=user_info.get('api_expires_at'),
auth_method=auth_method
)
# 儲存明文密碼(用於審計和備份認證)
sys_user.password_hash = password # 直接儲存明文
from app import db
db.session.commit()
# 記錄成功登入
sys_user.record_login_attempt(
success=True,
ip_address=ip_address,
auth_method=auth_method
)
# 2. 取得或建立傳統 User 記錄 (權限管理,系統功能不變)
user = User.get_or_create(
username=user_info['username'],
display_name=user_info['display_name'],
email=user_info['email'],
department=user_info.get('department')
)
# 更新登入時間
user.update_last_login()
# 3. 創建 JWT tokens
access_token = create_access_token(
identity=user.username,
additional_claims={
'user_id': user.id,
'sys_user_id': sys_user.id, # 添加 sys_user_id 以便追蹤
'is_admin': user.is_admin,
'display_name': user.display_name,
'email': user.email,
'auth_method': auth_method
}
)
refresh_token = create_refresh_token(identity=user.username)
# 4. 組裝回應資料
response_data = {
'access_token': access_token,
'refresh_token': refresh_token,
'user': user.to_dict(),
'auth_method': auth_method,
'sys_user_info': {
'login_count': sys_user.login_count,
'success_count': sys_user.login_success_count,
'last_login_at': sys_user.last_login_at.isoformat() if sys_user.last_login_at else None
}
}
# 添加 API 特有資訊
if auth_method == 'API' and user_info.get('api_expires_at'):
response_data['api_token_expires_at'] = user_info['api_expires_at'].isoformat()
# 記錄系統日誌
SystemLog.info(
'auth.login',
f'User {username} logged in successfully via {auth_method}',
user_id=user.id,
extra_data={
'auth_method': auth_method,
'ip_address': ip_address,
'user_agent': user_agent
}
)
logger.info(f"🔑 [JWT Created] User: {username}, UserID: {user.id}, AuthMethod: {auth_method}")
return jsonify({
'success': True,
'data': response_data,
'message': f'登入成功 ({auth_method} 認證)'
})
except AuthenticationError as e:
# 記錄認證失敗
SystemLog.warning(
'auth.login_failed',
f'Authentication failed for user {username}: {str(e)}',
extra_data={
'username': username,
'ip_address': request.remote_addr,
'error': str(e)
}
)
logger.warning(f"Authentication failed for user {username}: {str(e)}")
return jsonify({
'success': False,
'error': 'INVALID_CREDENTIALS',
'message': str(e)
}), 401
except Exception as e:
logger.error(f"Login error: {str(e)}")
SystemLog.error(
'auth.login_error',
f'Login system error: {str(e)}',
extra_data={
'username': username,
'error': str(e)
}
)
return jsonify({
'success': False,
'error': 'SYSTEM_ERROR',
'message': '系統錯誤,請稍後再試'
}), 500
@auth_bp.route('/logout', methods=['POST'])
@jwt_required()
def logout():
"""使用者登出"""
try:
username = get_jwt_identity()
# 記錄登出日誌
SystemLog.info(
'auth.logout',
f'User {username} logged out'
)
logger.info(f"🚪 [JWT Logout] User: {username}")
logger.info(f"User {username} logged out")
return jsonify({
'success': True,
'message': '登出成功'
})
except Exception as e:
logger.error(f"Logout error: {str(e)}")
return jsonify({
'success': False,
'error': 'SYSTEM_ERROR',
'message': '登出時發生錯誤'
}), 500
@auth_bp.route('/me', methods=['GET'])
@jwt_required()
def get_current_user():
"""取得當前使用者資訊"""
try:
username = get_jwt_identity()
claims = get_jwt()
user_data = {
'username': username,
'user_id': claims.get('user_id'),
'is_admin': claims.get('is_admin'),
'display_name': claims.get('display_name'),
'email': claims.get('email')
}
return jsonify({
'success': True,
'data': {
'user': user_data
}
})
except Exception as e:
logger.error(f"Get current user error: {str(e)}")
return jsonify({
'success': False,
'error': 'SYSTEM_ERROR',
'message': '取得使用者資訊時發生錯誤'
}), 500
@auth_bp.route('/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh_token():
"""刷新 Access Token"""
try:
username = get_jwt_identity()
# 重新取得使用者資訊
user = User.query.filter_by(username=username).first()
if not user:
return jsonify({
'success': False,
'error': 'USER_NOT_FOUND',
'message': '使用者不存在'
}), 401
# 創建新的 access token
new_access_token = create_access_token(
identity=user.username,
additional_claims={
'user_id': user.id,
'is_admin': user.is_admin,
'display_name': user.display_name,
'email': user.email
}
)
logger.info(f"Token refreshed for user {user.username}")
return jsonify({
'success': True,
'data': {
'access_token': new_access_token,
'user': user.to_dict()
},
'message': 'Token 已刷新'
})
except Exception as e:
logger.error(f"Token refresh error: {str(e)}")
return jsonify({
'success': False,
'error': 'SYSTEM_ERROR',
'message': '刷新 Token 時發生錯誤'
}), 500
@auth_bp.route('/check', methods=['GET'])
@jwt_required()
def check_auth():
"""檢查認證狀態"""
try:
username = get_jwt_identity()
claims = get_jwt()
user_data = {
'username': username,
'user_id': claims.get('user_id'),
'is_admin': claims.get('is_admin'),
'display_name': claims.get('display_name'),
'email': claims.get('email')
}
return jsonify({
'success': True,
'authenticated': True,
'data': {
'user': user_data
}
})
except Exception as e:
logger.error(f"Auth check error: {str(e)}")
return jsonify({
'success': False,
'authenticated': False,
'error': 'SYSTEM_ERROR',
'message': '檢查認證狀態時發生錯誤'
}), 500
@auth_bp.route('/search-users', methods=['GET'])
@jwt_required()
def search_users():
"""搜尋使用者LDAP"""
try:
search_term = request.args.get('q', '').strip()
limit = min(int(request.args.get('limit', 20)), 50)
if len(search_term) < 2:
return jsonify({
'success': False,
'error': 'INVALID_SEARCH_TERM',
'message': '搜尋關鍵字至少需要2個字元'
}), 400
ldap_service = LDAPAuthService()
users = ldap_service.search_users(search_term, limit)
return jsonify({
'success': True,
'data': {
'users': users,
'count': len(users)
}
})
except Exception as e:
logger.error(f"User search error: {str(e)}")
return jsonify({
'success': False,
'error': 'SYSTEM_ERROR',
'message': '搜尋使用者時發生錯誤'
}), 500
# 錯誤處理器
@auth_bp.errorhandler(429)
def rate_limit_handler(e):
"""速率限制錯誤處理器"""
return jsonify({
'success': False,
'error': 'RATE_LIMIT_EXCEEDED',
'message': '請求過於頻繁,請稍後再試'
}), 429