""" Tool_OCR - FastAPI Dependencies Authentication and database session dependencies """ from typing import Generator, Optional import logging from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from sqlalchemy.orm import Session from app.core.database import SessionLocal from app.core.security import decode_access_token from app.models.user import User from app.models.user_v2 import User as UserV2 from app.models.session import Session as UserSession from app.services.admin_service import admin_service logger = logging.getLogger(__name__) # HTTP Bearer token security scheme security = HTTPBearer() def get_db() -> Generator: """ Database session dependency Yields: Session: SQLAlchemy database session """ db = SessionLocal() try: yield db finally: db.close() def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db) ) -> User: """ Get current authenticated user from JWT token Args: credentials: HTTP Bearer credentials db: Database session Returns: User: Current user object Raises: HTTPException: If token is invalid or user not found """ credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) # Extract token token = credentials.credentials # Decode token payload = decode_access_token(token) if payload is None: raise credentials_exception # Extract user ID from token (convert from string to int) user_id_str: Optional[str] = payload.get("sub") if user_id_str is None: raise credentials_exception try: user_id: int = int(user_id_str) except (ValueError, TypeError): raise credentials_exception # Query user from database user = db.query(User).filter(User.id == user_id).first() if user is None: raise credentials_exception # Check if user is active if not user.is_active: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Inactive user" ) return user def get_current_active_user( current_user: User = Depends(get_current_user) ) -> User: """ Get current active user Args: current_user: Current user from get_current_user Returns: User: Current active user Raises: HTTPException: If user is inactive """ if not current_user.is_active: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Inactive user" ) return current_user def get_current_admin_user( current_user: User = Depends(get_current_user) ) -> User: """ Get current admin user Args: current_user: Current user from get_current_user Returns: User: Current admin user Raises: HTTPException: If user is not admin """ if not current_user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Not enough privileges" ) return current_user # ===== V2 Dependencies for External Authentication ===== def get_current_user_v2( credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db) ) -> UserV2: """ Get current authenticated user from JWT token (V2 with external auth) Args: credentials: HTTP Bearer credentials db: Database session Returns: UserV2: Current user object Raises: HTTPException: If token is invalid or user not found """ credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) # Extract token token = credentials.credentials # Decode token payload = decode_access_token(token) if payload is None: raise credentials_exception # Extract user ID from token user_id_str: Optional[str] = payload.get("sub") if user_id_str is None: raise credentials_exception try: user_id: int = int(user_id_str) except (ValueError, TypeError): raise credentials_exception # Extract session ID from token (optional) session_id: Optional[int] = payload.get("session_id") # Query user from database (using V2 model) user = db.query(UserV2).filter(UserV2.id == user_id).first() if user is None: logger.warning(f"User {user_id} not found in V2 table") raise credentials_exception # Check if user is active if not user.is_active: logger.warning(f"Inactive user {user.email} attempted access") raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Inactive user" ) # Validate session if session_id is present if session_id: session = db.query(UserSession).filter( UserSession.id == session_id, UserSession.user_id == user.id ).first() if not session: logger.warning(f"Session {session_id} not found for user {user.email}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid session", headers={"WWW-Authenticate": "Bearer"}, ) # Check if session is expired if session.is_expired: logger.warning(f"Expired session {session_id} for user {user.email}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Session expired, please login again", headers={"WWW-Authenticate": "Bearer"}, ) # Update last accessed time from datetime import datetime session.last_accessed_at = datetime.utcnow() db.commit() logger.debug(f"Authenticated user: {user.email} (ID: {user.id})") return user def get_current_active_user_v2( current_user: UserV2 = Depends(get_current_user_v2) ) -> UserV2: """ Get current active user (V2) Args: current_user: Current user from get_current_user_v2 Returns: UserV2: Current active user Raises: HTTPException: If user is inactive """ if not current_user.is_active: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Inactive user" ) return current_user def get_current_admin_user_v2( current_user: UserV2 = Depends(get_current_user_v2) ) -> UserV2: """ Get current admin user (V2) Args: current_user: Current user from get_current_user_v2 Returns: UserV2: Current admin user Raises: HTTPException: If user is not admin """ if not admin_service.is_admin(current_user.email): logger.warning(f"Non-admin user {current_user.email} attempted admin access") raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Admin privileges required" ) return current_user