Files
Document_Translator/app/api/auth_old.py
2025-09-02 10:31:35 +08:00

317 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
認證 API
Author: PANJIT IT Team
Created: 2024-01-28
Modified: 2024-01-28
"""
from flask import Blueprint, request, jsonify, current_app
from flask_jwt_extended import create_access_token, create_refresh_token, jwt_required, get_jwt_identity
from app.utils.ldap_auth import LDAPAuthService
from app.utils.decorators import login_required, validate_json, rate_limit
from app.utils.exceptions import AuthenticationError
from app.utils.logger import get_logger
from app.models.user import User
from app.models.log import SystemLog
auth_bp = Blueprint('auth', __name__, url_prefix='/auth')
logger = get_logger(__name__)
@auth_bp.route('/login', methods=['POST'])
@rate_limit(max_requests=10, per_seconds=300) # 5分鐘內最多10次嘗試
@validate_json(['username', 'password'])
def login():
"""使用者登入"""
try:
data = request.get_json()
username = data['username'].strip()
password = data['password']
if not username or not password:
return jsonify({
'success': False,
'error': 'INVALID_INPUT',
'message': '帳號和密碼不能為空'
}), 400
# LDAP 認證
ldap_service = LDAPAuthService()
user_info = ldap_service.authenticate_user(username, password)
# 取得或建立使用者
user = User.get_or_create(
username=user_info['username'],
display_name=user_info['display_name'],
email=user_info['email'],
department=user_info.get('department')
)
# 更新登入時間
user.update_last_login()
# 創建 JWT tokens
access_token = create_access_token(
identity=user.username,
additional_claims={
'user_id': user.id,
'is_admin': user.is_admin,
'display_name': user.display_name,
'email': user.email
}
)
refresh_token = create_refresh_token(identity=user.username)
# 記錄登入日誌
SystemLog.info(
'auth.login',
f'User {username} logged in successfully',
user_id=user.id,
extra_data={
'ip_address': request.remote_addr,
'user_agent': request.headers.get('User-Agent')
}
)
logger.info(f"🔑 [JWT Created] User: {username}, UserID: {user.id}")
logger.info(f"User {username} logged in successfully")
return jsonify({
'success': True,
'data': {
'access_token': access_token,
'refresh_token': refresh_token,
'user': user.to_dict()
},
'message': '登入成功'
})
except AuthenticationError as e:
# 記錄認證失敗
SystemLog.warning(
'auth.login_failed',
f'Authentication failed for user {username}: {str(e)}',
extra_data={
'username': username,
'ip_address': request.remote_addr,
'error': str(e)
}
)
logger.warning(f"Authentication failed for user {username}: {str(e)}")
return jsonify({
'success': False,
'error': 'INVALID_CREDENTIALS',
'message': str(e)
}), 401
except Exception as e:
logger.error(f"Login error: {str(e)}")
SystemLog.error(
'auth.login_error',
f'Login system error: {str(e)}',
extra_data={
'username': username,
'error': str(e)
}
)
return jsonify({
'success': False,
'error': 'SYSTEM_ERROR',
'message': '系統錯誤,請稍後再試'
}), 500
@auth_bp.route('/logout', methods=['POST'])
@jwt_required()
def logout():
"""使用者登出"""
try:
username = get_jwt_identity()
# 記錄登出日誌
SystemLog.info(
'auth.logout',
f'User {username} logged out'
)
logger.info(f"🚪 [JWT Logout] User: {username}")
logger.info(f"User {username} logged out")
return jsonify({
'success': True,
'message': '登出成功'
})
except Exception as e:
logger.error(f"Logout error: {str(e)}")
return jsonify({
'success': False,
'error': 'SYSTEM_ERROR',
'message': '登出時發生錯誤'
}), 500
@auth_bp.route('/me', methods=['GET'])
@jwt_required()
def get_current_user():
"""取得當前使用者資訊"""
try:
from flask_jwt_extended import get_jwt
username = get_jwt_identity()
claims = get_jwt()
user_data = {
'username': username,
'user_id': claims.get('user_id'),
'is_admin': claims.get('is_admin'),
'display_name': claims.get('display_name'),
'email': claims.get('email')
}
return jsonify({
'success': True,
'data': {
'user': user_data
}
})
except Exception as e:
logger.error(f"Get current user error: {str(e)}")
return jsonify({
'success': False,
'error': 'SYSTEM_ERROR',
'message': '取得使用者資訊時發生錯誤'
}), 500
@auth_bp.route('/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh_token():
"""刷新 Session"""
try:
from flask import g
user = g.current_user
# 更新 Session 資訊
session['user_id'] = user.id
session['username'] = user.username
session['is_admin'] = user.is_admin
session.permanent = True
logger.info(f"Session refreshed for user {user.username}")
return jsonify({
'success': True,
'data': {
'user': user.to_dict(),
'session_refreshed': True
},
'message': 'Session 已刷新'
})
except Exception as e:
logger.error(f"Session refresh error: {str(e)}")
return jsonify({
'success': False,
'error': 'SYSTEM_ERROR',
'message': '刷新 Session 時發生錯誤'
}), 500
@auth_bp.route('/check', methods=['GET'])
def check_auth():
"""檢查認證狀態"""
try:
user_id = session.get('user_id')
if not user_id:
return jsonify({
'success': False,
'authenticated': False,
'message': '未登入'
}), 401
# 驗證使用者是否仍然存在
user = User.query.get(user_id)
if not user:
session.clear()
return jsonify({
'success': False,
'authenticated': False,
'message': '使用者不存在'
}), 401
return jsonify({
'success': True,
'authenticated': True,
'data': {
'user': user.to_dict()
}
})
except Exception as e:
logger.error(f"Auth check error: {str(e)}")
return jsonify({
'success': False,
'error': 'SYSTEM_ERROR',
'message': '檢查認證狀態時發生錯誤'
}), 500
@auth_bp.route('/search-users', methods=['GET'])
@login_required
def search_users():
"""搜尋使用者LDAP"""
try:
search_term = request.args.get('q', '').strip()
limit = min(int(request.args.get('limit', 20)), 50)
if len(search_term) < 2:
return jsonify({
'success': False,
'error': 'INVALID_SEARCH_TERM',
'message': '搜尋關鍵字至少需要2個字元'
}), 400
ldap_service = LDAPAuthService()
users = ldap_service.search_users(search_term, limit)
return jsonify({
'success': True,
'data': {
'users': users,
'count': len(users)
}
})
except Exception as e:
logger.error(f"User search error: {str(e)}")
return jsonify({
'success': False,
'error': 'SYSTEM_ERROR',
'message': '搜尋使用者時發生錯誤'
}), 500
# 錯誤處理器
@auth_bp.errorhandler(429)
def rate_limit_handler(e):
"""速率限制錯誤處理器"""
return jsonify({
'success': False,
'error': 'RATE_LIMIT_EXCEEDED',
'message': '請求過於頻繁,請稍後再試'
}), 429