Files
Document_translator_V2_nodo…/app/utils/api_auth.py
beabigegg 4cace93934 NO docker
2025-10-02 18:50:53 +08:00

277 lines
9.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
API 認證服務
用於與 PANJIT Auth API 整合認證
Author: PANJIT IT Team
Created: 2025-10-01
"""
import requests
import json
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, Tuple
from flask import current_app
from .logger import get_logger
from .exceptions import AuthenticationError
logger = get_logger(__name__)
class APIAuthService:
"""API 認證服務"""
def __init__(self):
self.config = current_app.config
self.api_base_url = "https://pj-auth-api.vercel.app"
self.login_endpoint = "/api/auth/login"
self.logout_endpoint = "/api/auth/logout"
self.timeout = 30 # 30 秒超時
def authenticate_user(self, username: str, password: str) -> Dict[str, Any]:
"""
透過 API 驗證使用者憑證
Args:
username: 使用者帳號
password: 密碼
Returns:
Dict: 包含使用者資訊和 Token 的字典
Raises:
AuthenticationError: 認證失敗時拋出
"""
try:
login_url = f"{self.api_base_url}{self.login_endpoint}"
payload = {
"username": username,
"password": password
}
headers = {
"Content-Type": "application/json"
}
logger.info(f"正在透過 API 驗證使用者: {username}")
# 發送認證請求
response = requests.post(
login_url,
json=payload,
headers=headers,
timeout=self.timeout
)
# 解析回應
if response.status_code == 200:
data = response.json()
if data.get('success'):
logger.info(f"API 認證成功: {username}")
return self._parse_auth_response(data)
else:
error_msg = data.get('error', '認證失敗')
logger.warning(f"API 認證失敗: {username} - {error_msg}")
raise AuthenticationError(f"認證失敗: {error_msg}")
elif response.status_code == 401:
data = response.json()
error_msg = data.get('error', '帳號或密碼錯誤')
logger.warning(f"API 認證失敗 (401): {username} - {error_msg}")
raise AuthenticationError("帳號或密碼錯誤")
else:
logger.error(f"API 認證請求失敗: HTTP {response.status_code}")
raise AuthenticationError(f"認證服務錯誤 (HTTP {response.status_code})")
except requests.exceptions.Timeout:
logger.error(f"API 認證請求超時: {username}")
raise AuthenticationError("認證服務回應超時,請稍後再試")
except requests.exceptions.ConnectionError:
logger.error(f"API 認證連線錯誤: {username}")
raise AuthenticationError("無法連接認證服務,請檢查網路連線")
except requests.exceptions.RequestException as e:
logger.error(f"API 認證請求錯誤: {username} - {str(e)}")
raise AuthenticationError(f"認證服務錯誤: {str(e)}")
except json.JSONDecodeError:
logger.error(f"API 認證回應格式錯誤: {username}")
raise AuthenticationError("認證服務回應格式錯誤")
except Exception as e:
logger.error(f"API 認證未知錯誤: {username} - {str(e)}")
raise AuthenticationError(f"認證過程發生錯誤: {str(e)}")
def _parse_auth_response(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
解析 API 認證回應
Args:
data: API 回應資料
Returns:
Dict: 標準化的使用者資訊
"""
try:
auth_data = data.get('data', {})
user_info = auth_data.get('userInfo', {})
# 解析 Token 過期時間
expires_at = None
issued_at = None
if 'expiresAt' in auth_data:
try:
expires_at = datetime.fromisoformat(auth_data['expiresAt'].replace('Z', '+00:00'))
except (ValueError, AttributeError):
logger.warning("無法解析 API Token 過期時間")
if 'issuedAt' in auth_data:
try:
issued_at = datetime.fromisoformat(auth_data['issuedAt'].replace('Z', '+00:00'))
except (ValueError, AttributeError):
logger.warning("無法解析 API Token 發行時間")
# 標準化使用者資訊 (方案 A: API name 是姓名+email 格式)
api_name = user_info.get('name', '') # 例: "劉怡明 ymirliu@panjit.com.tw"
api_email = user_info.get('email', '') # 例: "ymirliu@panjit.com.tw"
result = {
# 基本使用者資訊 (方案 A: username 和 display_name 都用 API name)
'username': api_name, # 姓名+email 格式
'display_name': api_name, # 姓名+email 格式
'email': api_email, # 純 email
'department': user_info.get('jobTitle'), # 使用 jobTitle 作為部門
'user_principal_name': api_email,
# API 特有資訊
'api_user_id': user_info.get('id', ''), # Azure Object ID
'job_title': user_info.get('jobTitle'),
'office_location': user_info.get('officeLocation'),
'business_phones': user_info.get('businessPhones', []),
# Token 資訊
'api_access_token': auth_data.get('access_token', ''),
'api_id_token': auth_data.get('id_token', ''),
'api_token_type': auth_data.get('token_type', 'Bearer'),
'api_expires_in': auth_data.get('expires_in', 0),
'api_issued_at': issued_at,
'api_expires_at': expires_at,
# 完整的 API 回應 (用於記錄)
'full_api_response': data,
'api_user_info': user_info
}
return result
except Exception as e:
logger.error(f"解析 API 回應時發生錯誤: {str(e)}")
raise AuthenticationError(f"解析認證回應時發生錯誤: {str(e)}")
def logout_user(self, access_token: str) -> bool:
"""
透過 API 登出使用者
Args:
access_token: 使用者的 access token
Returns:
bool: 登出是否成功
"""
try:
logout_url = f"{self.api_base_url}{self.logout_endpoint}"
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
response = requests.post(
logout_url,
headers=headers,
timeout=self.timeout
)
if response.status_code == 200:
data = response.json()
if data.get('success'):
logger.info("API 登出成功")
return True
logger.warning(f"API 登出失敗: HTTP {response.status_code}")
return False
except Exception as e:
logger.error(f"API 登出時發生錯誤: {str(e)}")
return False
def validate_token(self, access_token: str) -> bool:
"""
驗證 Token 是否有效
Args:
access_token: 要驗證的 token
Returns:
bool: Token 是否有效
"""
try:
# 這裡可以實作 Token 驗證邏輯
# 目前 API 沒有提供專門的驗證端點,可以考慮解析 JWT 或調用其他端點
# 簡單的檢查Token 不能為空且格式看起來像 JWT
if not access_token or len(access_token.split('.')) != 3:
return False
# TODO: 實作更完整的 JWT 驗證邏輯
# 可以解析 JWT payload 檢查過期時間等
return True
except Exception as e:
logger.error(f"驗證 Token 時發生錯誤: {str(e)}")
return False
def test_connection(self) -> bool:
"""
測試 API 連線
Returns:
bool: 連線是否正常
"""
try:
# 嘗試連接 API 基礎端點
response = requests.get(
self.api_base_url,
timeout=10
)
return response.status_code in [200, 404] # 404 也算正常,表示能連接到伺服器
except Exception as e:
logger.error(f"API 連線測試失敗: {str(e)}")
return False
def calculate_internal_expiry(self, api_expires_at: Optional[datetime], extend_days: int = 3) -> datetime:
"""
計算內部 Token 過期時間
Args:
api_expires_at: API Token 過期時間
extend_days: 延長天數
Returns:
datetime: 內部 Token 過期時間
"""
if api_expires_at:
# 基於 API Token 過期時間延長
return api_expires_at + timedelta(days=extend_days)
else:
# 如果沒有 API 過期時間,從現在開始計算
return datetime.utcnow() + timedelta(days=extend_days)