"""File validation utilities""" import magic import os from fastapi import UploadFile, HTTPException from typing import Set, Dict import logging from app.core.config import get_settings logger = logging.getLogger(__name__) settings = get_settings() # MIME type whitelists IMAGE_TYPES: Set[str] = { "image/jpeg", "image/png", "image/gif" } DOCUMENT_TYPES: Set[str] = { "application/pdf", "application/x-pdf", # Some systems detect PDF as x-pdf } # Extensions that can be accepted even if MIME detection fails EXTENSION_FALLBACK: Dict[str, str] = { ".pdf": "application/pdf", ".doc": "application/msword", ".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document", } LOG_TYPES: Set[str] = { "text/plain", "text/csv" } def detect_mime_type(file_data: bytes) -> str: """ Detect MIME type from file content using python-magic Args: file_data: First chunk of file data Returns: MIME type string """ try: mime = magic.Magic(mime=True) return mime.from_buffer(file_data) except Exception as e: logger.error(f"Failed to detect MIME type: {e}") return "application/octet-stream" def validate_file_type(file: UploadFile, allowed_types: Set[str]) -> str: """ Validate file MIME type using actual file content Args: file: FastAPI UploadFile object allowed_types: Set of allowed MIME types Returns: Detected MIME type Raises: HTTPException if file type is not allowed """ # Read first 2048 bytes to detect MIME type file.file.seek(0) header = file.file.read(2048) file.file.seek(0) # Detect actual MIME type from content detected_mime = detect_mime_type(header) if detected_mime not in allowed_types: # Try extension fallback for known safe file types filename = file.filename or "" _, ext = os.path.splitext(filename.lower()) if ext in EXTENSION_FALLBACK: logger.info( f"MIME detection returned {detected_mime} for {filename}, " f"using extension fallback: {EXTENSION_FALLBACK[ext]}" ) return EXTENSION_FALLBACK[ext] raise HTTPException( status_code=400, detail=f"File type not allowed: {detected_mime}. Allowed types: {', '.join(allowed_types)}" ) return detected_mime def validate_file_size(file: UploadFile, max_size: int): """ Validate file size Args: file: FastAPI UploadFile object max_size: Maximum allowed size in bytes Raises: HTTPException if file exceeds max size """ # Seek to end to get file size file.file.seek(0, 2) # 2 = SEEK_END file_size = file.file.tell() file.file.seek(0) # Reset to beginning if file_size > max_size: max_mb = max_size / (1024 * 1024) actual_mb = file_size / (1024 * 1024) raise HTTPException( status_code=413, detail=f"File size exceeds limit: {actual_mb:.2f}MB > {max_mb:.2f}MB" ) return file_size def get_file_type_and_limits(mime_type: str) -> tuple[str, int]: """ Determine file type category and size limit from MIME type Args: mime_type: MIME type string Returns: Tuple of (file_type, max_size) Raises: HTTPException if MIME type not recognized """ # Include extension fallback types as documents document_types_extended = DOCUMENT_TYPES | set(EXTENSION_FALLBACK.values()) if mime_type in IMAGE_TYPES: return ("image", settings.get_image_max_size_bytes()) elif mime_type in document_types_extended: return ("document", settings.get_document_max_size_bytes()) elif mime_type in LOG_TYPES: return ("log", settings.get_log_max_size_bytes()) else: raise HTTPException( status_code=400, detail=f"Unsupported file type: {mime_type}" ) def validate_upload_file(file: UploadFile) -> tuple[str, str, int]: """ Validate uploaded file (type and size) Args: file: FastAPI UploadFile object Returns: Tuple of (file_type, mime_type, file_size) Raises: HTTPException if validation fails """ # Combine all allowed types all_allowed_types = IMAGE_TYPES | DOCUMENT_TYPES | LOG_TYPES # Validate MIME type mime_type = validate_file_type(file, all_allowed_types) # Get file type category and max size file_type, max_size = get_file_type_and_limits(mime_type) # Validate file size file_size = validate_file_size(file, max_size) return (file_type, mime_type, file_size)