Files
OCR/openspec/changes/enhance-memory-management/design.md
egg ba8ddf2b68 feat: create OpenSpec proposal for enhanced memory management
- Create comprehensive proposal addressing OOM crashes and memory leaks
- Define 6 core areas: model lifecycle, service pooling, monitoring
- Add 58 implementation tasks across 8 sections
- Design ModelManager with reference counting and idle timeout
- Plan OCRServicePool for singleton service pattern
- Specify MemoryGuard for proactive memory monitoring
- Include concurrency controls and cleanup hooks
- Add spec deltas for ocr-processing and task-management
- Create detailed design document with architecture diagrams
- Define performance targets: 75% memory reduction, 4x concurrency

Critical improvements:
- Remove PP-StructureV3 permanent exemption from unloading
- Replace per-task OCRService instantiation with pooling
- Add real GPU memory monitoring (currently always returns True)
- Implement semaphore-based concurrency limits
- Add proper resource cleanup on task completion

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-25 15:21:32 +08:00

15 KiB

Design Document: Enhanced Memory Management

Architecture Overview

The enhanced memory management system introduces three core components that work together to prevent OOM crashes and optimize resource utilization:

┌─────────────────────────────────────────────────────────────┐
│                        Task Router                           │
│  ┌──────────────────────────────────────────────────────┐   │
│  │  Request → Queue → Acquire Service → Process → Release │   │
│  └──────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│                     OCRServicePool                          │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐       │
│  │Service 1│  │Service 2│  │Service 3│  │Service 4│       │
│  │ GPU:0   │  │ GPU:0   │  │ GPU:1   │  │  CPU    │       │
│  └─────────┘  └─────────┘  └─────────┘  └─────────┘       │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│                      ModelManager                           │
│  ┌──────────────────────────────────────────────────────┐   │
│  │  Models: {id → (instance, ref_count, last_used)}    │   │
│  │  Timeout Monitor → Unload Idle Models               │   │
│  └──────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────┐
│                      MemoryGuard                            │
│  ┌──────────────────────────────────────────────────────┐   │
│  │  Monitor: GPU/CPU Memory Usage                       │   │
│  │  Actions: Warn → Throttle → Fallback → Emergency     │   │
│  └──────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘

Component Design

1. ModelManager

Purpose: Centralized model lifecycle management with reference counting and idle timeout.

Key Design Decisions:

  • Singleton Pattern: One ModelManager instance per application
  • Reference Counting: Track active users of each model
  • LRU Cache: Evict least recently used models when memory pressure
  • Lazy Loading: Load models only when first requested

Implementation:

class ModelManager:
    def __init__(self, config: ModelConfig):
        self.models: Dict[str, ModelEntry] = {}
        self.lock = asyncio.Lock()
        self.config = config
        self._start_timeout_monitor()

    async def load_model(self, model_id: str, params: Dict) -> Model:
        async with self.lock:
            if model_id in self.models:
                entry = self.models[model_id]
                entry.ref_count += 1
                entry.last_used = time.time()
                return entry.model

            # Check memory before loading
            if not await self.memory_guard.check_memory(params['estimated_memory']):
                await self._evict_idle_models()

            model = await self._create_model(model_id, params)
            self.models[model_id] = ModelEntry(
                model=model,
                ref_count=1,
                last_used=time.time()
            )
            return model

2. OCRServicePool

Purpose: Manage a pool of OCRService instances to prevent duplicate model loading.

Key Design Decisions:

  • Per-Device Pools: Separate pool for each GPU/CPU device
  • Semaphore Control: Limit concurrent usage per service
  • Queue Management: FIFO queue with timeout for waiting requests
  • Health Monitoring: Periodic health checks on pooled services

Implementation:

class OCRServicePool:
    def __init__(self, config: PoolConfig):
        self.pools: Dict[str, List[OCRService]] = {}
        self.semaphores: Dict[str, asyncio.Semaphore] = {}
        self.queues: Dict[str, asyncio.Queue] = {}
        self._initialize_pools()

    async def acquire(self, device: str = "GPU:0") -> OCRService:
        # Try to get from pool
        if device in self.pools and self.pools[device]:
            for service in self.pools[device]:
                if await service.try_acquire():
                    return service

        # Queue if pool exhausted
        return await self._wait_for_service(device)

3. MemoryGuard

Purpose: Monitor memory usage and trigger preventive actions.

Key Design Decisions:

  • Multi-Backend Support: paddle.device.cuda, pynvml, torch as fallbacks
  • Threshold System: Warning (80%), Critical (95%), Emergency (98%)
  • Predictive Allocation: Estimate memory before operations
  • Progressive Actions: Warn → Throttle → CPU Fallback → Reject

Implementation:

class MemoryGuard:
    def __init__(self, config: MemoryConfig):
        self.config = config
        self.backend = self._detect_backend()
        self._start_monitor()

    async def check_memory(self, required_mb: int = 0) -> bool:
        stats = await self.get_memory_stats()
        available = stats['gpu_free_mb']

        if available < required_mb:
            return False

        usage_ratio = stats['gpu_used_ratio']
        if usage_ratio > self.config.critical_threshold:
            await self._trigger_emergency_cleanup()
            return False

        if usage_ratio > self.config.warning_threshold:
            await self._trigger_warning()

        return True

Memory Optimization Strategies

1. PP-StructureV3 Specific Optimizations

Problem: PP-StructureV3 is permanently exempted from unloading (lines 255-267).

Solution:

# Remove exemption
def should_unload_model(model_id: str) -> bool:
    # Old: if model_id == "ppstructure_v3": return False
    # New: Apply same rules to all models
    return True

# Add proper cleanup
def unload_ppstructure_v3(engine: PPStructureV3):
    engine.table_engine = None
    engine.text_detector = None
    engine.text_recognizer = None
    paddle.device.cuda.empty_cache()

2. Batch Processing for Large Documents

Strategy: Process documents in configurable batches to limit memory usage.

async def process_large_document(doc_path: Path, batch_size: int = 10):
    total_pages = get_page_count(doc_path)

    for start_idx in range(0, total_pages, batch_size):
        end_idx = min(start_idx + batch_size, total_pages)

        # Process batch
        batch_results = await process_pages(doc_path, start_idx, end_idx)

        # Force cleanup between batches
        paddle.device.cuda.empty_cache()
        gc.collect()

        yield batch_results

3. Selective Feature Disabling

Strategy: Allow disabling memory-intensive features when under pressure.

class AdaptiveProcessing:
    def __init__(self):
        self.features = {
            'charts': True,
            'formulas': True,
            'tables': True,
            'layout': True
        }

    async def adapt_to_memory(self, available_mb: int):
        if available_mb < 1000:
            self.features['charts'] = False
            self.features['formulas'] = False
        if available_mb < 500:
            self.features['tables'] = False

Concurrency Management

1. Semaphore-Based Limiting

# Global semaphores
prediction_semaphore = asyncio.Semaphore(2)  # Max 2 concurrent predictions
processing_semaphore = asyncio.Semaphore(4)  # Max 4 concurrent OCR tasks

async def predict_with_structure(image, params=None):
    async with prediction_semaphore:
        # Memory check before prediction
        required_mb = estimate_prediction_memory(image.shape)
        if not await memory_guard.check_memory(required_mb):
            raise MemoryError("Insufficient memory for prediction")

        return await pp_structure.predict(image, params)

2. Queue-Based Task Distribution

class TaskDistributor:
    def __init__(self):
        self.queues = {
            'high': asyncio.Queue(maxsize=10),
            'normal': asyncio.Queue(maxsize=50),
            'low': asyncio.Queue(maxsize=100)
        }

    async def distribute_task(self, task: Task):
        priority = self._calculate_priority(task)
        queue = self.queues[priority]

        try:
            await asyncio.wait_for(
                queue.put(task),
                timeout=self.config.queue_timeout
            )
        except asyncio.TimeoutError:
            raise QueueFullError(f"Queue {priority} is full")

Monitoring and Metrics

1. Memory Metrics Collection

class MemoryMetrics:
    def __init__(self):
        self.history = deque(maxlen=1000)
        self.alerts = []

    async def collect(self):
        stats = {
            'timestamp': time.time(),
            'gpu_used_mb': get_gpu_memory_used(),
            'gpu_free_mb': get_gpu_memory_free(),
            'cpu_used_mb': get_cpu_memory_used(),
            'models_loaded': len(model_manager.models),
            'active_tasks': len(active_tasks),
            'pool_utilization': get_pool_utilization()
        }
        self.history.append(stats)
        await self._check_alerts(stats)

2. Monitoring Dashboard Endpoints

@router.get("/admin/memory/stats")
async def get_memory_stats():
    return {
        'current': memory_metrics.get_current(),
        'history': memory_metrics.get_history(minutes=5),
        'alerts': memory_metrics.get_active_alerts(),
        'recommendations': memory_optimizer.get_recommendations()
    }

@router.post("/admin/memory/gc")
async def trigger_garbage_collection():
    """Manual garbage collection trigger"""
    results = await memory_manager.force_cleanup()
    return {'freed_mb': results['freed'], 'models_unloaded': results['models']}

Error Recovery

1. OOM Recovery Strategy

class OOMRecovery:
    async def recover(self, error: Exception, task: Task):
        logger.error(f"OOM detected for task {task.id}: {error}")

        # Step 1: Emergency cleanup
        await self.emergency_cleanup()

        # Step 2: Try CPU fallback
        if self.config.enable_cpu_fallback:
            task.device = "CPU"
            return await self.retry_on_cpu(task)

        # Step 3: Reduce batch size and retry
        if task.batch_size > 1:
            task.batch_size = max(1, task.batch_size // 2)
            return await self.retry_with_reduced_batch(task)

        # Step 4: Fail gracefully
        await self.mark_task_failed(task, "Insufficient memory")

2. Service Recovery

class ServiceRecovery:
    async def restart_service(self, service_id: str):
        """Restart a failed service"""
        # Kill existing process
        await self.kill_service_process(service_id)

        # Clear service memory
        await self.clear_service_cache(service_id)

        # Restart with fresh state
        new_service = await self.create_service(service_id)
        await self.pool.replace_service(service_id, new_service)

Testing Strategy

1. Memory Leak Detection

@pytest.mark.memory
async def test_no_memory_leak():
    initial_memory = get_memory_usage()

    # Process 100 tasks
    for _ in range(100):
        task = create_test_task()
        await process_task(task)

    # Force cleanup
    await cleanup_all()
    gc.collect()

    final_memory = get_memory_usage()
    leak = final_memory - initial_memory

    assert leak < 100  # Max 100MB leak tolerance

2. Stress Testing

@pytest.mark.stress
async def test_concurrent_load():
    tasks = [create_large_task() for _ in range(50)]

    # Should handle gracefully without OOM
    results = await asyncio.gather(
        *[process_task(t) for t in tasks],
        return_exceptions=True
    )

    # Some may fail but system should remain stable
    successful = sum(1 for r in results if not isinstance(r, Exception))
    assert successful > 0
    assert await health_check() == "healthy"

Performance Targets

Metric Current Target Improvement
Memory per task 2-4 GB 0.5-1 GB 75% reduction
Concurrent tasks 1-2 4-8 4x increase
Model load time 30-60s 5-10s (cached) 6x faster
OOM crashes/day 5-10 0-1 90% reduction
Service uptime 4-8 hours 24+ hours 3x improvement

Rollout Plan

Phase 1: Foundation (Week 1)

  • Implement ModelManager
  • Integrate with existing OCRService
  • Add basic memory monitoring

Phase 2: Pooling (Week 2)

  • Implement OCRServicePool
  • Update task router
  • Add concurrency limits

Phase 3: Optimization (Week 3)

  • Add MemoryGuard
  • Implement adaptive processing
  • Add batch processing

Phase 4: Hardening (Week 4)

  • Stress testing
  • Performance tuning
  • Documentation and monitoring