277 lines
9.4 KiB
Python
277 lines
9.4 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
API 認證服務
|
||
用於與 PANJIT Auth API 整合認證
|
||
|
||
Author: PANJIT IT Team
|
||
Created: 2025-10-01
|
||
"""
|
||
|
||
import requests
|
||
import json
|
||
from datetime import datetime, timedelta
|
||
from typing import Optional, Dict, Any, Tuple
|
||
from flask import current_app
|
||
from .logger import get_logger
|
||
from .exceptions import AuthenticationError
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
|
||
class APIAuthService:
|
||
"""API 認證服務"""
|
||
|
||
def __init__(self):
|
||
self.config = current_app.config
|
||
self.api_base_url = "https://pj-auth-api.vercel.app"
|
||
self.login_endpoint = "/api/auth/login"
|
||
self.logout_endpoint = "/api/auth/logout"
|
||
self.timeout = 30 # 30 秒超時
|
||
|
||
def authenticate_user(self, username: str, password: str) -> Dict[str, Any]:
|
||
"""
|
||
透過 API 驗證使用者憑證
|
||
|
||
Args:
|
||
username: 使用者帳號
|
||
password: 密碼
|
||
|
||
Returns:
|
||
Dict: 包含使用者資訊和 Token 的字典
|
||
|
||
Raises:
|
||
AuthenticationError: 認證失敗時拋出
|
||
"""
|
||
try:
|
||
login_url = f"{self.api_base_url}{self.login_endpoint}"
|
||
|
||
payload = {
|
||
"username": username,
|
||
"password": password
|
||
}
|
||
|
||
headers = {
|
||
"Content-Type": "application/json"
|
||
}
|
||
|
||
logger.info(f"正在透過 API 驗證使用者: {username}")
|
||
|
||
# 發送認證請求
|
||
response = requests.post(
|
||
login_url,
|
||
json=payload,
|
||
headers=headers,
|
||
timeout=self.timeout
|
||
)
|
||
|
||
# 解析回應
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
|
||
if data.get('success'):
|
||
logger.info(f"API 認證成功: {username}")
|
||
return self._parse_auth_response(data)
|
||
else:
|
||
error_msg = data.get('error', '認證失敗')
|
||
logger.warning(f"API 認證失敗: {username} - {error_msg}")
|
||
raise AuthenticationError(f"認證失敗: {error_msg}")
|
||
|
||
elif response.status_code == 401:
|
||
data = response.json()
|
||
error_msg = data.get('error', '帳號或密碼錯誤')
|
||
logger.warning(f"API 認證失敗 (401): {username} - {error_msg}")
|
||
raise AuthenticationError("帳號或密碼錯誤")
|
||
|
||
else:
|
||
logger.error(f"API 認證請求失敗: HTTP {response.status_code}")
|
||
raise AuthenticationError(f"認證服務錯誤 (HTTP {response.status_code})")
|
||
|
||
except requests.exceptions.Timeout:
|
||
logger.error(f"API 認證請求超時: {username}")
|
||
raise AuthenticationError("認證服務回應超時,請稍後再試")
|
||
|
||
except requests.exceptions.ConnectionError:
|
||
logger.error(f"API 認證連線錯誤: {username}")
|
||
raise AuthenticationError("無法連接認證服務,請檢查網路連線")
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
logger.error(f"API 認證請求錯誤: {username} - {str(e)}")
|
||
raise AuthenticationError(f"認證服務錯誤: {str(e)}")
|
||
|
||
except json.JSONDecodeError:
|
||
logger.error(f"API 認證回應格式錯誤: {username}")
|
||
raise AuthenticationError("認證服務回應格式錯誤")
|
||
|
||
except Exception as e:
|
||
logger.error(f"API 認證未知錯誤: {username} - {str(e)}")
|
||
raise AuthenticationError(f"認證過程發生錯誤: {str(e)}")
|
||
|
||
def _parse_auth_response(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""
|
||
解析 API 認證回應
|
||
|
||
Args:
|
||
data: API 回應資料
|
||
|
||
Returns:
|
||
Dict: 標準化的使用者資訊
|
||
"""
|
||
try:
|
||
auth_data = data.get('data', {})
|
||
user_info = auth_data.get('userInfo', {})
|
||
|
||
# 解析 Token 過期時間
|
||
expires_at = None
|
||
issued_at = None
|
||
|
||
if 'expiresAt' in auth_data:
|
||
try:
|
||
expires_at = datetime.fromisoformat(auth_data['expiresAt'].replace('Z', '+00:00'))
|
||
except (ValueError, AttributeError):
|
||
logger.warning("無法解析 API Token 過期時間")
|
||
|
||
if 'issuedAt' in auth_data:
|
||
try:
|
||
issued_at = datetime.fromisoformat(auth_data['issuedAt'].replace('Z', '+00:00'))
|
||
except (ValueError, AttributeError):
|
||
logger.warning("無法解析 API Token 發行時間")
|
||
|
||
# 標準化使用者資訊 (方案 A: API name 是姓名+email 格式)
|
||
api_name = user_info.get('name', '') # 例: "劉怡明 ymirliu@panjit.com.tw"
|
||
api_email = user_info.get('email', '') # 例: "ymirliu@panjit.com.tw"
|
||
|
||
result = {
|
||
# 基本使用者資訊 (方案 A: username 和 display_name 都用 API name)
|
||
'username': api_name, # 姓名+email 格式
|
||
'display_name': api_name, # 姓名+email 格式
|
||
'email': api_email, # 純 email
|
||
'department': user_info.get('jobTitle'), # 使用 jobTitle 作為部門
|
||
'user_principal_name': api_email,
|
||
|
||
# API 特有資訊
|
||
'api_user_id': user_info.get('id', ''), # Azure Object ID
|
||
'job_title': user_info.get('jobTitle'),
|
||
'office_location': user_info.get('officeLocation'),
|
||
'business_phones': user_info.get('businessPhones', []),
|
||
|
||
# Token 資訊
|
||
'api_access_token': auth_data.get('access_token', ''),
|
||
'api_id_token': auth_data.get('id_token', ''),
|
||
'api_token_type': auth_data.get('token_type', 'Bearer'),
|
||
'api_expires_in': auth_data.get('expires_in', 0),
|
||
'api_issued_at': issued_at,
|
||
'api_expires_at': expires_at,
|
||
|
||
# 完整的 API 回應 (用於記錄)
|
||
'full_api_response': data,
|
||
'api_user_info': user_info
|
||
}
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
logger.error(f"解析 API 回應時發生錯誤: {str(e)}")
|
||
raise AuthenticationError(f"解析認證回應時發生錯誤: {str(e)}")
|
||
|
||
def logout_user(self, access_token: str) -> bool:
|
||
"""
|
||
透過 API 登出使用者
|
||
|
||
Args:
|
||
access_token: 使用者的 access token
|
||
|
||
Returns:
|
||
bool: 登出是否成功
|
||
"""
|
||
try:
|
||
logout_url = f"{self.api_base_url}{self.logout_endpoint}"
|
||
|
||
headers = {
|
||
"Authorization": f"Bearer {access_token}",
|
||
"Content-Type": "application/json"
|
||
}
|
||
|
||
response = requests.post(
|
||
logout_url,
|
||
headers=headers,
|
||
timeout=self.timeout
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
if data.get('success'):
|
||
logger.info("API 登出成功")
|
||
return True
|
||
|
||
logger.warning(f"API 登出失敗: HTTP {response.status_code}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"API 登出時發生錯誤: {str(e)}")
|
||
return False
|
||
|
||
def validate_token(self, access_token: str) -> bool:
|
||
"""
|
||
驗證 Token 是否有效
|
||
|
||
Args:
|
||
access_token: 要驗證的 token
|
||
|
||
Returns:
|
||
bool: Token 是否有效
|
||
"""
|
||
try:
|
||
# 這裡可以實作 Token 驗證邏輯
|
||
# 目前 API 沒有提供專門的驗證端點,可以考慮解析 JWT 或調用其他端點
|
||
|
||
# 簡單的檢查:Token 不能為空且格式看起來像 JWT
|
||
if not access_token or len(access_token.split('.')) != 3:
|
||
return False
|
||
|
||
# TODO: 實作更完整的 JWT 驗證邏輯
|
||
# 可以解析 JWT payload 檢查過期時間等
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"驗證 Token 時發生錯誤: {str(e)}")
|
||
return False
|
||
|
||
def test_connection(self) -> bool:
|
||
"""
|
||
測試 API 連線
|
||
|
||
Returns:
|
||
bool: 連線是否正常
|
||
"""
|
||
try:
|
||
# 嘗試連接 API 基礎端點
|
||
response = requests.get(
|
||
self.api_base_url,
|
||
timeout=10
|
||
)
|
||
|
||
return response.status_code in [200, 404] # 404 也算正常,表示能連接到伺服器
|
||
|
||
except Exception as e:
|
||
logger.error(f"API 連線測試失敗: {str(e)}")
|
||
return False
|
||
|
||
def calculate_internal_expiry(self, api_expires_at: Optional[datetime], extend_days: int = 3) -> datetime:
|
||
"""
|
||
計算內部 Token 過期時間
|
||
|
||
Args:
|
||
api_expires_at: API Token 過期時間
|
||
extend_days: 延長天數
|
||
|
||
Returns:
|
||
datetime: 內部 Token 過期時間
|
||
"""
|
||
if api_expires_at:
|
||
# 基於 API Token 過期時間延長
|
||
return api_expires_at + timedelta(days=extend_days)
|
||
else:
|
||
# 如果沒有 API 過期時間,從現在開始計算
|
||
return datetime.utcnow() + timedelta(days=extend_days) |