#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ API 認證服務 用於與 PANJIT Auth API 整合認證 Author: PANJIT IT Team Created: 2025-10-01 """ import requests import json from datetime import datetime, timedelta from typing import Optional, Dict, Any, Tuple from flask import current_app from .logger import get_logger from .exceptions import AuthenticationError logger = get_logger(__name__) class APIAuthService: """API 認證服務""" def __init__(self): self.config = current_app.config self.api_base_url = "https://pj-auth-api.vercel.app" self.login_endpoint = "/api/auth/login" self.logout_endpoint = "/api/auth/logout" self.timeout = 30 # 30 秒超時 def authenticate_user(self, username: str, password: str) -> Dict[str, Any]: """ 透過 API 驗證使用者憑證 Args: username: 使用者帳號 password: 密碼 Returns: Dict: 包含使用者資訊和 Token 的字典 Raises: AuthenticationError: 認證失敗時拋出 """ try: login_url = f"{self.api_base_url}{self.login_endpoint}" payload = { "username": username, "password": password } headers = { "Content-Type": "application/json" } logger.info(f"正在透過 API 驗證使用者: {username}") # 發送認證請求 response = requests.post( login_url, json=payload, headers=headers, timeout=self.timeout ) # 解析回應 if response.status_code == 200: data = response.json() if data.get('success'): logger.info(f"API 認證成功: {username}") return self._parse_auth_response(data) else: error_msg = data.get('error', '認證失敗') logger.warning(f"API 認證失敗: {username} - {error_msg}") raise AuthenticationError(f"認證失敗: {error_msg}") elif response.status_code == 401: data = response.json() error_msg = data.get('error', '帳號或密碼錯誤') logger.warning(f"API 認證失敗 (401): {username} - {error_msg}") raise AuthenticationError("帳號或密碼錯誤") else: logger.error(f"API 認證請求失敗: HTTP {response.status_code}") raise AuthenticationError(f"認證服務錯誤 (HTTP {response.status_code})") except requests.exceptions.Timeout: logger.error(f"API 認證請求超時: {username}") raise AuthenticationError("認證服務回應超時,請稍後再試") except requests.exceptions.ConnectionError: logger.error(f"API 認證連線錯誤: {username}") raise AuthenticationError("無法連接認證服務,請檢查網路連線") except requests.exceptions.RequestException as e: logger.error(f"API 認證請求錯誤: {username} - {str(e)}") raise AuthenticationError(f"認證服務錯誤: {str(e)}") except json.JSONDecodeError: logger.error(f"API 認證回應格式錯誤: {username}") raise AuthenticationError("認證服務回應格式錯誤") except Exception as e: logger.error(f"API 認證未知錯誤: {username} - {str(e)}") raise AuthenticationError(f"認證過程發生錯誤: {str(e)}") def _parse_auth_response(self, data: Dict[str, Any]) -> Dict[str, Any]: """ 解析 API 認證回應 Args: data: API 回應資料 Returns: Dict: 標準化的使用者資訊 """ try: auth_data = data.get('data', {}) user_info = auth_data.get('userInfo', {}) # 解析 Token 過期時間 expires_at = None issued_at = None if 'expiresAt' in auth_data: try: expires_at = datetime.fromisoformat(auth_data['expiresAt'].replace('Z', '+00:00')) except (ValueError, AttributeError): logger.warning("無法解析 API Token 過期時間") if 'issuedAt' in auth_data: try: issued_at = datetime.fromisoformat(auth_data['issuedAt'].replace('Z', '+00:00')) except (ValueError, AttributeError): logger.warning("無法解析 API Token 發行時間") # 標準化使用者資訊 (方案 A: API name 是姓名+email 格式) api_name = user_info.get('name', '') # 例: "劉怡明 ymirliu@panjit.com.tw" api_email = user_info.get('email', '') # 例: "ymirliu@panjit.com.tw" result = { # 基本使用者資訊 (方案 A: username 和 display_name 都用 API name) 'username': api_name, # 姓名+email 格式 'display_name': api_name, # 姓名+email 格式 'email': api_email, # 純 email 'department': user_info.get('jobTitle'), # 使用 jobTitle 作為部門 'user_principal_name': api_email, # API 特有資訊 'api_user_id': user_info.get('id', ''), # Azure Object ID 'job_title': user_info.get('jobTitle'), 'office_location': user_info.get('officeLocation'), 'business_phones': user_info.get('businessPhones', []), # Token 資訊 'api_access_token': auth_data.get('access_token', ''), 'api_id_token': auth_data.get('id_token', ''), 'api_token_type': auth_data.get('token_type', 'Bearer'), 'api_expires_in': auth_data.get('expires_in', 0), 'api_issued_at': issued_at, 'api_expires_at': expires_at, # 完整的 API 回應 (用於記錄) 'full_api_response': data, 'api_user_info': user_info } return result except Exception as e: logger.error(f"解析 API 回應時發生錯誤: {str(e)}") raise AuthenticationError(f"解析認證回應時發生錯誤: {str(e)}") def logout_user(self, access_token: str) -> bool: """ 透過 API 登出使用者 Args: access_token: 使用者的 access token Returns: bool: 登出是否成功 """ try: logout_url = f"{self.api_base_url}{self.logout_endpoint}" headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json" } response = requests.post( logout_url, headers=headers, timeout=self.timeout ) if response.status_code == 200: data = response.json() if data.get('success'): logger.info("API 登出成功") return True logger.warning(f"API 登出失敗: HTTP {response.status_code}") return False except Exception as e: logger.error(f"API 登出時發生錯誤: {str(e)}") return False def validate_token(self, access_token: str) -> bool: """ 驗證 Token 是否有效 Args: access_token: 要驗證的 token Returns: bool: Token 是否有效 """ try: # 這裡可以實作 Token 驗證邏輯 # 目前 API 沒有提供專門的驗證端點,可以考慮解析 JWT 或調用其他端點 # 簡單的檢查:Token 不能為空且格式看起來像 JWT if not access_token or len(access_token.split('.')) != 3: return False # TODO: 實作更完整的 JWT 驗證邏輯 # 可以解析 JWT payload 檢查過期時間等 return True except Exception as e: logger.error(f"驗證 Token 時發生錯誤: {str(e)}") return False def test_connection(self) -> bool: """ 測試 API 連線 Returns: bool: 連線是否正常 """ try: # 嘗試連接 API 基礎端點 response = requests.get( self.api_base_url, timeout=10 ) return response.status_code in [200, 404] # 404 也算正常,表示能連接到伺服器 except Exception as e: logger.error(f"API 連線測試失敗: {str(e)}") return False def calculate_internal_expiry(self, api_expires_at: Optional[datetime], extend_days: int = 3) -> datetime: """ 計算內部 Token 過期時間 Args: api_expires_at: API Token 過期時間 extend_days: 延長天數 Returns: datetime: 內部 Token 過期時間 """ if api_expires_at: # 基於 API Token 過期時間延長 return api_expires_at + timedelta(days=extend_days) else: # 如果沒有 API 過期時間,從現在開始計算 return datetime.utcnow() + timedelta(days=extend_days)