Files
OCR/backend/app/core/deps.py
egg fd98018ddd refactor: complete V1 to V2 migration and remove legacy architecture
Remove all V1 architecture components and promote V2 to primary:
- Delete all paddle_ocr_* table models (export, ocr, translation, user)
- Delete legacy routers (auth, export, ocr, translation)
- Delete legacy schemas and services
- Promote user_v2.py to user.py as primary user model
- Update all imports and dependencies to use V2 models only
- Update main.py version to 2.0.0

Database changes:
- Fix SQLAlchemy reserved word: rename audit_log.metadata to extra_data
- Add migration to drop all paddle_ocr_* tables
- Update alembic env to only import V2 models

Frontend fixes:
- Fix Select component exports in TaskHistoryPage.tsx
- Update to use simplified Select API with options prop
- Fix AxiosInstance TypeScript import syntax

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-14 21:27:39 +08:00

177 lines
4.7 KiB
Python

"""
Tool_OCR - FastAPI Dependencies (V2)
Authentication and database session dependencies with external authentication
"""
from typing import Generator, Optional
import logging
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from app.core.database import SessionLocal
from app.core.security import decode_access_token
from app.models.user import User
from app.models.session import Session as UserSession
from app.services.admin_service import admin_service
# Module-level logger named after this module; used by the auth dependencies
# below to record rejected or suspicious access attempts
logger = logging.getLogger(__name__)
# HTTP Bearer token security scheme; get_current_user receives the parsed
# credentials from it via Depends(security)
security = HTTPBearer()
def get_db() -> Generator[Session, None, None]:
    """
    Database session dependency

    Creates a session from SessionLocal, yields it to the caller, and closes
    it when the caller is finished, whether it completed normally or raised.

    Yields:
        Session: SQLAlchemy database session
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        # Always release the session, including when the consumer raised
        db.close()
def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db)
) -> User:
    """
    Get current authenticated user from JWT token (External Authentication)

    Decodes the bearer token, loads the user named by its "sub" claim, and,
    when the token carries a "session_id" claim, checks that the session
    belongs to that user and is not expired before stamping its
    last-accessed time.

    Args:
        credentials: HTTP Bearer credentials
        db: Database session

    Returns:
        User: Current user object

    Raises:
        HTTPException: 401 if the token cannot be decoded, has no usable
            "sub" claim, names a user that does not exist, or references a
            missing or expired session; 403 if the user is inactive
    """
    # Function-scope import so this block does not depend on a module-level
    # datetime import being present
    from datetime import datetime, timezone

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Extract and decode token
    token = credentials.credentials
    payload = decode_access_token(token)
    if payload is None:
        raise credentials_exception

    # Extract user ID from token
    user_id_str: Optional[str] = payload.get("sub")
    if user_id_str is None:
        raise credentials_exception
    try:
        user_id: int = int(user_id_str)
    except (ValueError, TypeError):
        # The conversion error adds nothing for the client; do not chain it
        raise credentials_exception from None

    # Extract session ID from token (optional)
    session_id: Optional[int] = payload.get("session_id")

    # Query user from database
    user = db.query(User).filter(User.id == user_id).first()
    if user is None:
        logger.warning(f"User {user_id} not found")
        raise credentials_exception

    # Check if user is active
    if not user.is_active:
        logger.warning(f"Inactive user {user.email} attempted access")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Inactive user"
        )

    # Validate session if session_id is present. Compare against None rather
    # than relying on truthiness so a present-but-falsy id (e.g. 0) is still
    # validated instead of silently skipping the session check.
    if session_id is not None:
        session = db.query(UserSession).filter(
            UserSession.id == session_id,
            UserSession.user_id == user.id
        ).first()
        if session is None:
            logger.warning(f"Session {session_id} not found for user {user.email}")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid session",
                headers={"WWW-Authenticate": "Bearer"},
            )

        # Check if session is expired
        if session.is_expired:
            logger.warning(f"Expired session {session_id} for user {user.email}")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Session expired, please login again",
                headers={"WWW-Authenticate": "Bearer"},
            )

        # Update last accessed time. datetime.utcnow() is deprecated; this
        # produces the same naive-UTC value it did.
        session.last_accessed_at = datetime.now(timezone.utc).replace(tzinfo=None)
        db.commit()

    logger.debug(f"Authenticated user: {user.email} (ID: {user.id})")
    return user
def get_current_active_user(
    current_user: User = Depends(get_current_user)
) -> User:
    """
    Return the authenticated user only if the account is active

    Args:
        current_user: User resolved by the get_current_user dependency

    Returns:
        User: The same user object that was passed in

    Raises:
        HTTPException: 403 with detail "Inactive user" when
            current_user.is_active is falsy
    """
    # Success path first; anything that falls through is rejected
    if current_user.is_active:
        return current_user

    raise HTTPException(
        detail="Inactive user",
        status_code=status.HTTP_403_FORBIDDEN,
    )
def get_current_admin_user(
    current_user: User = Depends(get_current_user)
) -> User:
    """
    Return the authenticated user only if they have admin rights

    Admin status is decided by admin_service.is_admin, which is given the
    user's email address.

    Args:
        current_user: User resolved by the get_current_user dependency

    Returns:
        User: The same user object that was passed in

    Raises:
        HTTPException: 403 with detail "Admin privileges required" when
            admin_service.is_admin returns a falsy value for the user's email
    """
    # Success path first; anything that falls through is logged and rejected
    if admin_service.is_admin(current_user.email):
        return current_user

    logger.warning(f"Non-admin user {current_user.email} attempted admin access")
    raise HTTPException(
        detail="Admin privileges required",
        status_code=status.HTTP_403_FORBIDDEN,
    )