Files
Document_Translator/app/utils/decorators.py
2025-09-02 16:47:16 +08:00

238 lines
8.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
裝飾器模組
Author: PANJIT IT Team
Created: 2024-01-28
Modified: 2024-01-28
"""
from functools import wraps
from flask import session, jsonify, g, current_app
from flask_jwt_extended import jwt_required, get_jwt_identity, get_jwt
def login_required(f):
"""登入驗證裝飾器"""
@wraps(f)
def decorated_function(*args, **kwargs):
from app.utils.logger import get_logger
from flask import request
logger = get_logger(__name__)
user_id = session.get('user_id')
# 調試:記錄 session 檢查
logger.info(f"🔐 [Session Check] Endpoint: {request.endpoint}, Method: {request.method}, URL: {request.url}")
logger.info(f"🔐 [Session Data] UserID: {user_id}, SessionData: {dict(session)}, SessionID: {session.get('_id', 'unknown')}")
if not user_id:
logger.warning(f"❌ [Auth Failed] No user_id in session for {request.endpoint}")
return jsonify({
'success': False,
'error': 'AUTHENTICATION_REQUIRED',
'message': '請先登入'
}), 401
# 取得使用者資訊並設定到 g 物件
from app.models import User
user = User.query.get(user_id)
if not user:
# 清除無效的 session
session.clear()
return jsonify({
'success': False,
'error': 'USER_NOT_FOUND',
'message': '使用者不存在'
}), 401
g.current_user = user
g.current_user_id = user.id
g.is_admin = user.is_admin
return f(*args, **kwargs)
return decorated_function
def jwt_login_required(f):
"""JWT 登入驗證裝飾器"""
@wraps(f)
@jwt_required()
def decorated_function(*args, **kwargs):
from app.utils.logger import get_logger
from flask import request
logger = get_logger(__name__)
try:
username = get_jwt_identity()
claims = get_jwt()
# 設定到 g 物件供其他地方使用
g.current_user_username = username
g.current_user_id = claims.get('user_id')
g.is_admin = claims.get('is_admin', False)
logger.info(f"🔑 [JWT Auth] User: {username}, UserID: {claims.get('user_id')}, Admin: {claims.get('is_admin')}")
except Exception as e:
logger.error(f"❌ [JWT Auth] JWT validation failed: {str(e)}")
return jsonify({
'success': False,
'error': 'AUTHENTICATION_REQUIRED',
'message': '認證失效,請重新登入'
}), 401
return f(*args, **kwargs)
return decorated_function
def admin_required(f):
"""管理員權限裝飾器使用JWT認證"""
@wraps(f)
@jwt_required()
def decorated_function(*args, **kwargs):
from app.utils.logger import get_logger
from flask import request
logger = get_logger(__name__)
try:
username = get_jwt_identity()
claims = get_jwt()
# 設定到 g 物件供其他地方使用
g.current_user_username = username
g.current_user_id = claims.get('user_id')
g.is_admin = claims.get('is_admin', False)
logger.info(f"🔑 [JWT Admin Auth] User: {username}, UserID: {claims.get('user_id')}, Admin: {claims.get('is_admin')}")
# 檢查管理員權限
if not claims.get('is_admin', False):
logger.warning(f"❌ [Admin Auth] Permission denied for user: {username}")
return jsonify({
'success': False,
'error': 'PERMISSION_DENIED',
'message': '權限不足,需要管理員權限'
}), 403
# 驗證用戶是否存在且仍為管理員
from app.models import User
user = User.query.get(claims.get('user_id'))
if not user:
logger.error(f"❌ [Admin Auth] User not found: {claims.get('user_id')}")
return jsonify({
'success': False,
'error': 'USER_NOT_FOUND',
'message': '使用者不存在'
}), 401
if not user.is_admin:
logger.warning(f"❌ [Admin Auth] User no longer admin: {username}")
return jsonify({
'success': False,
'error': 'PERMISSION_DENIED',
'message': '權限不足,需要管理員權限'
}), 403
# 設定完整用戶資訊
g.current_user = user
except Exception as e:
logger.error(f"❌ [Admin Auth] JWT validation failed: {str(e)}")
return jsonify({
'success': False,
'error': 'AUTHENTICATION_REQUIRED',
'message': '認證失效,請重新登入'
}), 401
return f(*args, **kwargs)
return decorated_function
def validate_json(required_fields=None):
"""JSON 驗證裝飾器"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
from flask import request
if not request.is_json:
return jsonify({
'success': False,
'error': 'INVALID_CONTENT_TYPE',
'message': '請求必須為 JSON 格式'
}), 400
data = request.get_json()
if not data:
return jsonify({
'success': False,
'error': 'INVALID_JSON',
'message': 'JSON 資料格式錯誤'
}), 400
# 檢查必要欄位
if required_fields:
missing_fields = [field for field in required_fields if field not in data]
if missing_fields:
return jsonify({
'success': False,
'error': 'MISSING_FIELDS',
'message': f'缺少必要欄位: {", ".join(missing_fields)}'
}), 400
return f(*args, **kwargs)
return decorated_function
return decorator
def rate_limit(max_requests=100, per_seconds=3600):
"""簡單的速率限制裝飾器"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
from flask import request
import redis
import time
try:
# 使用 Redis 進行速率限制
redis_client = redis.from_url(current_app.config['REDIS_URL'])
# 使用 IP 地址作為 key
client_id = request.remote_addr
key = f"rate_limit:{f.__name__}:{client_id}"
current_time = int(time.time())
window_start = current_time - per_seconds
# 清理過期的請求記錄
redis_client.zremrangebyscore(key, 0, window_start)
# 取得當前窗口內的請求數
current_requests = redis_client.zcard(key)
if current_requests >= max_requests:
return jsonify({
'success': False,
'error': 'RATE_LIMIT_EXCEEDED',
'message': '請求過於頻繁,請稍後再試'
}), 429
# 記錄當前請求
redis_client.zadd(key, {str(current_time): current_time})
redis_client.expire(key, per_seconds)
except Exception:
# 如果 Redis 不可用,不阻擋請求
pass
return f(*args, **kwargs)
return decorated_function
return decorator