1st_fix_login_issue

This commit is contained in:
beabigegg
2025-09-02 10:31:35 +08:00
commit a60d965317
103 changed files with 12402 additions and 0 deletions

34
app/utils/__init__.py Normal file
View File

@@ -0,0 +1,34 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
工具模組
Author: PANJIT IT Team
Created: 2024-01-28
Modified: 2024-01-28
"""
from .decorators import login_required, admin_required
from .validators import validate_file, validate_languages
from .helpers import generate_filename, format_file_size
from .exceptions import (
DocumentTranslatorError,
AuthenticationError,
ValidationError,
TranslationError,
FileProcessingError
)
__all__ = [
'login_required',
'admin_required',
'validate_file',
'validate_languages',
'generate_filename',
'format_file_size',
'DocumentTranslatorError',
'AuthenticationError',
'ValidationError',
'TranslationError',
'FileProcessingError'
]

216
app/utils/decorators.py Normal file
View File

@@ -0,0 +1,216 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
裝飾器模組
Author: PANJIT IT Team
Created: 2024-01-28
Modified: 2024-01-28
"""
from functools import wraps
from flask import session, jsonify, g, current_app
from flask_jwt_extended import jwt_required, get_jwt_identity, get_jwt
def login_required(f):
"""登入驗證裝飾器"""
@wraps(f)
def decorated_function(*args, **kwargs):
from app.utils.logger import get_logger
from flask import request
logger = get_logger(__name__)
user_id = session.get('user_id')
# 調試:記錄 session 檢查
logger.info(f"🔐 [Session Check] Endpoint: {request.endpoint}, Method: {request.method}, URL: {request.url}")
logger.info(f"🔐 [Session Data] UserID: {user_id}, SessionData: {dict(session)}, SessionID: {session.get('_id', 'unknown')}")
if not user_id:
logger.warning(f"❌ [Auth Failed] No user_id in session for {request.endpoint}")
return jsonify({
'success': False,
'error': 'AUTHENTICATION_REQUIRED',
'message': '請先登入'
}), 401
# 取得使用者資訊並設定到 g 物件
from app.models import User
user = User.query.get(user_id)
if not user:
# 清除無效的 session
session.clear()
return jsonify({
'success': False,
'error': 'USER_NOT_FOUND',
'message': '使用者不存在'
}), 401
g.current_user = user
g.current_user_id = user.id
g.is_admin = user.is_admin
return f(*args, **kwargs)
return decorated_function
def jwt_login_required(f):
"""JWT 登入驗證裝飾器"""
@wraps(f)
@jwt_required()
def decorated_function(*args, **kwargs):
from app.utils.logger import get_logger
from flask import request
logger = get_logger(__name__)
try:
username = get_jwt_identity()
claims = get_jwt()
# 設定到 g 物件供其他地方使用
g.current_user_username = username
g.current_user_id = claims.get('user_id')
g.is_admin = claims.get('is_admin', False)
logger.info(f"🔑 [JWT Auth] User: {username}, UserID: {claims.get('user_id')}, Admin: {claims.get('is_admin')}")
except Exception as e:
logger.error(f"❌ [JWT Auth] JWT validation failed: {str(e)}")
return jsonify({
'success': False,
'error': 'AUTHENTICATION_REQUIRED',
'message': '認證失效,請重新登入'
}), 401
return f(*args, **kwargs)
return decorated_function
def admin_required(f):
"""管理員權限裝飾器"""
@wraps(f)
def decorated_function(*args, **kwargs):
# 先檢查是否已登入
user_id = session.get('user_id')
if not user_id:
return jsonify({
'success': False,
'error': 'AUTHENTICATION_REQUIRED',
'message': '請先登入'
}), 401
# 取得使用者資訊
from app.models import User
user = User.query.get(user_id)
if not user:
session.clear()
return jsonify({
'success': False,
'error': 'USER_NOT_FOUND',
'message': '使用者不存在'
}), 401
# 檢查管理員權限
if not user.is_admin:
return jsonify({
'success': False,
'error': 'PERMISSION_DENIED',
'message': '權限不足,需要管理員權限'
}), 403
g.current_user = user
g.current_user_id = user.id
g.is_admin = True
return f(*args, **kwargs)
return decorated_function
def validate_json(required_fields=None):
"""JSON 驗證裝飾器"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
from flask import request
if not request.is_json:
return jsonify({
'success': False,
'error': 'INVALID_CONTENT_TYPE',
'message': '請求必須為 JSON 格式'
}), 400
data = request.get_json()
if not data:
return jsonify({
'success': False,
'error': 'INVALID_JSON',
'message': 'JSON 資料格式錯誤'
}), 400
# 檢查必要欄位
if required_fields:
missing_fields = [field for field in required_fields if field not in data]
if missing_fields:
return jsonify({
'success': False,
'error': 'MISSING_FIELDS',
'message': f'缺少必要欄位: {", ".join(missing_fields)}'
}), 400
return f(*args, **kwargs)
return decorated_function
return decorator
def rate_limit(max_requests=100, per_seconds=3600):
"""簡單的速率限制裝飾器"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
from flask import request
import redis
import time
try:
# 使用 Redis 進行速率限制
redis_client = redis.from_url(current_app.config['REDIS_URL'])
# 使用 IP 地址作為 key
client_id = request.remote_addr
key = f"rate_limit:{f.__name__}:{client_id}"
current_time = int(time.time())
window_start = current_time - per_seconds
# 清理過期的請求記錄
redis_client.zremrangebyscore(key, 0, window_start)
# 取得當前窗口內的請求數
current_requests = redis_client.zcard(key)
if current_requests >= max_requests:
return jsonify({
'success': False,
'error': 'RATE_LIMIT_EXCEEDED',
'message': '請求過於頻繁,請稍後再試'
}), 429
# 記錄當前請求
redis_client.zadd(key, {str(current_time): current_time})
redis_client.expire(key, per_seconds)
except Exception:
# 如果 Redis 不可用,不阻擋請求
pass
return f(*args, **kwargs)
return decorated_function
return decorator

52
app/utils/exceptions.py Normal file
View File

@@ -0,0 +1,52 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
自定義例外模組
Author: PANJIT IT Team
Created: 2024-01-28
Modified: 2024-01-28
"""
class DocumentTranslatorError(Exception):
"""文件翻譯系統基礎例外"""
def __init__(self, message, error_code=None):
self.message = message
self.error_code = error_code
super().__init__(self.message)
class AuthenticationError(DocumentTranslatorError):
"""認證相關例外"""
pass
class ValidationError(DocumentTranslatorError):
"""驗證相關例外"""
pass
class TranslationError(DocumentTranslatorError):
"""翻譯相關例外"""
pass
class FileProcessingError(DocumentTranslatorError):
"""檔案處理相關例外"""
pass
class APIError(DocumentTranslatorError):
"""API 相關例外"""
pass
class ConfigurationError(DocumentTranslatorError):
"""配置相關例外"""
pass
class DatabaseError(DocumentTranslatorError):
"""資料庫相關例外"""
pass

280
app/utils/helpers.py Normal file
View File

@@ -0,0 +1,280 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
輔助工具模組
Author: PANJIT IT Team
Created: 2024-01-28
Modified: 2024-01-28
"""
import os
import uuid
import shutil
from pathlib import Path
from datetime import datetime
from werkzeug.utils import secure_filename
from flask import current_app
def generate_filename(original_filename, job_uuid, file_type='original', language_code=None):
"""生成安全的檔案名稱"""
# 取得檔案副檔名
file_ext = Path(original_filename).suffix.lower()
# 清理原始檔名
clean_name = Path(original_filename).stem
clean_name = secure_filename(clean_name)[:50] # 限制長度
if file_type == 'original':
return f"original_{clean_name}_{job_uuid[:8]}{file_ext}"
elif file_type == 'translated':
return f"translated_{clean_name}_{language_code}_{job_uuid[:8]}{file_ext}"
else:
return f"{file_type}_{clean_name}_{job_uuid[:8]}{file_ext}"
def create_job_directory(job_uuid):
"""建立任務專用目錄"""
upload_folder = current_app.config.get('UPLOAD_FOLDER')
job_dir = Path(upload_folder) / job_uuid
# 建立目錄
job_dir.mkdir(parents=True, exist_ok=True)
return job_dir
def save_uploaded_file(file_obj, job_uuid):
"""儲存上傳的檔案"""
try:
# 建立任務目錄
job_dir = create_job_directory(job_uuid)
# 生成檔案名稱
filename = generate_filename(file_obj.filename, job_uuid, 'original')
file_path = job_dir / filename
# 儲存檔案
file_obj.save(str(file_path))
# 取得檔案大小
file_size = file_path.stat().st_size
return {
'success': True,
'filename': filename,
'file_path': str(file_path),
'file_size': file_size
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
def cleanup_job_directory(job_uuid):
"""清理任務目錄"""
try:
upload_folder = current_app.config.get('UPLOAD_FOLDER')
job_dir = Path(upload_folder) / job_uuid
if job_dir.exists() and job_dir.is_dir():
shutil.rmtree(job_dir)
return True
return False
except Exception:
return False
def format_file_size(size_bytes):
"""格式化檔案大小"""
if size_bytes == 0:
return "0 B"
size_names = ["B", "KB", "MB", "GB", "TB"]
i = 0
while size_bytes >= 1024 and i < len(size_names) - 1:
size_bytes /= 1024.0
i += 1
return f"{size_bytes:.1f} {size_names[i]}"
def get_file_icon(file_extension):
"""根據副檔名取得檔案圖示"""
icon_map = {
'.docx': 'file-word',
'.doc': 'file-word',
'.pptx': 'file-powerpoint',
'.ppt': 'file-powerpoint',
'.xlsx': 'file-excel',
'.xls': 'file-excel',
'.pdf': 'file-pdf'
}
return icon_map.get(file_extension.lower(), 'file')
def calculate_processing_time(start_time, end_time=None):
"""計算處理時間"""
if not start_time:
return None
if not end_time:
end_time = datetime.utcnow()
if isinstance(start_time, str):
start_time = datetime.fromisoformat(start_time.replace('Z', '+00:00'))
if isinstance(end_time, str):
end_time = datetime.fromisoformat(end_time.replace('Z', '+00:00'))
duration = end_time - start_time
# 轉換為秒
total_seconds = int(duration.total_seconds())
if total_seconds < 60:
return f"{total_seconds}"
elif total_seconds < 3600:
minutes = total_seconds // 60
seconds = total_seconds % 60
return f"{minutes}{seconds}"
else:
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
return f"{hours}小時{minutes}"
def generate_download_token(job_uuid, language_code, user_id):
"""生成下載令牌"""
import hashlib
import time
# 組合資料
data = f"{job_uuid}:{language_code}:{user_id}:{int(time.time())}"
# 加上應用程式密鑰
secret_key = current_app.config.get('SECRET_KEY', 'default_secret')
data_with_secret = f"{data}:{secret_key}"
# 生成 hash
token = hashlib.sha256(data_with_secret.encode()).hexdigest()
return token
def verify_download_token(token, job_uuid, language_code, user_id, max_age=3600):
"""驗證下載令牌"""
import time
try:
# 取得當前時間戳
current_time = int(time.time())
# 在有效時間範圍內嘗試匹配令牌
for i in range(max_age):
timestamp = current_time - i
expected_token = generate_download_token_with_timestamp(
job_uuid, language_code, user_id, timestamp
)
if token == expected_token:
return True
return False
except Exception:
return False
def generate_download_token_with_timestamp(job_uuid, language_code, user_id, timestamp):
"""使用指定時間戳生成下載令牌"""
import hashlib
data = f"{job_uuid}:{language_code}:{user_id}:{timestamp}"
secret_key = current_app.config.get('SECRET_KEY', 'default_secret')
data_with_secret = f"{data}:{secret_key}"
return hashlib.sha256(data_with_secret.encode()).hexdigest()
def get_supported_languages():
"""取得支援的語言列表"""
return {
'auto': '自動偵測',
'zh-CN': '簡體中文',
'zh-TW': '繁體中文',
'en': '英文',
'ja': '日文',
'ko': '韓文',
'vi': '越南文',
'th': '泰文',
'id': '印尼文',
'ms': '馬來文',
'es': '西班牙文',
'fr': '法文',
'de': '德文',
'ru': '俄文'
}
def parse_json_field(json_str):
"""安全解析JSON欄位"""
import json
if not json_str:
return None
try:
if isinstance(json_str, str):
return json.loads(json_str)
return json_str
except (json.JSONDecodeError, TypeError):
return None
def format_datetime(dt, format_type='full'):
"""格式化日期時間"""
if not dt:
return None
if isinstance(dt, str):
try:
dt = datetime.fromisoformat(dt.replace('Z', '+00:00'))
except ValueError:
return dt
if format_type == 'date':
return dt.strftime('%Y-%m-%d')
elif format_type == 'time':
return dt.strftime('%H:%M:%S')
elif format_type == 'short':
return dt.strftime('%Y-%m-%d %H:%M')
else: # full
return dt.strftime('%Y-%m-%d %H:%M:%S')
def create_response(success=True, data=None, message=None, error=None, error_code=None):
"""建立統一的API回應格式"""
response = {
'success': success
}
if data is not None:
response['data'] = data
if message:
response['message'] = message
if error:
response['error'] = error_code or 'ERROR'
if not message:
response['message'] = error
return response

232
app/utils/ldap_auth.py Normal file
View File

@@ -0,0 +1,232 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
LDAP 認證服務
Author: PANJIT IT Team
Created: 2024-01-28
Modified: 2024-01-28
"""
import time
from ldap3 import Server, Connection, SUBTREE, ALL_ATTRIBUTES
from flask import current_app
from .logger import get_logger
from .exceptions import AuthenticationError
logger = get_logger(__name__)
class LDAPAuthService:
"""LDAP 認證服務"""
def __init__(self):
self.config = current_app.config
self.server_url = self.config.get('LDAP_SERVER')
self.port = self.config.get('LDAP_PORT', 389)
self.use_ssl = self.config.get('LDAP_USE_SSL', False)
self.bind_user_dn = self.config.get('LDAP_BIND_USER_DN')
self.bind_password = self.config.get('LDAP_BIND_USER_PASSWORD')
self.search_base = self.config.get('LDAP_SEARCH_BASE')
self.login_attr = self.config.get('LDAP_USER_LOGIN_ATTR', 'userPrincipalName')
def create_connection(self, retries=3):
"""建立 LDAP 連線(帶重試機制)"""
for attempt in range(retries):
try:
server = Server(
self.server_url,
port=self.port,
use_ssl=self.use_ssl,
get_info=ALL_ATTRIBUTES
)
conn = Connection(
server,
user=self.bind_user_dn,
password=self.bind_password,
auto_bind=True,
raise_exceptions=True
)
logger.info("LDAP connection established successfully")
return conn
except Exception as e:
logger.error(f"LDAP connection attempt {attempt + 1} failed: {str(e)}")
if attempt == retries - 1:
raise AuthenticationError(f"LDAP connection failed: {str(e)}")
time.sleep(1)
return None
def authenticate_user(self, username, password):
"""驗證使用者憑證"""
try:
conn = self.create_connection()
if not conn:
raise AuthenticationError("Unable to connect to LDAP server")
# 搜尋使用者
search_filter = f"(&(objectClass=person)(objectCategory=person)({self.login_attr}={username}))"
conn.search(
self.search_base,
search_filter,
SUBTREE,
attributes=['displayName', 'mail', 'sAMAccountName', 'userPrincipalName', 'department']
)
if not conn.entries:
logger.warning(f"User not found: {username}")
raise AuthenticationError("帳號不存在")
user_entry = conn.entries[0]
user_dn = user_entry.entry_dn
# 驗證使用者密碼
try:
user_conn = Connection(
conn.server,
user=user_dn,
password=password,
auto_bind=True,
raise_exceptions=True
)
user_conn.unbind()
# 返回使用者資訊
user_info = {
'username': str(user_entry.sAMAccountName) if user_entry.sAMAccountName else username,
'display_name': str(user_entry.displayName) if user_entry.displayName else username,
'email': str(user_entry.mail) if user_entry.mail else f"{username}@panjit.com.tw",
'department': str(user_entry.department) if hasattr(user_entry, 'department') and user_entry.department else None,
'user_principal_name': str(user_entry.userPrincipalName) if user_entry.userPrincipalName else username
}
logger.info(f"User authenticated successfully: {username}")
return user_info
except Exception as e:
logger.warning(f"Authentication failed for user {username}: {str(e)}")
raise AuthenticationError("密碼錯誤")
except AuthenticationError:
raise
except Exception as e:
logger.error(f"LDAP authentication error: {str(e)}")
raise AuthenticationError(f"認證服務錯誤: {str(e)}")
finally:
if 'conn' in locals() and conn:
conn.unbind()
def search_users(self, search_term, limit=20):
"""搜尋使用者"""
try:
conn = self.create_connection()
if not conn:
return []
# 建構搜尋過濾器
search_filter = f"""(&
(objectClass=person)
(objectCategory=person)
(!(userAccountControl:1.2.840.113556.1.4.803:=2))
(|
(displayName=*{search_term}*)
(mail=*{search_term}*)
(sAMAccountName=*{search_term}*)
(userPrincipalName=*{search_term}*)
)
)"""
# 移除多餘空白
search_filter = ' '.join(search_filter.split())
conn.search(
self.search_base,
search_filter,
SUBTREE,
attributes=['sAMAccountName', 'displayName', 'mail', 'department'],
size_limit=limit
)
results = []
for entry in conn.entries:
results.append({
'username': str(entry.sAMAccountName) if entry.sAMAccountName else '',
'display_name': str(entry.displayName) if entry.displayName else '',
'email': str(entry.mail) if entry.mail else '',
'department': str(entry.department) if hasattr(entry, 'department') and entry.department else ''
})
logger.info(f"LDAP search found {len(results)} results for term: {search_term}")
return results
except Exception as e:
logger.error(f"LDAP search error: {str(e)}")
return []
finally:
if 'conn' in locals() and conn:
conn.unbind()
def get_user_info(self, username):
"""取得使用者詳細資訊"""
try:
conn = self.create_connection()
if not conn:
return None
# 支援 sAMAccountName 和 userPrincipalName 格式
if '@' in username:
search_filter = f"""(&
(objectClass=person)
(|
(userPrincipalName={username})
(mail={username})
)
)"""
else:
search_filter = f"(&(objectClass=person)(sAMAccountName={username}))"
# 移除多餘空白
search_filter = ' '.join(search_filter.split())
conn.search(
self.search_base,
search_filter,
SUBTREE,
attributes=['displayName', 'mail', 'sAMAccountName', 'userPrincipalName', 'department']
)
if not conn.entries:
return None
entry = conn.entries[0]
return {
'username': str(entry.sAMAccountName) if entry.sAMAccountName else username,
'display_name': str(entry.displayName) if entry.displayName else username,
'email': str(entry.mail) if entry.mail else f"{username}@panjit.com.tw",
'department': str(entry.department) if hasattr(entry, 'department') and entry.department else None,
'user_principal_name': str(entry.userPrincipalName) if entry.userPrincipalName else ''
}
except Exception as e:
logger.error(f"Error getting user info for {username}: {str(e)}")
return None
finally:
if 'conn' in locals() and conn:
conn.unbind()
def test_connection(self):
"""測試 LDAP 連線(健康檢查用)"""
try:
conn = self.create_connection(retries=1)
if conn:
conn.unbind()
return True
return False
except Exception as e:
logger.error(f"LDAP connection test failed: {str(e)}")
return False

126
app/utils/logger.py Normal file
View File

@@ -0,0 +1,126 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
日誌管理模組
Author: PANJIT IT Team
Created: 2024-01-28
Modified: 2024-01-28
"""
import logging
import os
from pathlib import Path
from logging.handlers import RotatingFileHandler
from flask import current_app, has_request_context, request, g
def get_logger(name):
"""取得指定名稱的日誌器"""
logger = logging.getLogger(name)
# 避免重複設定 handler
if not logger.handlers:
setup_logger(logger)
return logger
def setup_logger(logger):
"""設定日誌器"""
if has_request_context() and current_app:
log_level = current_app.config.get('LOG_LEVEL', 'INFO')
log_file = current_app.config.get('LOG_FILE', 'logs/app.log')
else:
log_level = os.environ.get('LOG_LEVEL', 'INFO')
log_file = os.environ.get('LOG_FILE', 'logs/app.log')
# 確保日誌目錄存在
log_path = Path(log_file)
log_path.parent.mkdir(parents=True, exist_ok=True)
# 設定日誌等級
logger.setLevel(getattr(logging, log_level.upper()))
# 建立格式化器
formatter = logging.Formatter(
'%(asctime)s [%(levelname)s] %(name)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# 檔案處理器(使用輪轉)
file_handler = RotatingFileHandler(
log_file,
maxBytes=10*1024*1024, # 10MB
backupCount=5,
encoding='utf-8'
)
file_handler.setLevel(getattr(logging, log_level.upper()))
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# 控制台處理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
class DatabaseLogHandler(logging.Handler):
"""資料庫日誌處理器"""
def emit(self, record):
"""發送日誌記錄到資料庫"""
try:
from app.models.log import SystemLog
# 取得使用者和任務資訊(如果有的話)
user_id = None
job_id = None
extra_data = {}
if has_request_context():
user_id = g.get('current_user_id')
extra_data.update({
'method': request.method,
'endpoint': request.endpoint,
'url': request.url,
'ip_address': request.remote_addr,
'user_agent': request.headers.get('User-Agent')
})
# 儲存到資料庫
SystemLog.log(
level=record.levelname,
module=record.name,
message=record.getMessage(),
user_id=user_id,
job_id=job_id,
extra_data=extra_data if extra_data else None
)
except Exception:
# 避免日誌記錄失敗影響主程序
pass
def init_logging(app):
"""初始化應用程式日誌"""
# 設定根日誌器
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
# 添加資料庫日誌處理器(僅對重要日誌)
if app.config.get('SQLALCHEMY_DATABASE_URI'):
db_handler = DatabaseLogHandler()
db_handler.setLevel(logging.WARNING) # 只記錄警告以上等級到資料庫
root_logger.addHandler(db_handler)
# 設定 Flask 應用日誌
if not app.logger.handlers:
setup_logger(app.logger)
# 設定第三方庫日誌等級
logging.getLogger('werkzeug').setLevel(logging.WARNING)
logging.getLogger('urllib3').setLevel(logging.WARNING)
logging.getLogger('requests').setLevel(logging.WARNING)

203
app/utils/validators.py Normal file
View File

@@ -0,0 +1,203 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
驗證工具模組
Author: PANJIT IT Team
Created: 2024-01-28
Modified: 2024-01-28
"""
import os
from pathlib import Path
from flask import current_app
from .exceptions import ValidationError
def validate_file(file_obj):
"""驗證上傳的檔案"""
if not file_obj:
raise ValidationError("未選擇檔案", "NO_FILE")
if not file_obj.filename:
raise ValidationError("檔案名稱為空", "NO_FILENAME")
# 檢查檔案副檔名
file_ext = Path(file_obj.filename).suffix.lower()
allowed_extensions = current_app.config.get('ALLOWED_EXTENSIONS', {'.docx', '.doc', '.pptx', '.xlsx', '.xls', '.pdf'})
if file_ext not in allowed_extensions:
raise ValidationError(
f"不支援的檔案類型: {file_ext},支援的格式: {', '.join(allowed_extensions)}",
"INVALID_FILE_TYPE"
)
# 檢查檔案大小
max_size = current_app.config.get('MAX_CONTENT_LENGTH', 26214400) # 25MB
# 取得檔案大小
file_obj.seek(0, os.SEEK_END)
file_size = file_obj.tell()
file_obj.seek(0)
if file_size > max_size:
raise ValidationError(
f"檔案大小超過限制 ({format_file_size(max_size)})",
"FILE_TOO_LARGE"
)
if file_size == 0:
raise ValidationError("檔案為空", "EMPTY_FILE")
return {
'filename': file_obj.filename,
'file_extension': file_ext,
'file_size': file_size,
'valid': True
}
def validate_languages(source_language, target_languages):
"""驗證語言設定"""
# 支援的語言列表
supported_languages = {
'auto': '自動偵測',
'zh-CN': '簡體中文',
'zh-TW': '繁體中文',
'en': '英文',
'ja': '日文',
'ko': '韓文',
'vi': '越南文',
'th': '泰文',
'id': '印尼文',
'ms': '馬來文',
'es': '西班牙文',
'fr': '法文',
'de': '德文',
'ru': '俄文'
}
# 驗證來源語言
if source_language and source_language not in supported_languages:
raise ValidationError(
f"不支援的來源語言: {source_language}",
"INVALID_SOURCE_LANGUAGE"
)
# 驗證目標語言
if not target_languages or not isinstance(target_languages, list):
raise ValidationError("必須指定至少一個目標語言", "NO_TARGET_LANGUAGES")
if len(target_languages) == 0:
raise ValidationError("必須指定至少一個目標語言", "NO_TARGET_LANGUAGES")
if len(target_languages) > 10: # 限制最多10個目標語言
raise ValidationError("目標語言數量過多最多支援10個", "TOO_MANY_TARGET_LANGUAGES")
invalid_languages = [lang for lang in target_languages if lang not in supported_languages]
if invalid_languages:
raise ValidationError(
f"不支援的目標語言: {', '.join(invalid_languages)}",
"INVALID_TARGET_LANGUAGE"
)
# 檢查來源語言和目標語言是否有重疊
if source_language and source_language != 'auto' and source_language in target_languages:
raise ValidationError(
"目標語言不能包含來源語言",
"SOURCE_TARGET_OVERLAP"
)
return {
'source_language': source_language or 'auto',
'target_languages': target_languages,
'supported_languages': supported_languages,
'valid': True
}
def validate_job_uuid(job_uuid):
"""驗證任務UUID格式"""
import uuid
if not job_uuid:
raise ValidationError("任務UUID不能為空", "INVALID_UUID")
try:
uuid.UUID(job_uuid)
return True
except ValueError:
raise ValidationError("任務UUID格式錯誤", "INVALID_UUID")
def validate_pagination(page, per_page):
"""驗證分頁參數"""
try:
page = int(page) if page else 1
per_page = int(per_page) if per_page else 20
except (ValueError, TypeError):
raise ValidationError("分頁參數必須為數字", "INVALID_PAGINATION")
if page < 1:
raise ValidationError("頁數必須大於0", "INVALID_PAGE")
if per_page < 1 or per_page > 100:
raise ValidationError("每頁項目數必須在1-100之間", "INVALID_PER_PAGE")
return page, per_page
def format_file_size(size_bytes):
"""格式化檔案大小顯示"""
if size_bytes == 0:
return "0 B"
size_names = ["B", "KB", "MB", "GB", "TB"]
i = 0
while size_bytes >= 1024 and i < len(size_names) - 1:
size_bytes /= 1024.0
i += 1
return f"{size_bytes:.1f} {size_names[i]}"
def sanitize_filename(filename):
"""清理檔案名稱,移除不安全字元"""
import re
# 保留檔案名稱和副檔名
name = Path(filename).stem
ext = Path(filename).suffix
# 移除或替換不安全字元
safe_name = re.sub(r'[^\w\s.-]', '_', name)
safe_name = re.sub(r'\s+', '_', safe_name) # 空白替換為底線
safe_name = safe_name.strip('._') # 移除開頭結尾的點和底線
# 限制長度
if len(safe_name) > 100:
safe_name = safe_name[:100]
return f"{safe_name}{ext}"
def validate_date_range(start_date, end_date):
"""驗證日期範圍"""
from datetime import datetime
if start_date:
try:
start_date = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
except ValueError:
raise ValidationError("開始日期格式錯誤", "INVALID_START_DATE")
if end_date:
try:
end_date = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
except ValueError:
raise ValidationError("結束日期格式錯誤", "INVALID_END_DATE")
if start_date and end_date and start_date > end_date:
raise ValidationError("開始日期不能晚於結束日期", "INVALID_DATE_RANGE")
return start_date, end_date