""" Tool_OCR - External Authentication Service Handles authentication via external API (Microsoft Azure AD) """ import httpx from typing import Optional, Dict, Any from datetime import datetime, timedelta from pydantic import BaseModel, Field import logging from app.core.config import settings logger = logging.getLogger(__name__) class UserInfo(BaseModel): """User information from external API""" id: str name: str email: str job_title: Optional[str] = Field(alias="jobTitle", default=None) office_location: Optional[str] = Field(alias="officeLocation", default=None) business_phones: Optional[list[str]] = Field(alias="businessPhones", default=None) class Config: populate_by_name = True class AuthResponse(BaseModel): """Authentication response from external API""" access_token: str id_token: str expires_in: int token_type: str user_info: UserInfo = Field(alias="userInfo") issued_at: str = Field(alias="issuedAt") expires_at: str = Field(alias="expiresAt") class Config: populate_by_name = True class ExternalAuthService: """Service for external API authentication""" def __init__(self): self.api_url = settings.external_auth_full_url self.timeout = settings.external_auth_timeout self.max_retries = 3 self.retry_delay = 1 # seconds async def authenticate_user( self, username: str, password: str ) -> tuple[bool, Optional[AuthResponse], Optional[str]]: """ Authenticate user via external API Args: username: User's username (email) password: User's password Returns: Tuple of (success, auth_response, error_message) """ try: # Prepare request payload payload = {"username": username, "password": password} # Make HTTP request with timeout and retries async with httpx.AsyncClient(timeout=self.timeout) as client: for attempt in range(self.max_retries): try: response = await client.post( self.api_url, json=payload, headers={"Content-Type": "application/json"} ) # Success response (200) if response.status_code == 200: data = response.json() if data.get("success"): auth_data = AuthResponse(**data["data"]) logger.info( f"Authentication successful for user: {username}" ) return True, auth_data, None else: error_msg = data.get("error", "Unknown error") logger.warning( f"Authentication failed for user {username}: {error_msg}" ) return False, None, error_msg # Unauthorized (401) elif response.status_code == 401: data = response.json() error_msg = data.get("error", "Invalid credentials") logger.warning( f"Authentication failed for user {username}: {error_msg}" ) return False, None, error_msg # Other error codes else: error_msg = f"API returned status {response.status_code}" logger.error( f"Authentication API error for user {username}: {error_msg}" ) # Retry on 5xx errors if response.status_code >= 500 and attempt < self.max_retries - 1: await asyncio.sleep(self.retry_delay * (attempt + 1)) continue return False, None, error_msg except httpx.TimeoutException: logger.error( f"Authentication API timeout for user {username} (attempt {attempt + 1}/{self.max_retries})" ) if attempt < self.max_retries - 1: await asyncio.sleep(self.retry_delay * (attempt + 1)) continue return False, None, "Authentication API timeout" except httpx.RequestError as e: logger.error( f"Authentication API request error for user {username}: {str(e)}" ) if attempt < self.max_retries - 1: await asyncio.sleep(self.retry_delay * (attempt + 1)) continue return False, None, f"Network error: {str(e)}" # All retries exhausted return False, None, "Authentication API unavailable after retries" except Exception as e: logger.exception(f"Unexpected error during authentication for user {username}") return False, None, f"Internal error: {str(e)}" async def validate_token(self, access_token: str) -> tuple[bool, Optional[Dict[str, Any]]]: """ Validate access token (basic check, full validation would require token introspection endpoint) Args: access_token: JWT access token Returns: Tuple of (is_valid, token_payload) """ # Note: For full validation, you would need to: # 1. Verify JWT signature using Azure AD public keys # 2. Check token expiration # 3. Validate issuer, audience, etc. # For now, we rely on database session expiration tracking # TODO: Implement full JWT validation when needed # This is a placeholder that returns True for non-empty tokens if not access_token or not access_token.strip(): return False, None return True, {"valid": True} async def get_user_info(self, user_id: str) -> Optional[UserInfo]: """ Fetch user information from external API (if endpoint available) Args: user_id: User's ID from Azure AD Returns: UserInfo object or None if unavailable """ # TODO: Implement if external API provides user info endpoint # For now, we rely on user info stored in database from login logger.warning("get_user_info not implemented - use cached user info from database") return None def is_token_expiring_soon(self, expires_at: datetime) -> bool: """ Check if token is expiring soon (within TOKEN_REFRESH_BUFFER) Args: expires_at: Token expiration timestamp Returns: True if token expires within buffer time """ buffer_seconds = settings.token_refresh_buffer threshold = datetime.utcnow() + timedelta(seconds=buffer_seconds) return expires_at <= threshold # Import asyncio after class definition to avoid circular imports import asyncio # Global service instance external_auth_service = ExternalAuthService()