#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Decorators module.

Author: PANJIT IT Team
Created: 2024-01-28
Modified: 2024-01-28
"""
|
||
|
||
from functools import wraps
|
||
from flask import session, jsonify, g, current_app
|
||
from flask_jwt_extended import jwt_required, get_jwt_identity, get_jwt
|
||
|
||
|
||
def login_required(f):
    """Require a logged-in session user before running the wrapped view.

    The wrapper reads ``user_id`` from the Flask session, loads the matching
    ``User`` row and publishes it on ``flask.g`` as ``current_user``,
    ``current_user_id`` and ``is_admin`` before calling ``f``.

    Args:
        f: The view function to protect.

    Returns:
        The wrapped view. It answers with a JSON error body and HTTP 401 when
        the session carries no ``user_id`` or when no user with that id
        exists (the session is cleared in the latter case); otherwise it
        returns whatever ``f`` returns.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        from app.utils.logger import get_logger
        from flask import request
        logger = get_logger(__name__)

        user_id = session.get('user_id')

        # Debug trace of the session check. Only the key names of the
        # session are logged: dumping the full session dict (or the session
        # id) would copy user/session data into the log files.
        logger.info(f"🔐 [Session Check] Endpoint: {request.endpoint}, Method: {request.method}, URL: {request.url}")
        logger.info(f"🔐 [Session Data] UserID: {user_id}, SessionKeys: {list(session.keys())}")

        if not user_id:
            logger.warning(f"❌ [Auth Failed] No user_id in session for {request.endpoint}")
            return jsonify({
                'success': False,
                'error': 'AUTHENTICATION_REQUIRED',
                'message': '請先登入'
            }), 401

        # Load the user and expose it on the g object.
        from app.models import User
        user = User.query.get(user_id)
        if not user:
            # The session points at a user that no longer exists: drop it.
            session.clear()
            return jsonify({
                'success': False,
                'error': 'USER_NOT_FOUND',
                'message': '使用者不存在'
            }), 401

        g.current_user = user
        g.current_user_id = user.id
        g.is_admin = user.is_admin

        return f(*args, **kwargs)

    return decorated_function
|
||
|
||
|
||
def jwt_login_required(f):
    """Protect a view with JWT authentication.

    The view is wrapped with flask_jwt_extended's ``jwt_required()``. On
    each call the wrapper copies the JWT identity and the ``user_id`` /
    ``is_admin`` claims onto ``flask.g`` (``current_user_username``,
    ``current_user_id``, ``is_admin``) and then invokes ``f``.

    Args:
        f: The view function to protect.

    Returns:
        The wrapped view. If reading the identity/claims raises, it logs the
        error and returns a JSON ``AUTHENTICATION_REQUIRED`` body with HTTP
        401; otherwise it returns whatever ``f`` returns.
    """
    @wraps(f)
    @jwt_required()
    def jwt_guard(*args, **kwargs):
        from app.utils.logger import get_logger
        from flask import request  # NOTE(review): imported but not used in this wrapper
        log = get_logger(__name__)

        try:
            identity = get_jwt_identity()
            token_claims = get_jwt()

            # Publish the token data on g so downstream code can read it.
            g.current_user_username = identity
            claimed_user_id = token_claims.get('user_id')
            g.current_user_id = claimed_user_id
            g.is_admin = token_claims.get('is_admin', False)

            log.info(f"🔑 [JWT Auth] User: {identity}, UserID: {claimed_user_id}, Admin: {token_claims.get('is_admin')}")
        except Exception as exc:
            log.error(f"❌ [JWT Auth] JWT validation failed: {str(exc)}")
            failure = {
                'success': False,
                'error': 'AUTHENTICATION_REQUIRED',
                'message': '認證失效,請重新登入'
            }
            return jsonify(failure), 401
        else:
            # Runs outside the try body, so errors raised by the view itself
            # propagate unchanged.
            return f(*args, **kwargs)

    return jwt_guard
|
||
|
||
|
||
def admin_required(f):
    """Restrict a view to administrators, authenticated via JWT.

    The view is wrapped with flask_jwt_extended's ``jwt_required()``. On
    each call the wrapper:

    1. copies the JWT identity and the ``user_id`` / ``is_admin`` claims
       onto ``flask.g``;
    2. rejects the request with HTTP 403 when the ``is_admin`` claim is
       missing or falsy;
    3. loads the ``User`` row for the claimed ``user_id`` and rejects with
       HTTP 401 when it does not exist, or HTTP 403 when ``user.is_admin``
       is falsy;
    4. stores the user on ``g.current_user`` and calls ``f``.

    Args:
        f: The view function to protect.

    Returns:
        The wrapped view. Any exception raised during steps 1-4 is logged
        and answered with a JSON ``AUTHENTICATION_REQUIRED`` body and HTTP
        401.
    """
    def _reject(error_code, message, status):
        # Single place that builds the JSON error payload used below.
        return jsonify({
            'success': False,
            'error': error_code,
            'message': message
        }), status

    @wraps(f)
    @jwt_required()
    def admin_guard(*args, **kwargs):
        from app.utils.logger import get_logger
        from flask import request  # NOTE(review): imported but not used in this wrapper
        log = get_logger(__name__)

        # NOTE(review): the except clause below also covers the database
        # lookup, so a lookup error is reported as an authentication failure
        # (401) - confirm that this is intended.
        try:
            identity = get_jwt_identity()
            token_claims = get_jwt()

            # Publish the token data on g so downstream code can read it.
            g.current_user_username = identity
            claimed_user_id = token_claims.get('user_id')
            g.current_user_id = claimed_user_id
            g.is_admin = token_claims.get('is_admin', False)

            log.info(f"🔑 [JWT Admin Auth] User: {identity}, UserID: {claimed_user_id}, Admin: {token_claims.get('is_admin')}")

            # First gate: the admin flag carried by the token.
            if not token_claims.get('is_admin', False):
                log.warning(f"❌ [Admin Auth] Permission denied for user: {identity}")
                return _reject('PERMISSION_DENIED', '權限不足,需要管理員權限', 403)

            # Second gate: the user must still exist and still be an admin.
            from app.models import User
            account = User.query.get(claimed_user_id)
            if not account:
                log.error(f"❌ [Admin Auth] User not found: {claimed_user_id}")
                return _reject('USER_NOT_FOUND', '使用者不存在', 401)

            if not account.is_admin:
                log.warning(f"❌ [Admin Auth] User no longer admin: {identity}")
                return _reject('PERMISSION_DENIED', '權限不足,需要管理員權限', 403)

            # Expose the full user record.
            g.current_user = account
        except Exception as exc:
            log.error(f"❌ [Admin Auth] JWT validation failed: {str(exc)}")
            return _reject('AUTHENTICATION_REQUIRED', '認證失效,請重新登入', 401)
        else:
            # Runs outside the try body, so errors raised by the view itself
            # propagate unchanged.
            return f(*args, **kwargs)

    return admin_guard
|
||
|
||
|
||
def validate_json(required_fields=None):
    """Build a decorator that validates the JSON body of a request.

    Args:
        required_fields: Optional iterable of keys that must be present in
            the JSON object. When omitted or empty, only the content type
            and the presence of a non-empty JSON payload are checked.

    Returns:
        A decorator. The wrapped view answers with a JSON error body and
        HTTP 400 when:

        * the request is not flagged as JSON (``INVALID_CONTENT_TYPE``);
        * the body cannot be parsed, or parses to an empty/falsy value
          (``INVALID_JSON``);
        * ``required_fields`` is given and the payload is not a JSON object
          (``INVALID_JSON``);
        * one or more required keys are absent (``MISSING_FIELDS``).

        Otherwise it returns whatever the view returns.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            from flask import request

            if not request.is_json:
                return jsonify({
                    'success': False,
                    'error': 'INVALID_CONTENT_TYPE',
                    'message': '請求必須為 JSON 格式'
                }), 400

            # silent=True makes a malformed body yield None instead of
            # raising, so the client receives the INVALID_JSON payload below
            # rather than the framework's generic 400 error.
            data = request.get_json(silent=True)
            if not data:
                return jsonify({
                    'success': False,
                    'error': 'INVALID_JSON',
                    'message': 'JSON 資料格式錯誤'
                }), 400

            # Check the required fields.
            if required_fields:
                # Key lookup only makes sense on a JSON object: a list or
                # string would be tested for membership/substring, and a
                # number would raise TypeError.
                if not isinstance(data, dict):
                    return jsonify({
                        'success': False,
                        'error': 'INVALID_JSON',
                        'message': 'JSON 資料格式錯誤'
                    }), 400

                missing_fields = [field for field in required_fields if field not in data]
                if missing_fields:
                    return jsonify({
                        'success': False,
                        'error': 'MISSING_FIELDS',
                        'message': f'缺少必要欄位: {", ".join(missing_fields)}'
                    }), 400

            return f(*args, **kwargs)

        return decorated_function
    return decorator
|
||
|
||
|
||
def rate_limit(max_requests=100, per_seconds=3600):
    """Build a simple per-client, per-view rate-limiting decorator.

    Requests are recorded in a Redis sorted set keyed by the view function
    name and ``request.remote_addr``; the score of each entry is the request
    time in whole seconds. Entries older than the window are removed on
    every call before counting.

    Args:
        max_requests: Maximum number of requests allowed inside the window.
        per_seconds: Length of the sliding window, in seconds.

    Returns:
        A decorator. The wrapped view answers with a JSON
        ``RATE_LIMIT_EXCEEDED`` body and HTTP 429 once the window already
        holds ``max_requests`` entries. If anything in the Redis bookkeeping
        raises, the failure is logged and the request is let through.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            from flask import request
            import logging
            import redis
            import time
            import uuid

            try:
                # Redis holds the request log used for rate limiting.
                # NOTE(review): a new client is created on every call -
                # consider sharing one client per application.
                redis_client = redis.from_url(current_app.config['REDIS_URL'])

                # The client IP address is part of the key.
                client_id = request.remote_addr
                key = f"rate_limit:{f.__name__}:{client_id}"

                current_time = int(time.time())
                window_start = current_time - per_seconds

                # Drop entries that have fallen out of the window.
                redis_client.zremrangebyscore(key, 0, window_start)

                # Count the requests still inside the window.
                current_requests = redis_client.zcard(key)

                if current_requests >= max_requests:
                    return jsonify({
                        'success': False,
                        'error': 'RATE_LIMIT_EXCEEDED',
                        'message': '請求過於頻繁,請稍後再試'
                    }), 429

                # Record this request. The member must be unique per
                # request: sorted-set members are unique, so using only the
                # second-resolution timestamp would merge every request made
                # within the same second into a single entry.
                member = f"{current_time}:{uuid.uuid4().hex}"
                redis_client.zadd(key, {member: current_time})
                redis_client.expire(key, per_seconds)

            except Exception:
                # Fail open: rate limiting must not block requests when it
                # cannot run, but the failure should be visible in the logs.
                logging.getLogger(__name__).warning(
                    "Rate limiting skipped for %s due to an error",
                    f.__name__,
                    exc_info=True,
                )

            return f(*args, **kwargs)

        return decorated_function
    return decorator