"""MinIO client singleton for object storage""" from minio import Minio from minio.error import S3Error from app.core.config import get_settings import logging logger = logging.getLogger(__name__) _minio_client = None def get_minio_client() -> Minio: """ Get or create MinIO client singleton Returns: Minio client instance """ global _minio_client if _minio_client is None: settings = get_settings() _minio_client = Minio( settings.MINIO_ENDPOINT, access_key=settings.MINIO_ACCESS_KEY, secret_key=settings.MINIO_SECRET_KEY, secure=settings.MINIO_SECURE ) logger.info(f"MinIO client initialized: {settings.MINIO_ENDPOINT}") return _minio_client def initialize_bucket(): """ Initialize MinIO bucket if it doesn't exist Returns: True if bucket exists or was created successfully """ settings = get_settings() client = get_minio_client() bucket_name = settings.MINIO_BUCKET try: # Check if bucket exists if not client.bucket_exists(bucket_name): # Create bucket client.make_bucket(bucket_name) logger.info(f"MinIO bucket created: {bucket_name}") else: logger.info(f"MinIO bucket already exists: {bucket_name}") return True except S3Error as e: logger.error(f"Failed to initialize MinIO bucket '{bucket_name}': {e}") return False except Exception as e: logger.error(f"Unexpected error initializing MinIO bucket: {e}") return False def health_check() -> bool: """ Check if MinIO connection is healthy Returns: True if connection is healthy, False otherwise """ settings = get_settings() try: client = get_minio_client() # List buckets as a health check client.list_buckets() return True except Exception as e: logger.error(f"MinIO health check failed: {e}") return False