NO docker

This commit is contained in:
beabigegg
2025-10-02 18:50:53 +08:00
commit 4cace93934
99 changed files with 26967 additions and 0 deletions

34
app/utils/__init__.py Normal file
View File

@@ -0,0 +1,34 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
工具模組
Author: PANJIT IT Team
Created: 2024-01-28
Modified: 2024-01-28
"""
from .decorators import login_required, admin_required
from .validators import validate_file, validate_languages
from .helpers import generate_filename, format_file_size
from .exceptions import (
DocumentTranslatorError,
AuthenticationError,
ValidationError,
TranslationError,
FileProcessingError
)
__all__ = [
'login_required',
'admin_required',
'validate_file',
'validate_languages',
'generate_filename',
'format_file_size',
'DocumentTranslatorError',
'AuthenticationError',
'ValidationError',
'TranslationError',
'FileProcessingError'
]

277
app/utils/api_auth.py Normal file
View File

@@ -0,0 +1,277 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
API 認證服務
用於與 PANJIT Auth API 整合認證
Author: PANJIT IT Team
Created: 2025-10-01
"""
import requests
import json
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, Tuple
from flask import current_app
from .logger import get_logger
from .exceptions import AuthenticationError
logger = get_logger(__name__)
class APIAuthService:
"""API 認證服務"""
def __init__(self):
self.config = current_app.config
self.api_base_url = "https://pj-auth-api.vercel.app"
self.login_endpoint = "/api/auth/login"
self.logout_endpoint = "/api/auth/logout"
self.timeout = 30 # 30 秒超時
def authenticate_user(self, username: str, password: str) -> Dict[str, Any]:
"""
透過 API 驗證使用者憑證
Args:
username: 使用者帳號
password: 密碼
Returns:
Dict: 包含使用者資訊和 Token 的字典
Raises:
AuthenticationError: 認證失敗時拋出
"""
try:
login_url = f"{self.api_base_url}{self.login_endpoint}"
payload = {
"username": username,
"password": password
}
headers = {
"Content-Type": "application/json"
}
logger.info(f"正在透過 API 驗證使用者: {username}")
# 發送認證請求
response = requests.post(
login_url,
json=payload,
headers=headers,
timeout=self.timeout
)
# 解析回應
if response.status_code == 200:
data = response.json()
if data.get('success'):
logger.info(f"API 認證成功: {username}")
return self._parse_auth_response(data)
else:
error_msg = data.get('error', '認證失敗')
logger.warning(f"API 認證失敗: {username} - {error_msg}")
raise AuthenticationError(f"認證失敗: {error_msg}")
elif response.status_code == 401:
data = response.json()
error_msg = data.get('error', '帳號或密碼錯誤')
logger.warning(f"API 認證失敗 (401): {username} - {error_msg}")
raise AuthenticationError("帳號或密碼錯誤")
else:
logger.error(f"API 認證請求失敗: HTTP {response.status_code}")
raise AuthenticationError(f"認證服務錯誤 (HTTP {response.status_code})")
except requests.exceptions.Timeout:
logger.error(f"API 認證請求超時: {username}")
raise AuthenticationError("認證服務回應超時,請稍後再試")
except requests.exceptions.ConnectionError:
logger.error(f"API 認證連線錯誤: {username}")
raise AuthenticationError("無法連接認證服務,請檢查網路連線")
except requests.exceptions.RequestException as e:
logger.error(f"API 認證請求錯誤: {username} - {str(e)}")
raise AuthenticationError(f"認證服務錯誤: {str(e)}")
except json.JSONDecodeError:
logger.error(f"API 認證回應格式錯誤: {username}")
raise AuthenticationError("認證服務回應格式錯誤")
except Exception as e:
logger.error(f"API 認證未知錯誤: {username} - {str(e)}")
raise AuthenticationError(f"認證過程發生錯誤: {str(e)}")
def _parse_auth_response(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
解析 API 認證回應
Args:
data: API 回應資料
Returns:
Dict: 標準化的使用者資訊
"""
try:
auth_data = data.get('data', {})
user_info = auth_data.get('userInfo', {})
# 解析 Token 過期時間
expires_at = None
issued_at = None
if 'expiresAt' in auth_data:
try:
expires_at = datetime.fromisoformat(auth_data['expiresAt'].replace('Z', '+00:00'))
except (ValueError, AttributeError):
logger.warning("無法解析 API Token 過期時間")
if 'issuedAt' in auth_data:
try:
issued_at = datetime.fromisoformat(auth_data['issuedAt'].replace('Z', '+00:00'))
except (ValueError, AttributeError):
logger.warning("無法解析 API Token 發行時間")
# 標準化使用者資訊 (方案 A: API name 是姓名+email 格式)
api_name = user_info.get('name', '') # 例: "劉怡明 ymirliu@panjit.com.tw"
api_email = user_info.get('email', '') # 例: "ymirliu@panjit.com.tw"
result = {
# 基本使用者資訊 (方案 A: username 和 display_name 都用 API name)
'username': api_name, # 姓名+email 格式
'display_name': api_name, # 姓名+email 格式
'email': api_email, # 純 email
'department': user_info.get('jobTitle'), # 使用 jobTitle 作為部門
'user_principal_name': api_email,
# API 特有資訊
'api_user_id': user_info.get('id', ''), # Azure Object ID
'job_title': user_info.get('jobTitle'),
'office_location': user_info.get('officeLocation'),
'business_phones': user_info.get('businessPhones', []),
# Token 資訊
'api_access_token': auth_data.get('access_token', ''),
'api_id_token': auth_data.get('id_token', ''),
'api_token_type': auth_data.get('token_type', 'Bearer'),
'api_expires_in': auth_data.get('expires_in', 0),
'api_issued_at': issued_at,
'api_expires_at': expires_at,
# 完整的 API 回應 (用於記錄)
'full_api_response': data,
'api_user_info': user_info
}
return result
except Exception as e:
logger.error(f"解析 API 回應時發生錯誤: {str(e)}")
raise AuthenticationError(f"解析認證回應時發生錯誤: {str(e)}")
def logout_user(self, access_token: str) -> bool:
"""
透過 API 登出使用者
Args:
access_token: 使用者的 access token
Returns:
bool: 登出是否成功
"""
try:
logout_url = f"{self.api_base_url}{self.logout_endpoint}"
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
response = requests.post(
logout_url,
headers=headers,
timeout=self.timeout
)
if response.status_code == 200:
data = response.json()
if data.get('success'):
logger.info("API 登出成功")
return True
logger.warning(f"API 登出失敗: HTTP {response.status_code}")
return False
except Exception as e:
logger.error(f"API 登出時發生錯誤: {str(e)}")
return False
def validate_token(self, access_token: str) -> bool:
"""
驗證 Token 是否有效
Args:
access_token: 要驗證的 token
Returns:
bool: Token 是否有效
"""
try:
# 這裡可以實作 Token 驗證邏輯
# 目前 API 沒有提供專門的驗證端點,可以考慮解析 JWT 或調用其他端點
# 簡單的檢查Token 不能為空且格式看起來像 JWT
if not access_token or len(access_token.split('.')) != 3:
return False
# TODO: 實作更完整的 JWT 驗證邏輯
# 可以解析 JWT payload 檢查過期時間等
return True
except Exception as e:
logger.error(f"驗證 Token 時發生錯誤: {str(e)}")
return False
def test_connection(self) -> bool:
"""
測試 API 連線
Returns:
bool: 連線是否正常
"""
try:
# 嘗試連接 API 基礎端點
response = requests.get(
self.api_base_url,
timeout=10
)
return response.status_code in [200, 404] # 404 也算正常,表示能連接到伺服器
except Exception as e:
logger.error(f"API 連線測試失敗: {str(e)}")
return False
def calculate_internal_expiry(self, api_expires_at: Optional[datetime], extend_days: int = 3) -> datetime:
"""
計算內部 Token 過期時間
Args:
api_expires_at: API Token 過期時間
extend_days: 延長天數
Returns:
datetime: 內部 Token 過期時間
"""
if api_expires_at:
# 基於 API Token 過期時間延長
return api_expires_at + timedelta(days=extend_days)
else:
# 如果沒有 API 過期時間,從現在開始計算
return datetime.utcnow() + timedelta(days=extend_days)

238
app/utils/decorators.py Normal file
View File

@@ -0,0 +1,238 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
裝飾器模組
Author: PANJIT IT Team
Created: 2024-01-28
Modified: 2024-01-28
"""
from functools import wraps
from flask import session, jsonify, g, current_app
from flask_jwt_extended import jwt_required, get_jwt_identity, get_jwt
def login_required(f):
"""登入驗證裝飾器"""
@wraps(f)
def decorated_function(*args, **kwargs):
from app.utils.logger import get_logger
from flask import request
logger = get_logger(__name__)
user_id = session.get('user_id')
# 調試:記錄 session 檢查
logger.info(f"🔐 [Session Check] Endpoint: {request.endpoint}, Method: {request.method}, URL: {request.url}")
logger.info(f"🔐 [Session Data] UserID: {user_id}, SessionData: {dict(session)}, SessionID: {session.get('_id', 'unknown')}")
if not user_id:
logger.warning(f"❌ [Auth Failed] No user_id in session for {request.endpoint}")
return jsonify({
'success': False,
'error': 'AUTHENTICATION_REQUIRED',
'message': '請先登入'
}), 401
# 取得使用者資訊並設定到 g 物件
from app.models import User
user = User.query.get(user_id)
if not user:
# 清除無效的 session
session.clear()
return jsonify({
'success': False,
'error': 'USER_NOT_FOUND',
'message': '使用者不存在'
}), 401
g.current_user = user
g.current_user_id = user.id
g.is_admin = user.is_admin
return f(*args, **kwargs)
return decorated_function
def jwt_login_required(f):
"""JWT 登入驗證裝飾器"""
@wraps(f)
@jwt_required()
def decorated_function(*args, **kwargs):
from app.utils.logger import get_logger
from flask import request
logger = get_logger(__name__)
try:
username = get_jwt_identity()
claims = get_jwt()
# 設定到 g 物件供其他地方使用
g.current_user_username = username
g.current_user_id = claims.get('user_id')
g.is_admin = claims.get('is_admin', False)
logger.info(f"🔑 [JWT Auth] User: {username}, UserID: {claims.get('user_id')}, Admin: {claims.get('is_admin')}")
except Exception as e:
logger.error(f"❌ [JWT Auth] JWT validation failed: {str(e)}")
return jsonify({
'success': False,
'error': 'AUTHENTICATION_REQUIRED',
'message': '認證失效,請重新登入'
}), 401
return f(*args, **kwargs)
return decorated_function
def admin_required(f):
"""管理員權限裝飾器使用JWT認證"""
@wraps(f)
@jwt_required()
def decorated_function(*args, **kwargs):
from app.utils.logger import get_logger
from flask import request
logger = get_logger(__name__)
try:
username = get_jwt_identity()
claims = get_jwt()
# 設定到 g 物件供其他地方使用
g.current_user_username = username
g.current_user_id = claims.get('user_id')
g.is_admin = claims.get('is_admin', False)
logger.info(f"🔑 [JWT Admin Auth] User: {username}, UserID: {claims.get('user_id')}, Admin: {claims.get('is_admin')}")
# 檢查管理員權限
if not claims.get('is_admin', False):
logger.warning(f"❌ [Admin Auth] Permission denied for user: {username}")
return jsonify({
'success': False,
'error': 'PERMISSION_DENIED',
'message': '權限不足,需要管理員權限'
}), 403
# 驗證用戶是否存在且仍為管理員
from app.models import User
user = User.query.get(claims.get('user_id'))
if not user:
logger.error(f"❌ [Admin Auth] User not found: {claims.get('user_id')}")
return jsonify({
'success': False,
'error': 'USER_NOT_FOUND',
'message': '使用者不存在'
}), 401
if not user.is_admin:
logger.warning(f"❌ [Admin Auth] User no longer admin: {username}")
return jsonify({
'success': False,
'error': 'PERMISSION_DENIED',
'message': '權限不足,需要管理員權限'
}), 403
# 設定完整用戶資訊
g.current_user = user
except Exception as e:
logger.error(f"❌ [Admin Auth] JWT validation failed: {str(e)}")
return jsonify({
'success': False,
'error': 'AUTHENTICATION_REQUIRED',
'message': '認證失效,請重新登入'
}), 401
return f(*args, **kwargs)
return decorated_function
def validate_json(required_fields=None):
"""JSON 驗證裝飾器"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
from flask import request
if not request.is_json:
return jsonify({
'success': False,
'error': 'INVALID_CONTENT_TYPE',
'message': '請求必須為 JSON 格式'
}), 400
data = request.get_json()
if not data:
return jsonify({
'success': False,
'error': 'INVALID_JSON',
'message': 'JSON 資料格式錯誤'
}), 400
# 檢查必要欄位
if required_fields:
missing_fields = [field for field in required_fields if field not in data]
if missing_fields:
return jsonify({
'success': False,
'error': 'MISSING_FIELDS',
'message': f'缺少必要欄位: {", ".join(missing_fields)}'
}), 400
return f(*args, **kwargs)
return decorated_function
return decorator
def rate_limit(max_requests=100, per_seconds=3600):
"""簡單的速率限制裝飾器"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
from flask import request
import redis
import time
try:
# 使用 Redis 進行速率限制
redis_client = redis.from_url(current_app.config['REDIS_URL'])
# 使用 IP 地址作為 key
client_id = request.remote_addr
key = f"rate_limit:{f.__name__}:{client_id}"
current_time = int(time.time())
window_start = current_time - per_seconds
# 清理過期的請求記錄
redis_client.zremrangebyscore(key, 0, window_start)
# 取得當前窗口內的請求數
current_requests = redis_client.zcard(key)
if current_requests >= max_requests:
return jsonify({
'success': False,
'error': 'RATE_LIMIT_EXCEEDED',
'message': '請求過於頻繁,請稍後再試'
}), 429
# 記錄當前請求
redis_client.zadd(key, {str(current_time): current_time})
redis_client.expire(key, per_seconds)
except Exception:
# 如果 Redis 不可用,不阻擋請求
pass
return f(*args, **kwargs)
return decorated_function
return decorator

52
app/utils/exceptions.py Normal file
View File

@@ -0,0 +1,52 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
自定義例外模組
Author: PANJIT IT Team
Created: 2024-01-28
Modified: 2024-01-28
"""
class DocumentTranslatorError(Exception):
"""文件翻譯系統基礎例外"""
def __init__(self, message, error_code=None):
self.message = message
self.error_code = error_code
super().__init__(self.message)
class AuthenticationError(DocumentTranslatorError):
"""認證相關例外"""
pass
class ValidationError(DocumentTranslatorError):
"""驗證相關例外"""
pass
class TranslationError(DocumentTranslatorError):
"""翻譯相關例外"""
pass
class FileProcessingError(DocumentTranslatorError):
"""檔案處理相關例外"""
pass
class APIError(DocumentTranslatorError):
"""API 相關例外"""
pass
class ConfigurationError(DocumentTranslatorError):
"""配置相關例外"""
pass
class DatabaseError(DocumentTranslatorError):
"""資料庫相關例外"""
pass

280
app/utils/helpers.py Normal file
View File

@@ -0,0 +1,280 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
輔助工具模組
Author: PANJIT IT Team
Created: 2024-01-28
Modified: 2024-01-28
"""
import os
import uuid
import shutil
from pathlib import Path
from datetime import datetime
from werkzeug.utils import secure_filename
from flask import current_app
def generate_filename(original_filename, job_uuid, file_type='original', language_code=None):
"""生成安全的檔案名稱"""
# 取得檔案副檔名
file_ext = Path(original_filename).suffix.lower()
# 清理原始檔名
clean_name = Path(original_filename).stem
clean_name = secure_filename(clean_name)[:50] # 限制長度
if file_type == 'original':
return f"original_{clean_name}_{job_uuid[:8]}{file_ext}"
elif file_type == 'translated':
return f"translated_{clean_name}_{language_code}_{job_uuid[:8]}{file_ext}"
else:
return f"{file_type}_{clean_name}_{job_uuid[:8]}{file_ext}"
def create_job_directory(job_uuid):
"""建立任務專用目錄"""
upload_folder = current_app.config.get('UPLOAD_FOLDER')
job_dir = Path(upload_folder) / job_uuid
# 建立目錄
job_dir.mkdir(parents=True, exist_ok=True)
return job_dir
def save_uploaded_file(file_obj, job_uuid):
"""儲存上傳的檔案"""
try:
# 建立任務目錄
job_dir = create_job_directory(job_uuid)
# 生成檔案名稱
filename = generate_filename(file_obj.filename, job_uuid, 'original')
file_path = job_dir / filename
# 儲存檔案
file_obj.save(str(file_path))
# 取得檔案大小
file_size = file_path.stat().st_size
return {
'success': True,
'filename': filename,
'file_path': str(file_path),
'file_size': file_size
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
def cleanup_job_directory(job_uuid):
"""清理任務目錄"""
try:
upload_folder = current_app.config.get('UPLOAD_FOLDER')
job_dir = Path(upload_folder) / job_uuid
if job_dir.exists() and job_dir.is_dir():
shutil.rmtree(job_dir)
return True
return False
except Exception:
return False
def format_file_size(size_bytes):
"""格式化檔案大小"""
if size_bytes == 0:
return "0 B"
size_names = ["B", "KB", "MB", "GB", "TB"]
i = 0
while size_bytes >= 1024 and i < len(size_names) - 1:
size_bytes /= 1024.0
i += 1
return f"{size_bytes:.1f} {size_names[i]}"
def get_file_icon(file_extension):
"""根據副檔名取得檔案圖示"""
icon_map = {
'.docx': 'file-word',
'.doc': 'file-word',
'.pptx': 'file-powerpoint',
'.ppt': 'file-powerpoint',
'.xlsx': 'file-excel',
'.xls': 'file-excel',
'.pdf': 'file-pdf'
}
return icon_map.get(file_extension.lower(), 'file')
def calculate_processing_time(start_time, end_time=None):
"""計算處理時間"""
if not start_time:
return None
if not end_time:
end_time = datetime.utcnow()
if isinstance(start_time, str):
start_time = datetime.fromisoformat(start_time.replace('Z', '+00:00'))
if isinstance(end_time, str):
end_time = datetime.fromisoformat(end_time.replace('Z', '+00:00'))
duration = end_time - start_time
# 轉換為秒
total_seconds = int(duration.total_seconds())
if total_seconds < 60:
return f"{total_seconds}"
elif total_seconds < 3600:
minutes = total_seconds // 60
seconds = total_seconds % 60
return f"{minutes}{seconds}"
else:
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
return f"{hours}小時{minutes}"
def generate_download_token(job_uuid, language_code, user_id):
"""生成下載令牌"""
import hashlib
import time
# 組合資料
data = f"{job_uuid}:{language_code}:{user_id}:{int(time.time())}"
# 加上應用程式密鑰
secret_key = current_app.config.get('SECRET_KEY', 'default_secret')
data_with_secret = f"{data}:{secret_key}"
# 生成 hash
token = hashlib.sha256(data_with_secret.encode()).hexdigest()
return token
def verify_download_token(token, job_uuid, language_code, user_id, max_age=3600):
"""驗證下載令牌"""
import time
try:
# 取得當前時間戳
current_time = int(time.time())
# 在有效時間範圍內嘗試匹配令牌
for i in range(max_age):
timestamp = current_time - i
expected_token = generate_download_token_with_timestamp(
job_uuid, language_code, user_id, timestamp
)
if token == expected_token:
return True
return False
except Exception:
return False
def generate_download_token_with_timestamp(job_uuid, language_code, user_id, timestamp):
"""使用指定時間戳生成下載令牌"""
import hashlib
data = f"{job_uuid}:{language_code}:{user_id}:{timestamp}"
secret_key = current_app.config.get('SECRET_KEY', 'default_secret')
data_with_secret = f"{data}:{secret_key}"
return hashlib.sha256(data_with_secret.encode()).hexdigest()
def get_supported_languages():
"""取得支援的語言列表"""
return {
'auto': '自動偵測',
'zh-CN': '簡體中文',
'zh-TW': '繁體中文',
'en': '英文',
'ja': '日文',
'ko': '韓文',
'vi': '越南文',
'th': '泰文',
'id': '印尼文',
'ms': '馬來文',
'es': '西班牙文',
'fr': '法文',
'de': '德文',
'ru': '俄文'
}
def parse_json_field(json_str):
"""安全解析JSON欄位"""
import json
if not json_str:
return None
try:
if isinstance(json_str, str):
return json.loads(json_str)
return json_str
except (json.JSONDecodeError, TypeError):
return None
def format_datetime(dt, format_type='full'):
"""格式化日期時間"""
if not dt:
return None
if isinstance(dt, str):
try:
dt = datetime.fromisoformat(dt.replace('Z', '+00:00'))
except ValueError:
return dt
if format_type == 'date':
return dt.strftime('%Y-%m-%d')
elif format_type == 'time':
return dt.strftime('%H:%M:%S')
elif format_type == 'short':
return dt.strftime('%Y-%m-%d %H:%M')
else: # full
return dt.strftime('%Y-%m-%d %H:%M:%S')
def create_response(success=True, data=None, message=None, error=None, error_code=None):
"""建立統一的API回應格式"""
response = {
'success': success
}
if data is not None:
response['data'] = data
if message:
response['message'] = message
if error:
response['error'] = error_code or 'ERROR'
if not message:
response['message'] = error
return response

View File

@@ -0,0 +1,248 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
圖像預處理工具 - 用於提升 OCR 識別準確度
Author: PANJIT IT Team
Created: 2025-10-01
Modified: 2025-10-01
"""
import io
import numpy as np
from PIL import Image, ImageEnhance, ImageFilter
from typing import Optional, Tuple
from app.utils.logger import get_logger
logger = get_logger(__name__)
# 檢查 OpenCV 是否可用
try:
import cv2
_HAS_OPENCV = True
logger.info("OpenCV is available for advanced image preprocessing")
except ImportError:
_HAS_OPENCV = False
logger.warning("OpenCV not available, using PIL-only preprocessing")
class ImagePreprocessor:
"""圖像預處理器 - 提升掃描文件 OCR 品質"""
def __init__(self, use_opencv: bool = True):
"""
初始化圖像預處理器
Args:
use_opencv: 是否使用 OpenCV 進行進階處理(若可用)
"""
self.use_opencv = use_opencv and _HAS_OPENCV
logger.info(f"ImagePreprocessor initialized (OpenCV: {self.use_opencv})")
def preprocess_for_ocr(self, image_bytes: bytes,
enhance_level: str = 'medium') -> bytes:
"""
對圖像進行 OCR 前處理
Args:
image_bytes: 原始圖像字節數據
enhance_level: 增強級別 ('low', 'medium', 'high')
Returns:
處理後的圖像字節數據 (PNG格式)
"""
try:
# 1. 載入圖像
image = Image.open(io.BytesIO(image_bytes))
original_mode = image.mode
logger.debug(f"Original image: {image.size}, mode={original_mode}")
# 2. 轉換為 RGB (如果需要)
if image.mode not in ('RGB', 'L'):
image = image.convert('RGB')
logger.debug(f"Converted to RGB mode")
# 3. 根據增強級別選擇處理流程
if self.use_opencv:
processed_image = self._preprocess_with_opencv(image, enhance_level)
else:
processed_image = self._preprocess_with_pil(image, enhance_level)
# 4. 轉換為 PNG 字節
output_buffer = io.BytesIO()
processed_image.save(output_buffer, format='PNG', optimize=True)
processed_bytes = output_buffer.getvalue()
logger.info(f"Image preprocessed: {len(image_bytes)} -> {len(processed_bytes)} bytes (level={enhance_level})")
return processed_bytes
except Exception as e:
logger.error(f"Image preprocessing failed: {e}, returning original image")
return image_bytes # 失敗時返回原圖
def _preprocess_with_opencv(self, image: Image.Image, level: str) -> Image.Image:
"""使用 OpenCV 進行進階圖像處理"""
# PIL Image -> NumPy array
img_array = np.array(image)
# 轉換為 BGR (OpenCV 格式)
if len(img_array.shape) == 3 and img_array.shape[2] == 3:
img_bgr = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
else:
img_bgr = img_array
# 1. 灰階化
gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
logger.debug("Applied grayscale conversion (OpenCV)")
# 2. 去噪 - 根據級別調整
if level == 'high':
# 高級別:較強去噪
denoised = cv2.fastNlMeansDenoising(gray, None, h=10, templateWindowSize=7, searchWindowSize=21)
logger.debug("Applied strong denoising (h=10)")
elif level == 'medium':
# 中級別:中等去噪
denoised = cv2.fastNlMeansDenoising(gray, None, h=7, templateWindowSize=7, searchWindowSize=21)
logger.debug("Applied medium denoising (h=7)")
else:
# 低級別:輕度去噪
denoised = cv2.bilateralFilter(gray, 5, 50, 50)
logger.debug("Applied light denoising (bilateral)")
# 3. 對比度增強 - CLAHE
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
enhanced = clahe.apply(denoised)
logger.debug("Applied CLAHE contrast enhancement")
# 4. 銳化 (高級別才使用)
if level == 'high':
kernel = np.array([[-1,-1,-1],
[-1, 9,-1],
[-1,-1,-1]])
sharpened = cv2.filter2D(enhanced, -1, kernel)
logger.debug("Applied sharpening filter")
else:
sharpened = enhanced
# 5. 自適應二值化 (根據級別決定是否使用)
if level in ('medium', 'high'):
# 使用自適應閾值
binary = cv2.adaptiveThreshold(
sharpened, 255,
cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY,
blockSize=11,
C=2
)
logger.debug("Applied adaptive thresholding")
final_image = binary
else:
final_image = sharpened
# NumPy array -> PIL Image
return Image.fromarray(final_image)
def _preprocess_with_pil(self, image: Image.Image, level: str) -> Image.Image:
"""使用 PIL 進行基礎圖像處理(當 OpenCV 不可用時)"""
# 1. 灰階化
gray = image.convert('L')
logger.debug("Applied grayscale conversion (PIL)")
# 2. 對比度增強
enhancer = ImageEnhance.Contrast(gray)
if level == 'high':
contrast_factor = 2.0
elif level == 'medium':
contrast_factor = 1.5
else:
contrast_factor = 1.2
enhanced = enhancer.enhance(contrast_factor)
logger.debug(f"Applied contrast enhancement (factor={contrast_factor})")
# 3. 銳化
if level in ('medium', 'high'):
sharpness = ImageEnhance.Sharpness(enhanced)
sharp_factor = 2.0 if level == 'high' else 1.5
sharpened = sharpness.enhance(sharp_factor)
logger.debug(f"Applied sharpening (factor={sharp_factor})")
else:
sharpened = enhanced
# 4. 去噪 (使用中值濾波)
if level == 'high':
denoised = sharpened.filter(ImageFilter.MedianFilter(size=3))
logger.debug("Applied median filter (size=3)")
else:
denoised = sharpened
return denoised
def auto_detect_enhance_level(self, image_bytes: bytes) -> str:
"""
自動偵測最佳增強級別
Args:
image_bytes: 圖像字節數據
Returns:
建議的增強級別 ('low', 'medium', 'high')
"""
try:
image = Image.open(io.BytesIO(image_bytes))
if self.use_opencv:
# 使用 OpenCV 計算圖像品質指標
img_array = np.array(image.convert('L'))
# 計算拉普拉斯方差 (評估清晰度)
laplacian_var = cv2.Laplacian(img_array, cv2.CV_64F).var()
# 計算對比度 (標準差)
contrast = np.std(img_array)
logger.debug(f"Image quality metrics: laplacian_var={laplacian_var:.2f}, contrast={contrast:.2f}")
# 根據指標決定增強級別
if laplacian_var < 50 or contrast < 40:
# 模糊或低對比度 -> 高級別增強
return 'high'
elif laplacian_var < 100 or contrast < 60:
# 中等品質 -> 中級別增強
return 'medium'
else:
# 高品質 -> 低級別增強
return 'low'
else:
# PIL 簡易判斷
gray = image.convert('L')
img_array = np.array(gray)
# 簡單對比度評估
contrast = np.std(img_array)
if contrast < 40:
return 'high'
elif contrast < 60:
return 'medium'
else:
return 'low'
except Exception as e:
logger.error(f"Auto enhance level detection failed: {e}")
return 'medium' # 預設使用中級別
def preprocess_smart(self, image_bytes: bytes) -> bytes:
"""
智能預處理 - 自動偵測並應用最佳處理級別
Args:
image_bytes: 原始圖像字節數據
Returns:
處理後的圖像字節數據
"""
enhance_level = self.auto_detect_enhance_level(image_bytes)
logger.info(f"Auto-detected enhancement level: {enhance_level}")
return self.preprocess_for_ocr(image_bytes, enhance_level)

232
app/utils/ldap_auth.py Normal file
View File

@@ -0,0 +1,232 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
LDAP 認證服務
Author: PANJIT IT Team
Created: 2024-01-28
Modified: 2024-01-28
"""
import time
from ldap3 import Server, Connection, SUBTREE, ALL_ATTRIBUTES
from flask import current_app
from .logger import get_logger
from .exceptions import AuthenticationError
logger = get_logger(__name__)
class LDAPAuthService:
"""LDAP 認證服務"""
def __init__(self):
self.config = current_app.config
self.server_url = self.config.get('LDAP_SERVER')
self.port = self.config.get('LDAP_PORT', 389)
self.use_ssl = self.config.get('LDAP_USE_SSL', False)
self.bind_user_dn = self.config.get('LDAP_BIND_USER_DN')
self.bind_password = self.config.get('LDAP_BIND_USER_PASSWORD')
self.search_base = self.config.get('LDAP_SEARCH_BASE')
self.login_attr = self.config.get('LDAP_USER_LOGIN_ATTR', 'userPrincipalName')
def create_connection(self, retries=3):
"""建立 LDAP 連線(帶重試機制)"""
for attempt in range(retries):
try:
server = Server(
self.server_url,
port=self.port,
use_ssl=self.use_ssl,
get_info=ALL_ATTRIBUTES
)
conn = Connection(
server,
user=self.bind_user_dn,
password=self.bind_password,
auto_bind=True,
raise_exceptions=True
)
logger.info("LDAP connection established successfully")
return conn
except Exception as e:
logger.error(f"LDAP connection attempt {attempt + 1} failed: {str(e)}")
if attempt == retries - 1:
raise AuthenticationError(f"LDAP connection failed: {str(e)}")
time.sleep(1)
return None
def authenticate_user(self, username, password):
"""驗證使用者憑證"""
try:
conn = self.create_connection()
if not conn:
raise AuthenticationError("Unable to connect to LDAP server")
# 搜尋使用者
search_filter = f"(&(objectClass=person)(objectCategory=person)({self.login_attr}={username}))"
conn.search(
self.search_base,
search_filter,
SUBTREE,
attributes=['displayName', 'mail', 'sAMAccountName', 'userPrincipalName', 'department']
)
if not conn.entries:
logger.warning(f"User not found: {username}")
raise AuthenticationError("帳號不存在")
user_entry = conn.entries[0]
user_dn = user_entry.entry_dn
# 驗證使用者密碼
try:
user_conn = Connection(
conn.server,
user=user_dn,
password=password,
auto_bind=True,
raise_exceptions=True
)
user_conn.unbind()
# 返回使用者資訊
user_info = {
'username': str(user_entry.sAMAccountName) if user_entry.sAMAccountName else username,
'display_name': str(user_entry.displayName) if user_entry.displayName else username,
'email': str(user_entry.mail) if user_entry.mail else f"{username}@panjit.com.tw",
'department': str(user_entry.department) if hasattr(user_entry, 'department') and user_entry.department else None,
'user_principal_name': str(user_entry.userPrincipalName) if user_entry.userPrincipalName else username
}
logger.info(f"User authenticated successfully: {username}")
return user_info
except Exception as e:
logger.warning(f"Authentication failed for user {username}: {str(e)}")
raise AuthenticationError("密碼錯誤")
except AuthenticationError:
raise
except Exception as e:
logger.error(f"LDAP authentication error: {str(e)}")
raise AuthenticationError(f"認證服務錯誤: {str(e)}")
finally:
if 'conn' in locals() and conn:
conn.unbind()
def search_users(self, search_term, limit=20):
"""搜尋使用者"""
try:
conn = self.create_connection()
if not conn:
return []
# 建構搜尋過濾器
search_filter = f"""(&
(objectClass=person)
(objectCategory=person)
(!(userAccountControl:1.2.840.113556.1.4.803:=2))
(|
(displayName=*{search_term}*)
(mail=*{search_term}*)
(sAMAccountName=*{search_term}*)
(userPrincipalName=*{search_term}*)
)
)"""
# 移除多餘空白
search_filter = ' '.join(search_filter.split())
conn.search(
self.search_base,
search_filter,
SUBTREE,
attributes=['sAMAccountName', 'displayName', 'mail', 'department'],
size_limit=limit
)
results = []
for entry in conn.entries:
results.append({
'username': str(entry.sAMAccountName) if entry.sAMAccountName else '',
'display_name': str(entry.displayName) if entry.displayName else '',
'email': str(entry.mail) if entry.mail else '',
'department': str(entry.department) if hasattr(entry, 'department') and entry.department else ''
})
logger.info(f"LDAP search found {len(results)} results for term: {search_term}")
return results
except Exception as e:
logger.error(f"LDAP search error: {str(e)}")
return []
finally:
if 'conn' in locals() and conn:
conn.unbind()
def get_user_info(self, username):
"""取得使用者詳細資訊"""
try:
conn = self.create_connection()
if not conn:
return None
# 支援 sAMAccountName 和 userPrincipalName 格式
if '@' in username:
search_filter = f"""(&
(objectClass=person)
(|
(userPrincipalName={username})
(mail={username})
)
)"""
else:
search_filter = f"(&(objectClass=person)(sAMAccountName={username}))"
# 移除多餘空白
search_filter = ' '.join(search_filter.split())
conn.search(
self.search_base,
search_filter,
SUBTREE,
attributes=['displayName', 'mail', 'sAMAccountName', 'userPrincipalName', 'department']
)
if not conn.entries:
return None
entry = conn.entries[0]
return {
'username': str(entry.sAMAccountName) if entry.sAMAccountName else username,
'display_name': str(entry.displayName) if entry.displayName else username,
'email': str(entry.mail) if entry.mail else f"{username}@panjit.com.tw",
'department': str(entry.department) if hasattr(entry, 'department') and entry.department else None,
'user_principal_name': str(entry.userPrincipalName) if entry.userPrincipalName else ''
}
except Exception as e:
logger.error(f"Error getting user info for {username}: {str(e)}")
return None
finally:
if 'conn' in locals() and conn:
conn.unbind()
def test_connection(self):
"""測試 LDAP 連線(健康檢查用)"""
try:
conn = self.create_connection(retries=1)
if conn:
conn.unbind()
return True
return False
except Exception as e:
logger.error(f"LDAP connection test failed: {str(e)}")
return False

126
app/utils/logger.py Normal file
View File

@@ -0,0 +1,126 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
日誌管理模組
Author: PANJIT IT Team
Created: 2024-01-28
Modified: 2024-01-28
"""
import logging
import os
from pathlib import Path
from logging.handlers import RotatingFileHandler
from flask import current_app, has_request_context, request, g
def get_logger(name):
"""取得指定名稱的日誌器"""
logger = logging.getLogger(name)
# 避免重複設定 handler
if not logger.handlers:
setup_logger(logger)
return logger
def setup_logger(logger):
"""設定日誌器"""
if has_request_context() and current_app:
log_level = current_app.config.get('LOG_LEVEL', 'INFO')
log_file = current_app.config.get('LOG_FILE', 'logs/app.log')
else:
log_level = os.environ.get('LOG_LEVEL', 'INFO')
log_file = os.environ.get('LOG_FILE', 'logs/app.log')
# 確保日誌目錄存在
log_path = Path(log_file)
log_path.parent.mkdir(parents=True, exist_ok=True)
# 設定日誌等級
logger.setLevel(getattr(logging, log_level.upper()))
# 建立格式化器
formatter = logging.Formatter(
'%(asctime)s [%(levelname)s] %(name)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# 檔案處理器(使用輪轉)
file_handler = RotatingFileHandler(
log_file,
maxBytes=10*1024*1024, # 10MB
backupCount=5,
encoding='utf-8'
)
file_handler.setLevel(getattr(logging, log_level.upper()))
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# 控制台處理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
class DatabaseLogHandler(logging.Handler):
"""資料庫日誌處理器"""
def emit(self, record):
"""發送日誌記錄到資料庫"""
try:
from app.models.log import SystemLog
# 取得使用者和任務資訊(如果有的話)
user_id = None
job_id = None
extra_data = {}
if has_request_context():
user_id = g.get('current_user_id')
extra_data.update({
'method': request.method,
'endpoint': request.endpoint,
'url': request.url,
'ip_address': request.remote_addr,
'user_agent': request.headers.get('User-Agent')
})
# 儲存到資料庫
SystemLog.log(
level=record.levelname,
module=record.name,
message=record.getMessage(),
user_id=user_id,
job_id=job_id,
extra_data=extra_data if extra_data else None
)
except Exception:
# 避免日誌記錄失敗影響主程序
pass
def init_logging(app):
"""初始化應用程式日誌"""
# 設定根日誌器
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
# 添加資料庫日誌處理器(僅對重要日誌)
if app.config.get('SQLALCHEMY_DATABASE_URI'):
db_handler = DatabaseLogHandler()
db_handler.setLevel(logging.WARNING) # 只記錄警告以上等級到資料庫
root_logger.addHandler(db_handler)
# 設定 Flask 應用日誌
if not app.logger.handlers:
setup_logger(app.logger)
# 設定第三方庫日誌等級
logging.getLogger('werkzeug').setLevel(logging.WARNING)
logging.getLogger('urllib3').setLevel(logging.WARNING)
logging.getLogger('requests').setLevel(logging.WARNING)

84
app/utils/response.py Normal file
View File

@@ -0,0 +1,84 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
API 響應處理工具
Author: PANJIT IT Team
Created: 2025-09-02
"""
from datetime import datetime
from typing import Dict, Any, List, Union
from app.utils.timezone import to_taiwan_time, format_taiwan_time
def convert_datetime_to_taiwan(data: Union[Dict, List, Any]) -> Union[Dict, List, Any]:
"""遞迴轉換資料中的 datetime 欄位為台灣時間
Args:
data: 要轉換的資料(字典、列表或其他)
Returns:
轉換後的資料
"""
if isinstance(data, dict):
result = {}
for key, value in data.items():
if isinstance(value, datetime):
# 將 datetime 轉換為台灣時間的 ISO 字符串
taiwan_dt = to_taiwan_time(value)
result[key] = taiwan_dt.isoformat()
elif key in ['created_at', 'updated_at', 'completed_at', 'processing_started_at', 'last_login', 'timestamp']:
# 特定的時間欄位
if isinstance(value, str):
try:
# 嘗試解析 ISO 格式的時間字符串
dt = datetime.fromisoformat(value.replace('Z', '+00:00'))
taiwan_dt = to_taiwan_time(dt)
result[key] = taiwan_dt.isoformat()
except:
result[key] = value
else:
result[key] = convert_datetime_to_taiwan(value)
else:
result[key] = convert_datetime_to_taiwan(value)
return result
elif isinstance(data, list):
return [convert_datetime_to_taiwan(item) for item in data]
else:
return data
def create_taiwan_response(success: bool = True, data: Any = None, message: str = '',
error: str = '', **kwargs) -> Dict[str, Any]:
"""創建包含台灣時區轉換的 API 響應
Args:
success: 是否成功
data: 響應資料
message: 成功訊息
error: 錯誤訊息
**kwargs: 其他參數
Returns:
包含台灣時區的響應字典
"""
response = {
'success': success,
'timestamp': format_taiwan_time(datetime.now(), "%Y-%m-%d %H:%M:%S")
}
if data is not None:
response['data'] = convert_datetime_to_taiwan(data)
if message:
response['message'] = message
if error:
response['error'] = error
# 加入其他參數
for key, value in kwargs.items():
response[key] = convert_datetime_to_taiwan(value)
return response

104
app/utils/timezone.py Normal file
View File

@@ -0,0 +1,104 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
時區工具函數
Author: PANJIT IT Team
Created: 2025-09-02
"""
from datetime import datetime, timezone, timedelta
from typing import Optional
# 台灣時區 UTC+8
TAIWAN_TZ = timezone(timedelta(hours=8))
def now_taiwan() -> datetime:
"""取得當前台灣時間UTC+8"""
return datetime.now(TAIWAN_TZ)
def now_utc() -> datetime:
"""取得當前 UTC 時間"""
return datetime.now(timezone.utc)
def to_taiwan_time(dt: datetime) -> datetime:
"""將 datetime 轉換為台灣時間
Args:
dt: datetime 物件(可能是 naive 或 aware
Returns:
台灣時區的 datetime 物件
"""
if dt is None:
return None
# 如果是 naive datetime假設為 UTC
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
# 轉換為台灣時區
return dt.astimezone(TAIWAN_TZ)
def to_utc_time(dt: datetime) -> datetime:
"""將 datetime 轉換為 UTC 時間
Args:
dt: datetime 物件(可能是 naive 或 aware
Returns:
UTC 時區的 datetime 物件
"""
if dt is None:
return None
# 如果是 naive datetime假設為台灣時間
if dt.tzinfo is None:
dt = dt.replace(tzinfo=TAIWAN_TZ)
# 轉換為 UTC
return dt.astimezone(timezone.utc)
def format_taiwan_time(dt: datetime, format_str: str = "%Y-%m-%d %H:%M:%S") -> str:
"""格式化台灣時間為字符串
Args:
dt: datetime 物件
format_str: 格式化字符串
Returns:
格式化後的時間字符串
"""
if dt is None:
return ""
taiwan_dt = to_taiwan_time(dt)
return taiwan_dt.strftime(format_str)
def parse_taiwan_time(time_str: str, format_str: str = "%Y-%m-%d %H:%M:%S") -> datetime:
"""解析台灣時間字符串為 datetime
Args:
time_str: 時間字符串
format_str: 解析格式
Returns:
台灣時區的 datetime 物件
"""
naive_dt = datetime.strptime(time_str, format_str)
return naive_dt.replace(tzinfo=TAIWAN_TZ)
# 為了向後兼容,提供替代 datetime.utcnow() 的函數
def utcnow() -> datetime:
"""取得當前 UTC 時間(替代 datetime.utcnow()
注意:新代碼建議使用 now_taiwan() 或 now_utc()
"""
return now_utc().replace(tzinfo=None) # 返回 naive UTC datetime 以保持兼容性

203
app/utils/validators.py Normal file
View File

@@ -0,0 +1,203 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
驗證工具模組
Author: PANJIT IT Team
Created: 2024-01-28
Modified: 2024-01-28
"""
import os
from pathlib import Path
from flask import current_app
from .exceptions import ValidationError
def validate_file(file_obj):
"""驗證上傳的檔案"""
if not file_obj:
raise ValidationError("未選擇檔案", "NO_FILE")
if not file_obj.filename:
raise ValidationError("檔案名稱為空", "NO_FILENAME")
# 檢查檔案副檔名
file_ext = Path(file_obj.filename).suffix.lower()
allowed_extensions = current_app.config.get('ALLOWED_EXTENSIONS', {'.docx', '.doc', '.pptx', '.xlsx', '.xls', '.pdf'})
if file_ext not in allowed_extensions:
raise ValidationError(
f"不支援的檔案類型: {file_ext},支援的格式: {', '.join(allowed_extensions)}",
"INVALID_FILE_TYPE"
)
# 檢查檔案大小
max_size = current_app.config.get('MAX_CONTENT_LENGTH', 26214400) # 25MB
# 取得檔案大小
file_obj.seek(0, os.SEEK_END)
file_size = file_obj.tell()
file_obj.seek(0)
if file_size > max_size:
raise ValidationError(
f"檔案大小超過限制 ({format_file_size(max_size)})",
"FILE_TOO_LARGE"
)
if file_size == 0:
raise ValidationError("檔案為空", "EMPTY_FILE")
return {
'filename': file_obj.filename,
'file_extension': file_ext,
'file_size': file_size,
'valid': True
}
def validate_languages(source_language, target_languages):
"""驗證語言設定"""
# 支援的語言列表
supported_languages = {
'auto': '自動偵測',
'zh-CN': '簡體中文',
'zh-TW': '繁體中文',
'en': '英文',
'ja': '日文',
'ko': '韓文',
'vi': '越南文',
'th': '泰文',
'id': '印尼文',
'ms': '馬來文',
'es': '西班牙文',
'fr': '法文',
'de': '德文',
'ru': '俄文'
}
# 驗證來源語言
if source_language and source_language not in supported_languages:
raise ValidationError(
f"不支援的來源語言: {source_language}",
"INVALID_SOURCE_LANGUAGE"
)
# 驗證目標語言
if not target_languages or not isinstance(target_languages, list):
raise ValidationError("必須指定至少一個目標語言", "NO_TARGET_LANGUAGES")
if len(target_languages) == 0:
raise ValidationError("必須指定至少一個目標語言", "NO_TARGET_LANGUAGES")
if len(target_languages) > 10: # 限制最多10個目標語言
raise ValidationError("目標語言數量過多最多支援10個", "TOO_MANY_TARGET_LANGUAGES")
invalid_languages = [lang for lang in target_languages if lang not in supported_languages]
if invalid_languages:
raise ValidationError(
f"不支援的目標語言: {', '.join(invalid_languages)}",
"INVALID_TARGET_LANGUAGE"
)
# 檢查來源語言和目標語言是否有重疊
if source_language and source_language != 'auto' and source_language in target_languages:
raise ValidationError(
"目標語言不能包含來源語言",
"SOURCE_TARGET_OVERLAP"
)
return {
'source_language': source_language or 'auto',
'target_languages': target_languages,
'supported_languages': supported_languages,
'valid': True
}
def validate_job_uuid(job_uuid):
"""驗證任務UUID格式"""
import uuid
if not job_uuid:
raise ValidationError("任務UUID不能為空", "INVALID_UUID")
try:
uuid.UUID(job_uuid)
return True
except ValueError:
raise ValidationError("任務UUID格式錯誤", "INVALID_UUID")
def validate_pagination(page, per_page):
"""驗證分頁參數"""
try:
page = int(page) if page else 1
per_page = int(per_page) if per_page else 20
except (ValueError, TypeError):
raise ValidationError("分頁參數必須為數字", "INVALID_PAGINATION")
if page < 1:
raise ValidationError("頁數必須大於0", "INVALID_PAGE")
if per_page < 1 or per_page > 100:
raise ValidationError("每頁項目數必須在1-100之間", "INVALID_PER_PAGE")
return page, per_page
def format_file_size(size_bytes):
"""格式化檔案大小顯示"""
if size_bytes == 0:
return "0 B"
size_names = ["B", "KB", "MB", "GB", "TB"]
i = 0
while size_bytes >= 1024 and i < len(size_names) - 1:
size_bytes /= 1024.0
i += 1
return f"{size_bytes:.1f} {size_names[i]}"
def sanitize_filename(filename):
"""清理檔案名稱,移除不安全字元"""
import re
# 保留檔案名稱和副檔名
name = Path(filename).stem
ext = Path(filename).suffix
# 移除或替換不安全字元
safe_name = re.sub(r'[^\w\s.-]', '_', name)
safe_name = re.sub(r'\s+', '_', safe_name) # 空白替換為底線
safe_name = safe_name.strip('._') # 移除開頭結尾的點和底線
# 限制長度
if len(safe_name) > 100:
safe_name = safe_name[:100]
return f"{safe_name}{ext}"
def validate_date_range(start_date, end_date):
"""驗證日期範圍"""
from datetime import datetime
if start_date:
try:
start_date = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
except ValueError:
raise ValidationError("開始日期格式錯誤", "INVALID_START_DATE")
if end_date:
try:
end_date = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
except ValueError:
raise ValidationError("結束日期格式錯誤", "INVALID_END_DATE")
if start_date and end_date and start_date > end_date:
raise ValidationError("開始日期不能晚於結束日期", "INVALID_DATE_RANGE")
return start_date, end_date