Design Document: Enhanced Memory Management

Architecture Overview

The enhanced memory management system introduces three core components that work together to prevent OOM crashes and optimize resource utilization:

┌─────────────────────────────────────────────────────────────┐
│                        Task Router                           │
│  ┌──────────────────────────────────────────────────────┐   │
│  │  Request → Queue → Acquire Service → Process → Release │   │
│  └──────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│                     OCRServicePool                          │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐       │
│  │Service 1│  │Service 2│  │Service 3│  │Service 4│       │
│  │ GPU:0   │  │ GPU:0   │  │ GPU:1   │  │  CPU    │       │
│  └─────────┘  └─────────┘  └─────────┘  └─────────┘       │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│                      ModelManager                           │
│  ┌──────────────────────────────────────────────────────┐   │
│  │  Models: {id → (instance, ref_count, last_used)}    │   │
│  │  Timeout Monitor → Unload Idle Models               │   │
│  └──────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│                      MemoryGuard                            │
│  ┌──────────────────────────────────────────────────────┐   │
│  │  Monitor: GPU/CPU Memory Usage                       │   │
│  │  Actions: Warn → Throttle → Fallback → Emergency     │   │
│  └──────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘

Component Design

1. ModelManager

Purpose: Centralized model lifecycle management with reference counting and idle timeout.

Key Design Decisions:

  • Singleton Pattern: One ModelManager instance per application
  • Reference Counting: Track active users of each model
  • LRU Cache: Evict least recently used models under memory pressure
  • Lazy Loading: Load models only when first requested

Implementation:

from dataclasses import dataclass
from typing import Dict
import asyncio
import time


@dataclass
class ModelEntry:
    model: object        # loaded model instance
    ref_count: int       # number of active users
    last_used: float     # timestamp of last acquisition


class ModelManager:
    def __init__(self, config: ModelConfig, memory_guard: MemoryGuard):
        self.models: Dict[str, ModelEntry] = {}
        self.lock = asyncio.Lock()
        self.config = config
        self.memory_guard = memory_guard  # used for pre-load memory checks
        self._start_timeout_monitor()

    async def load_model(self, model_id: str, params: Dict) -> Model:
        async with self.lock:
            # Reuse an already-loaded model and bump its reference count
            if model_id in self.models:
                entry = self.models[model_id]
                entry.ref_count += 1
                entry.last_used = time.time()
                return entry.model

            # Check memory before loading; evict idle models if needed
            if not await self.memory_guard.check_memory(params['estimated_memory']):
                await self._evict_idle_models()

            model = await self._create_model(model_id, params)
            self.models[model_id] = ModelEntry(
                model=model,
                ref_count=1,
                last_used=time.time()
            )
            return model
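
The release path is not shown above; a minimal sketch of the matching ModelManager methods, assuming the model_idle_timeout_seconds setting described later and an illustrative _unload_model helper:

    async def release_model(self, model_id: str) -> None:
        # Drop one reference; the model stays cached until the idle timeout expires
        async with self.lock:
            entry = self.models.get(model_id)
            if entry is not None:
                entry.ref_count = max(0, entry.ref_count - 1)
                entry.last_used = time.time()

    async def _evict_idle_models(self) -> None:
        # Called with self.lock held: unload models with no active users that
        # have exceeded the idle timeout, least recently used first
        now = time.time()
        idle = [
            mid for mid, entry in self.models.items()
            if entry.ref_count == 0
            and now - entry.last_used > self.config.model_idle_timeout_seconds
        ]
        for mid in sorted(idle, key=lambda m: self.models[m].last_used):
            await self._unload_model(mid)  # illustrative: frees the model and empties the CUDA cache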

2. OCRServicePool

Purpose: Manage a pool of OCRService instances to prevent duplicate model loading.

Key Design Decisions:

  • Per-Device Pools: Separate pool for each GPU/CPU device
  • Semaphore Control: Limit concurrent usage per service
  • Queue Management: FIFO queue with timeout for waiting requests
  • Health Monitoring: Periodic health checks on pooled services

Implementation:

from typing import Dict, List
import asyncio


class OCRServicePool:
    def __init__(self, config: PoolConfig):
        # One service list, semaphore, and wait queue per device key (e.g. "GPU:0")
        self.pools: Dict[str, List[OCRService]] = {}
        self.semaphores: Dict[str, asyncio.Semaphore] = {}
        self.queues: Dict[str, asyncio.Queue] = {}
        self._initialize_pools()

    async def acquire(self, device: str = "GPU:0") -> OCRService:
        # Fast path: hand out the first idle service in the device pool
        if device in self.pools and self.pools[device]:
            for service in self.pools[device]:
                if await service.try_acquire():
                    return service

        # Slow path: queue the request until a service is released
        # or the acquire timeout expires
        return await self._wait_for_service(device)
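
The pool's release side is only sketched here; assuming OCRService exposes release() as the counterpart to try_acquire() and that _wait_for_service parks callers as asyncio.Future objects on the device queue:

    async def release(self, service: OCRService, device: str = "GPU:0") -> None:
        queue = self.queues.get(device)
        if queue is not None and not queue.empty():
            # Hand the still-acquired service directly to the next waiter
            waiter = await queue.get()
            waiter.set_result(service)
            return
        # Nobody is waiting: mark the service idle again
        service.release()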

3. MemoryGuard

Purpose: Monitor memory usage and trigger preventive actions.

Key Design Decisions:

  • Multi-Backend Support: paddle.device.cuda first, with pynvml and torch as fallbacks
  • Threshold System: Warning (80%), Critical (95%), Emergency (98%)
  • Predictive Allocation: Estimate memory before operations
  • Progressive Actions: Warn → Throttle → CPU Fallback → Reject

Implementation:

class MemoryGuard:
    def __init__(self, config: MemoryConfig):
        self.config = config
        self.backend = self._detect_backend()
        self._start_monitor()

    async def check_memory(self, required_mb: int = 0) -> bool:
        stats = await self.get_memory_stats()
        available = stats['gpu_free_mb']

        if available < required_mb:
            return False

        usage_ratio = stats['gpu_used_ratio']
        if usage_ratio > self.config.critical_threshold:
            await self._trigger_emergency_cleanup()
            return False

        if usage_ratio > self.config.warning_threshold:
            await self._trigger_warning()

        return True
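
The backend detection itself is not spelled out above; a minimal sketch of the intended fallback chain (paddle → pynvml → torch), assuming any of those packages may be missing at runtime:

    def _detect_backend(self) -> str:
        # Prefer paddle's own CUDA introspection, then NVML, then torch;
        # report "cpu" if no GPU backend is usable
        try:
            import paddle
            if paddle.device.cuda.device_count() > 0:
                return "paddle"
        except Exception:
            pass
        try:
            import pynvml
            pynvml.nvmlInit()
            return "pynvml"
        except Exception:
            pass
        try:
            import torch
            if torch.cuda.is_available():
                return "torch"
        except Exception:
            pass
        return "cpu"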

Memory Optimization Strategies

1. PP-StructureV3 Specific Optimizations

Problem: PP-StructureV3 is permanently exempted from unloading in the current implementation (lines 255-267).

Solution:

# Remove exemption
def should_unload_model(model_id: str) -> bool:
    # Old: if model_id == "ppstructure_v3": return False
    # New: Apply same rules to all models
    return True

# Add proper cleanup
def unload_ppstructure_v3(engine: PPStructureV3):
    engine.table_engine = None
    engine.text_detector = None
    engine.text_recognizer = None
    paddle.device.cuda.empty_cache()

2. Batch Processing for Large Documents

Strategy: Process documents in configurable batches to limit memory usage.

from pathlib import Path
import gc

import paddle


async def process_large_document(doc_path: Path, batch_size: int = 10):
    total_pages = get_page_count(doc_path)

    for start_idx in range(0, total_pages, batch_size):
        end_idx = min(start_idx + batch_size, total_pages)

        # Process one batch of pages
        batch_results = await process_pages(doc_path, start_idx, end_idx)

        # Force cleanup between batches so peak memory stays bounded
        paddle.device.cuda.empty_cache()
        gc.collect()

        yield batch_results
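
Callers consume the generator as an async iterator, so only one batch of results is held in memory at a time (collect_results is a hypothetical downstream step):

async def run(doc_path: Path):
    async for batch in process_large_document(doc_path, batch_size=10):
        await collect_results(batch)  # hypothetical: persist or post-process each batch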

3. Selective Feature Disabling

Strategy: Allow disabling memory-intensive features when under pressure.

class AdaptiveProcessing:
    def __init__(self):
        self.features = {
            'charts': True,
            'formulas': True,
            'tables': True,
            'layout': True
        }

    async def adapt_to_memory(self, available_mb: int):
        if available_mb < 1000:
            self.features['charts'] = False
            self.features['formulas'] = False
        if available_mb < 500:
            self.features['tables'] = False
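
Tying this to MemoryGuard, a hypothetical call site could adapt features from the current stats before each prediction (build_prediction_params is an illustrative helper, not an existing API):

stats = await memory_guard.get_memory_stats()
adaptive = AdaptiveProcessing()
await adaptive.adapt_to_memory(stats['gpu_free_mb'])
params = build_prediction_params(features=adaptive.features)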

Concurrency Management

1. Semaphore-Based Limiting

# Global semaphores
prediction_semaphore = asyncio.Semaphore(2)  # Max 2 concurrent predictions
processing_semaphore = asyncio.Semaphore(4)  # Max 4 concurrent OCR tasks

async def predict_with_structure(image, params=None):
    async with prediction_semaphore:
        # Memory check before prediction
        required_mb = estimate_prediction_memory(image.shape)
        if not await memory_guard.check_memory(required_mb):
            raise MemoryError("Insufficient memory for prediction")

        return await pp_structure.predict(image, params)

2. Queue-Based Task Distribution

import asyncio


class TaskDistributor:
    def __init__(self, config):
        self.config = config  # supplies queue_timeout for back-pressure
        self.queues = {
            'high': asyncio.Queue(maxsize=10),
            'normal': asyncio.Queue(maxsize=50),
            'low': asyncio.Queue(maxsize=100)
        }

    async def distribute_task(self, task: Task):
        priority = self._calculate_priority(task)
        queue = self.queues[priority]

        try:
            # Fail with a timeout instead of blocking indefinitely when the queue is full
            await asyncio.wait_for(
                queue.put(task),
                timeout=self.config.queue_timeout
            )
        except asyncio.TimeoutError:
            raise QueueFullError(f"Queue {priority} is full")
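
Nothing above drains the queues; a minimal sketch of a worker loop that consumes them in strict priority order (the polling interval is illustrative, and process_task stands in for the actual pipeline entry point):

    async def worker_loop(self):
        while True:
            for priority in ('high', 'normal', 'low'):
                queue = self.queues[priority]
                if not queue.empty():
                    task = queue.get_nowait()
                    try:
                        await process_task(task)
                    finally:
                        queue.task_done()
                    break
            else:
                # All queues empty: back off briefly before polling again
                await asyncio.sleep(0.1)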

Monitoring and Metrics

1. Memory Metrics Collection

from collections import deque
import time


class MemoryMetrics:
    def __init__(self):
        self.history = deque(maxlen=1000)  # rolling window of recent samples
        self.alerts = []

    async def collect(self):
        stats = {
            'timestamp': time.time(),
            'gpu_used_mb': get_gpu_memory_used(),
            'gpu_free_mb': get_gpu_memory_free(),
            'cpu_used_mb': get_cpu_memory_used(),
            'models_loaded': len(model_manager.models),
            'active_tasks': len(active_tasks),
            'pool_utilization': get_pool_utilization()
        }
        self.history.append(stats)
        await self._check_alerts(stats)

2. Monitoring Dashboard Endpoints

@router.get("/admin/memory/stats")
async def get_memory_stats():
    return {
        'current': memory_metrics.get_current(),
        'history': memory_metrics.get_history(minutes=5),
        'alerts': memory_metrics.get_active_alerts(),
        'recommendations': memory_optimizer.get_recommendations()
    }

@router.post("/admin/memory/gc")
async def trigger_garbage_collection():
    """Manual garbage collection trigger"""
    results = await memory_manager.force_cleanup()
    return {'freed_mb': results['freed'], 'models_unloaded': results['models']}

Error Recovery

1. OOM Recovery Strategy

class OOMRecovery:
    async def recover(self, error: Exception, task: Task):
        logger.error(f"OOM detected for task {task.id}: {error}")

        # Step 1: Emergency cleanup
        await self.emergency_cleanup()

        # Step 2: Try CPU fallback
        if self.config.enable_cpu_fallback:
            task.device = "CPU"
            return await self.retry_on_cpu(task)

        # Step 3: Reduce batch size and retry
        if task.batch_size > 1:
            task.batch_size = max(1, task.batch_size // 2)
            return await self.retry_with_reduced_batch(task)

        # Step 4: Fail gracefully
        await self.mark_task_failed(task, "Insufficient memory")

2. Service Recovery

class ServiceRecovery:
    async def restart_service(self, service_id: str):
        """Restart a failed service"""
        # Kill existing process
        await self.kill_service_process(service_id)

        # Clear service memory
        await self.clear_service_cache(service_id)

        # Restart with fresh state
        new_service = await self.create_service(service_id)
        await self.pool.replace_service(service_id, new_service)

Testing Strategy

1. Memory Leak Detection

@pytest.mark.memory
async def test_no_memory_leak():
    initial_memory = get_memory_usage()

    # Process 100 tasks
    for _ in range(100):
        task = create_test_task()
        await process_task(task)

    # Force cleanup
    await cleanup_all()
    gc.collect()

    final_memory = get_memory_usage()
    leak = final_memory - initial_memory

    assert leak < 100  # Max 100MB leak tolerance

2. Stress Testing

@pytest.mark.stress
async def test_concurrent_load():
    tasks = [create_large_task() for _ in range(50)]

    # Should handle gracefully without OOM
    results = await asyncio.gather(
        *[process_task(t) for t in tasks],
        return_exceptions=True
    )

    # Some may fail but system should remain stable
    successful = sum(1 for r in results if not isinstance(r, Exception))
    assert successful > 0
    assert await health_check() == "healthy"

Performance Targets

Metric Current Target Improvement
Memory per task 2-4 GB 0.5-1 GB 75% reduction
Concurrent tasks 1-2 4-8 4x increase
Model load time 30-60s 5-10s (cached) 6x faster
OOM crashes/day 5-10 0-1 90% reduction
Service uptime 4-8 hours 24+ hours 3x improvement

Rollout Plan

Phase 1: Foundation (Week 1)

  • Implement ModelManager
  • Integrate with existing OCRService
  • Add basic memory monitoring

Phase 2: Pooling (Week 2)

  • Implement OCRServicePool
  • Update task router
  • Add concurrency limits

Phase 3: Optimization (Week 3)

  • Add MemoryGuard
  • Implement adaptive processing
  • Add batch processing

Phase 4: Hardening (Week 4)

  • Stress testing
  • Performance tuning
  • Documentation and monitoring

Configuration Settings Reference

All memory management settings are defined in backend/app/core/config.py under the Settings class.
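
A minimal sketch of how a few of the fields listed below might appear in that class, assuming a pydantic-style BaseSettings (the real class contains many more fields):

from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # Memory thresholds (fractions of total GPU memory)
    memory_warning_threshold: float = 0.80
    memory_critical_threshold: float = 0.95
    memory_emergency_threshold: float = 0.98

    # Model lifecycle
    model_idle_timeout_seconds: int = 300
    enable_lazy_model_loading: bool = True

    # Concurrency
    max_concurrent_predictions: int = 2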

Memory Thresholds

Setting Type Default Description
memory_warning_threshold float 0.80 GPU memory usage ratio (0-1) to trigger warning alerts
memory_critical_threshold float 0.95 GPU memory ratio to start throttling operations
memory_emergency_threshold float 0.98 GPU memory ratio to trigger emergency cleanup

Memory Monitoring

Setting Type Default Description
memory_check_interval_seconds int 30 Background check interval for memory monitoring
enable_memory_alerts bool True Enable/disable memory threshold alerts
gpu_memory_limit_mb int 6144 Maximum GPU memory to use (MB)
gpu_memory_reserve_mb int 512 Memory reserved for CUDA overhead

Model Lifecycle Management

Setting Type Default Description
enable_model_lifecycle_management bool True Use ModelManager for model lifecycle
model_idle_timeout_seconds int 300 Unload models after idle time
pp_structure_idle_timeout_seconds int 300 Unload PP-StructureV3 after idle
structure_model_memory_mb int 2000 Estimated memory for PP-StructureV3
ocr_model_memory_mb int 500 Estimated memory per OCR language model
enable_lazy_model_loading bool True Load models on demand
auto_unload_unused_models bool True Auto-unload unused language models

Service Pool Configuration

Setting Type Default Description
enable_service_pool bool True Use OCRServicePool
max_services_per_device int 1 Max OCRService instances per GPU
max_total_services int 2 Max total OCRService instances
service_acquire_timeout_seconds float 300.0 Timeout for acquiring service from pool
max_queue_size int 50 Max pending tasks per device queue

Concurrency Control

Setting Type Default Description
max_concurrent_predictions int 2 Max concurrent PP-StructureV3 predictions
max_concurrent_pages int 2 Max pages processed concurrently
inference_batch_size int 1 Batch size for inference
enable_batch_processing bool True Enable batch processing for large docs

Recovery Settings

Setting Type Default Description
enable_cpu_fallback bool True Fall back to CPU when GPU memory low
enable_emergency_cleanup bool True Auto-cleanup on memory pressure
enable_worker_restart bool False Restart workers on OOM (requires supervisor)

Feature Flags

Setting Type Default Description
enable_chart_recognition bool True Enable chart/diagram recognition
enable_formula_recognition bool True Enable math formula recognition
enable_table_recognition bool True Enable table structure recognition
enable_seal_recognition bool True Enable seal/stamp recognition
enable_text_recognition bool True Enable general text recognition
enable_memory_optimization bool True Enable memory optimizations

Environment Variable Override

All settings can be overridden via environment variables. The format is uppercase with underscores:

# Example .env file
MEMORY_WARNING_THRESHOLD=0.75
MEMORY_CRITICAL_THRESHOLD=0.90
MAX_CONCURRENT_PREDICTIONS=1
GPU_MEMORY_LIMIT_MB=4096
ENABLE_CPU_FALLBACK=true

Recommended presets for common hardware configurations:

RTX 4060 8GB (Default)

GPU_MEMORY_LIMIT_MB=6144
MAX_CONCURRENT_PREDICTIONS=2
MAX_CONCURRENT_PAGES=2
INFERENCE_BATCH_SIZE=1

RTX 3090 24GB

GPU_MEMORY_LIMIT_MB=20480
MAX_CONCURRENT_PREDICTIONS=4
MAX_CONCURRENT_PAGES=4
INFERENCE_BATCH_SIZE=2

CPU-Only Mode

FORCE_CPU_MODE=true
MAX_CONCURRENT_PREDICTIONS=1
ENABLE_CPU_FALLBACK=false

Prometheus Metrics

The system exports Prometheus-format metrics via the PrometheusMetrics class. Available metrics:

GPU Metrics

  • tool_ocr_memory_gpu_total_bytes - Total GPU memory
  • tool_ocr_memory_gpu_used_bytes - Used GPU memory
  • tool_ocr_memory_gpu_free_bytes - Free GPU memory
  • tool_ocr_memory_gpu_utilization_ratio - GPU utilization (0-1)

Model Metrics

  • tool_ocr_memory_models_loaded_total - Number of loaded models
  • tool_ocr_memory_models_memory_bytes - Total memory used by models
  • tool_ocr_memory_model_ref_count{model_id} - Reference count per model

Prediction Metrics

  • tool_ocr_memory_predictions_active - Currently active predictions
  • tool_ocr_memory_predictions_queue_depth - Predictions waiting in queue
  • tool_ocr_memory_predictions_total - Total predictions processed (counter)
  • tool_ocr_memory_predictions_timeouts_total - Total prediction timeouts (counter)

Pool Metrics

  • tool_ocr_memory_pool_services_total - Total services in pool
  • tool_ocr_memory_pool_services_available - Available services
  • tool_ocr_memory_pool_services_in_use - Services in use
  • tool_ocr_memory_pool_acquisitions_total - Total acquisitions (counter)

Recovery Metrics

  • tool_ocr_memory_recovery_count_total - Total recovery attempts
  • tool_ocr_memory_recovery_in_cooldown - In cooldown (0/1)
  • tool_ocr_memory_recovery_cooldown_remaining_seconds - Remaining cooldown
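
A minimal sketch of how a few of these metrics could be registered and updated with the prometheus_client library (the metric names follow the lists above; the internals of PrometheusMetrics may differ):

from prometheus_client import Counter, Gauge

gpu_used_bytes = Gauge('tool_ocr_memory_gpu_used_bytes', 'Used GPU memory')
model_ref_count = Gauge('tool_ocr_memory_model_ref_count',
                        'Reference count per model', ['model_id'])
predictions_total = Counter('tool_ocr_memory_predictions_total',
                            'Total predictions processed')

def update_metrics(stats: dict, models: dict) -> None:
    # stats mirrors a MemoryMetrics sample; models mirrors ModelManager.models
    gpu_used_bytes.set(stats['gpu_used_mb'] * 1024 * 1024)
    for model_id, entry in models.items():
        model_ref_count.labels(model_id=model_id).set(entry.ref_count)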

Memory Dump API

The MemoryDumper class provides debugging capabilities:

from app.services.memory_manager import get_memory_dumper

dumper = get_memory_dumper()

# Create a memory dump
dump = dumper.create_dump(include_python_objects=True)

# Get dump as dictionary for JSON serialization
dump_dict = dumper.to_dict(dump)

# Compare two dumps to detect memory growth
comparison = dumper.compare_dumps(dump1, dump2)

Memory dumps include:

  • GPU/CPU memory usage
  • Loaded models and reference counts
  • Active predictions and queue state
  • Service pool statistics
  • Recovery manager state
  • Python GC statistics
  • Large Python objects (optional)