改用API驗證

This commit is contained in:
beabigegg
2025-10-02 17:13:24 +08:00
parent 0a89c19fc9
commit adecdf0cce
48 changed files with 6136 additions and 1239 deletions

277
app/utils/api_auth.py Normal file
View File

@@ -0,0 +1,277 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
API 認證服務
用於與 PANJIT Auth API 整合認證
Author: PANJIT IT Team
Created: 2025-10-01
"""
import requests
import json
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, Tuple
from flask import current_app
from .logger import get_logger
from .exceptions import AuthenticationError
logger = get_logger(__name__)
class APIAuthService:
"""API 認證服務"""
def __init__(self):
self.config = current_app.config
self.api_base_url = "https://pj-auth-api.vercel.app"
self.login_endpoint = "/api/auth/login"
self.logout_endpoint = "/api/auth/logout"
self.timeout = 30 # 30 秒超時
def authenticate_user(self, username: str, password: str) -> Dict[str, Any]:
"""
透過 API 驗證使用者憑證
Args:
username: 使用者帳號
password: 密碼
Returns:
Dict: 包含使用者資訊和 Token 的字典
Raises:
AuthenticationError: 認證失敗時拋出
"""
try:
login_url = f"{self.api_base_url}{self.login_endpoint}"
payload = {
"username": username,
"password": password
}
headers = {
"Content-Type": "application/json"
}
logger.info(f"正在透過 API 驗證使用者: {username}")
# 發送認證請求
response = requests.post(
login_url,
json=payload,
headers=headers,
timeout=self.timeout
)
# 解析回應
if response.status_code == 200:
data = response.json()
if data.get('success'):
logger.info(f"API 認證成功: {username}")
return self._parse_auth_response(data)
else:
error_msg = data.get('error', '認證失敗')
logger.warning(f"API 認證失敗: {username} - {error_msg}")
raise AuthenticationError(f"認證失敗: {error_msg}")
elif response.status_code == 401:
data = response.json()
error_msg = data.get('error', '帳號或密碼錯誤')
logger.warning(f"API 認證失敗 (401): {username} - {error_msg}")
raise AuthenticationError("帳號或密碼錯誤")
else:
logger.error(f"API 認證請求失敗: HTTP {response.status_code}")
raise AuthenticationError(f"認證服務錯誤 (HTTP {response.status_code})")
except requests.exceptions.Timeout:
logger.error(f"API 認證請求超時: {username}")
raise AuthenticationError("認證服務回應超時,請稍後再試")
except requests.exceptions.ConnectionError:
logger.error(f"API 認證連線錯誤: {username}")
raise AuthenticationError("無法連接認證服務,請檢查網路連線")
except requests.exceptions.RequestException as e:
logger.error(f"API 認證請求錯誤: {username} - {str(e)}")
raise AuthenticationError(f"認證服務錯誤: {str(e)}")
except json.JSONDecodeError:
logger.error(f"API 認證回應格式錯誤: {username}")
raise AuthenticationError("認證服務回應格式錯誤")
except Exception as e:
logger.error(f"API 認證未知錯誤: {username} - {str(e)}")
raise AuthenticationError(f"認證過程發生錯誤: {str(e)}")
def _parse_auth_response(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
解析 API 認證回應
Args:
data: API 回應資料
Returns:
Dict: 標準化的使用者資訊
"""
try:
auth_data = data.get('data', {})
user_info = auth_data.get('userInfo', {})
# 解析 Token 過期時間
expires_at = None
issued_at = None
if 'expiresAt' in auth_data:
try:
expires_at = datetime.fromisoformat(auth_data['expiresAt'].replace('Z', '+00:00'))
except (ValueError, AttributeError):
logger.warning("無法解析 API Token 過期時間")
if 'issuedAt' in auth_data:
try:
issued_at = datetime.fromisoformat(auth_data['issuedAt'].replace('Z', '+00:00'))
except (ValueError, AttributeError):
logger.warning("無法解析 API Token 發行時間")
# 標準化使用者資訊 (方案 A: API name 是姓名+email 格式)
api_name = user_info.get('name', '') # 例: "劉怡明 ymirliu@panjit.com.tw"
api_email = user_info.get('email', '') # 例: "ymirliu@panjit.com.tw"
result = {
# 基本使用者資訊 (方案 A: username 和 display_name 都用 API name)
'username': api_name, # 姓名+email 格式
'display_name': api_name, # 姓名+email 格式
'email': api_email, # 純 email
'department': user_info.get('jobTitle'), # 使用 jobTitle 作為部門
'user_principal_name': api_email,
# API 特有資訊
'api_user_id': user_info.get('id', ''), # Azure Object ID
'job_title': user_info.get('jobTitle'),
'office_location': user_info.get('officeLocation'),
'business_phones': user_info.get('businessPhones', []),
# Token 資訊
'api_access_token': auth_data.get('access_token', ''),
'api_id_token': auth_data.get('id_token', ''),
'api_token_type': auth_data.get('token_type', 'Bearer'),
'api_expires_in': auth_data.get('expires_in', 0),
'api_issued_at': issued_at,
'api_expires_at': expires_at,
# 完整的 API 回應 (用於記錄)
'full_api_response': data,
'api_user_info': user_info
}
return result
except Exception as e:
logger.error(f"解析 API 回應時發生錯誤: {str(e)}")
raise AuthenticationError(f"解析認證回應時發生錯誤: {str(e)}")
def logout_user(self, access_token: str) -> bool:
"""
透過 API 登出使用者
Args:
access_token: 使用者的 access token
Returns:
bool: 登出是否成功
"""
try:
logout_url = f"{self.api_base_url}{self.logout_endpoint}"
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
response = requests.post(
logout_url,
headers=headers,
timeout=self.timeout
)
if response.status_code == 200:
data = response.json()
if data.get('success'):
logger.info("API 登出成功")
return True
logger.warning(f"API 登出失敗: HTTP {response.status_code}")
return False
except Exception as e:
logger.error(f"API 登出時發生錯誤: {str(e)}")
return False
def validate_token(self, access_token: str) -> bool:
"""
驗證 Token 是否有效
Args:
access_token: 要驗證的 token
Returns:
bool: Token 是否有效
"""
try:
# 這裡可以實作 Token 驗證邏輯
# 目前 API 沒有提供專門的驗證端點,可以考慮解析 JWT 或調用其他端點
# 簡單的檢查Token 不能為空且格式看起來像 JWT
if not access_token or len(access_token.split('.')) != 3:
return False
# TODO: 實作更完整的 JWT 驗證邏輯
# 可以解析 JWT payload 檢查過期時間等
return True
except Exception as e:
logger.error(f"驗證 Token 時發生錯誤: {str(e)}")
return False
def test_connection(self) -> bool:
"""
測試 API 連線
Returns:
bool: 連線是否正常
"""
try:
# 嘗試連接 API 基礎端點
response = requests.get(
self.api_base_url,
timeout=10
)
return response.status_code in [200, 404] # 404 也算正常,表示能連接到伺服器
except Exception as e:
logger.error(f"API 連線測試失敗: {str(e)}")
return False
def calculate_internal_expiry(self, api_expires_at: Optional[datetime], extend_days: int = 3) -> datetime:
"""
計算內部 Token 過期時間
Args:
api_expires_at: API Token 過期時間
extend_days: 延長天數
Returns:
datetime: 內部 Token 過期時間
"""
if api_expires_at:
# 基於 API Token 過期時間延長
return api_expires_at + timedelta(days=extend_days)
else:
# 如果沒有 API 過期時間,從現在開始計算
return datetime.utcnow() + timedelta(days=extend_days)

View File

@@ -0,0 +1,248 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
圖像預處理工具 - 用於提升 OCR 識別準確度
Author: PANJIT IT Team
Created: 2025-10-01
Modified: 2025-10-01
"""
import io
import numpy as np
from PIL import Image, ImageEnhance, ImageFilter
from typing import Optional, Tuple
from app.utils.logger import get_logger
logger = get_logger(__name__)
# 檢查 OpenCV 是否可用
try:
import cv2
_HAS_OPENCV = True
logger.info("OpenCV is available for advanced image preprocessing")
except ImportError:
_HAS_OPENCV = False
logger.warning("OpenCV not available, using PIL-only preprocessing")
class ImagePreprocessor:
"""圖像預處理器 - 提升掃描文件 OCR 品質"""
def __init__(self, use_opencv: bool = True):
"""
初始化圖像預處理器
Args:
use_opencv: 是否使用 OpenCV 進行進階處理(若可用)
"""
self.use_opencv = use_opencv and _HAS_OPENCV
logger.info(f"ImagePreprocessor initialized (OpenCV: {self.use_opencv})")
def preprocess_for_ocr(self, image_bytes: bytes,
enhance_level: str = 'medium') -> bytes:
"""
對圖像進行 OCR 前處理
Args:
image_bytes: 原始圖像字節數據
enhance_level: 增強級別 ('low', 'medium', 'high')
Returns:
處理後的圖像字節數據 (PNG格式)
"""
try:
# 1. 載入圖像
image = Image.open(io.BytesIO(image_bytes))
original_mode = image.mode
logger.debug(f"Original image: {image.size}, mode={original_mode}")
# 2. 轉換為 RGB (如果需要)
if image.mode not in ('RGB', 'L'):
image = image.convert('RGB')
logger.debug(f"Converted to RGB mode")
# 3. 根據增強級別選擇處理流程
if self.use_opencv:
processed_image = self._preprocess_with_opencv(image, enhance_level)
else:
processed_image = self._preprocess_with_pil(image, enhance_level)
# 4. 轉換為 PNG 字節
output_buffer = io.BytesIO()
processed_image.save(output_buffer, format='PNG', optimize=True)
processed_bytes = output_buffer.getvalue()
logger.info(f"Image preprocessed: {len(image_bytes)} -> {len(processed_bytes)} bytes (level={enhance_level})")
return processed_bytes
except Exception as e:
logger.error(f"Image preprocessing failed: {e}, returning original image")
return image_bytes # 失敗時返回原圖
def _preprocess_with_opencv(self, image: Image.Image, level: str) -> Image.Image:
"""使用 OpenCV 進行進階圖像處理"""
# PIL Image -> NumPy array
img_array = np.array(image)
# 轉換為 BGR (OpenCV 格式)
if len(img_array.shape) == 3 and img_array.shape[2] == 3:
img_bgr = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
else:
img_bgr = img_array
# 1. 灰階化
gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
logger.debug("Applied grayscale conversion (OpenCV)")
# 2. 去噪 - 根據級別調整
if level == 'high':
# 高級別:較強去噪
denoised = cv2.fastNlMeansDenoising(gray, None, h=10, templateWindowSize=7, searchWindowSize=21)
logger.debug("Applied strong denoising (h=10)")
elif level == 'medium':
# 中級別:中等去噪
denoised = cv2.fastNlMeansDenoising(gray, None, h=7, templateWindowSize=7, searchWindowSize=21)
logger.debug("Applied medium denoising (h=7)")
else:
# 低級別:輕度去噪
denoised = cv2.bilateralFilter(gray, 5, 50, 50)
logger.debug("Applied light denoising (bilateral)")
# 3. 對比度增強 - CLAHE
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
enhanced = clahe.apply(denoised)
logger.debug("Applied CLAHE contrast enhancement")
# 4. 銳化 (高級別才使用)
if level == 'high':
kernel = np.array([[-1,-1,-1],
[-1, 9,-1],
[-1,-1,-1]])
sharpened = cv2.filter2D(enhanced, -1, kernel)
logger.debug("Applied sharpening filter")
else:
sharpened = enhanced
# 5. 自適應二值化 (根據級別決定是否使用)
if level in ('medium', 'high'):
# 使用自適應閾值
binary = cv2.adaptiveThreshold(
sharpened, 255,
cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY,
blockSize=11,
C=2
)
logger.debug("Applied adaptive thresholding")
final_image = binary
else:
final_image = sharpened
# NumPy array -> PIL Image
return Image.fromarray(final_image)
def _preprocess_with_pil(self, image: Image.Image, level: str) -> Image.Image:
"""使用 PIL 進行基礎圖像處理(當 OpenCV 不可用時)"""
# 1. 灰階化
gray = image.convert('L')
logger.debug("Applied grayscale conversion (PIL)")
# 2. 對比度增強
enhancer = ImageEnhance.Contrast(gray)
if level == 'high':
contrast_factor = 2.0
elif level == 'medium':
contrast_factor = 1.5
else:
contrast_factor = 1.2
enhanced = enhancer.enhance(contrast_factor)
logger.debug(f"Applied contrast enhancement (factor={contrast_factor})")
# 3. 銳化
if level in ('medium', 'high'):
sharpness = ImageEnhance.Sharpness(enhanced)
sharp_factor = 2.0 if level == 'high' else 1.5
sharpened = sharpness.enhance(sharp_factor)
logger.debug(f"Applied sharpening (factor={sharp_factor})")
else:
sharpened = enhanced
# 4. 去噪 (使用中值濾波)
if level == 'high':
denoised = sharpened.filter(ImageFilter.MedianFilter(size=3))
logger.debug("Applied median filter (size=3)")
else:
denoised = sharpened
return denoised
def auto_detect_enhance_level(self, image_bytes: bytes) -> str:
"""
自動偵測最佳增強級別
Args:
image_bytes: 圖像字節數據
Returns:
建議的增強級別 ('low', 'medium', 'high')
"""
try:
image = Image.open(io.BytesIO(image_bytes))
if self.use_opencv:
# 使用 OpenCV 計算圖像品質指標
img_array = np.array(image.convert('L'))
# 計算拉普拉斯方差 (評估清晰度)
laplacian_var = cv2.Laplacian(img_array, cv2.CV_64F).var()
# 計算對比度 (標準差)
contrast = np.std(img_array)
logger.debug(f"Image quality metrics: laplacian_var={laplacian_var:.2f}, contrast={contrast:.2f}")
# 根據指標決定增強級別
if laplacian_var < 50 or contrast < 40:
# 模糊或低對比度 -> 高級別增強
return 'high'
elif laplacian_var < 100 or contrast < 60:
# 中等品質 -> 中級別增強
return 'medium'
else:
# 高品質 -> 低級別增強
return 'low'
else:
# PIL 簡易判斷
gray = image.convert('L')
img_array = np.array(gray)
# 簡單對比度評估
contrast = np.std(img_array)
if contrast < 40:
return 'high'
elif contrast < 60:
return 'medium'
else:
return 'low'
except Exception as e:
logger.error(f"Auto enhance level detection failed: {e}")
return 'medium' # 預設使用中級別
def preprocess_smart(self, image_bytes: bytes) -> bytes:
"""
智能預處理 - 自動偵測並應用最佳處理級別
Args:
image_bytes: 原始圖像字節數據
Returns:
處理後的圖像字節數據
"""
enhance_level = self.auto_detect_enhance_level(image_bytes)
logger.info(f"Auto-detected enhancement level: {enhance_level}")
return self.preprocess_for_ocr(image_bytes, enhance_level)