chore: archive enhance-memory-management proposal (75/80 tasks)

Archive incomplete proposal for later continuation.
OCR processing has known quality issues to be addressed in future work.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Author: egg
Date: 2025-11-26 16:10:45 +08:00
Parent: fa9b542b06
Commit: a227311b2d
6 changed files with 0 additions and 0 deletions


@@ -0,0 +1,146 @@
# Spec Delta: ocr-processing
## Changes to OCR Processing Specification
### 1. Model Lifecycle Management
#### Added: ModelManager Class
```python
from typing import Dict, Optional

class ModelManager:
    """Manages model lifecycle with reference counting and idle timeout."""

    def load_model(self, model_id: str, config: Dict) -> Model:
        """Load a model, or return the existing instance with its ref count incremented."""
        ...
    def unload_model(self, model_id: str) -> None:
        """Decrement the ref count and unload the model once it reaches zero."""
        ...
    def get_model(self, model_id: str) -> Optional[Model]:
        """Return the model instance if loaded, else None."""
        ...
    def teardown(self) -> None:
        """Force-unload all models immediately."""
        ...
```
#### Modified: PPStructureV3 Integration
- Remove permanent exemption from unloading (lines 255-267)
- Wrap PP-StructureV3 in ModelManager
- Support lazy loading on first access
- Add unload capability with cache clearing
### 2. Service Architecture
#### Added: OCRServicePool
```python
class OCRServicePool:
    """Pool of OCRService instances (one per device)."""

    def acquire(self, device: str = "GPU:0") -> OCRService:
        """Get a service from the pool, gated by a semaphore."""
        ...
    def release(self, service: OCRService) -> None:
        """Return the service to the pool."""
        ...
```
#### Modified: OCRService Instantiation
- Replace direct instantiation with pool.acquire()
- Add finally blocks for pool.release()
- Handle pool exhaustion gracefully
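
A minimal sketch of the exhaustion path (assuming `pool.acquire()` blocks until a service frees up; the 60-second timeout and the 503 mapping are illustrative, not the implemented behavior):

```python
import asyncio

from fastapi import HTTPException

async def acquire_or_503(pool, device: str = "GPU:0"):
    """Acquire a pooled service, failing fast with 503 when the pool is exhausted."""
    try:
        return await asyncio.wait_for(pool.acquire(device), timeout=60)
    except asyncio.TimeoutError:
        # Surface exhaustion to the client instead of queueing indefinitely
        raise HTTPException(status_code=503, detail="OCR service pool exhausted")
```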
### 3. Memory Management
#### Added: MemoryGuard Class
```python
class MemoryGuard:
    """Monitor and control memory usage."""

    def check_memory(self, required_mb: int = 0) -> bool:
        """Return True if sufficient memory is available."""
        ...
    def get_memory_stats(self) -> Dict:
        """Return current memory usage statistics."""
        ...
    def predict_memory(self, operation: str, params: Dict) -> int:
        """Predict the memory requirement (MB) for an operation."""
        ...
```
#### Modified: Processing Flow
- Add memory checks before each operation (sketched below)
- Implement CPU fallback when GPU memory is low
- Add progressive loading for multi-page documents
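
A minimal sketch of the per-operation flow (assuming the `MemoryGuard` interface above; `run_ocr` and the page-dimension parameters are illustrative):

```python
async def process_page(page, guard: MemoryGuard, device: str = "GPU:0"):
    """Check memory before OCR and fall back to CPU when the GPU is constrained."""
    required_mb = guard.predict_memory("page_ocr", {"width": page.width, "height": page.height})
    if device.startswith("GPU") and not guard.check_memory(required_mb):
        device = "CPU"  # degrade gracefully rather than risk an OOM crash
    return await run_ocr(page, device=device)
```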
### 4. Concurrency Control
#### Added: Prediction Semaphores
```python
import asyncio

# Maximum concurrent PP-StructureV3 predictions
MAX_CONCURRENT_PREDICTIONS = 2
prediction_semaphore = asyncio.Semaphore(MAX_CONCURRENT_PREDICTIONS)

async def predict_with_limit(self, image, custom_params=None):
    async with prediction_semaphore:
        return await self._predict(image, custom_params)
```
#### Added: Selective Processing
```python
class ProcessingConfig:
    enable_charts: bool = True
    enable_formulas: bool = True
    enable_tables: bool = True
    batch_size: int = 10  # pages per batch
```
### 5. Resource Cleanup
#### Added: Cleanup Hooks
```python
@app.on_event("shutdown")
async def shutdown_handler():
    """Graceful shutdown with model unloading."""
    await model_manager.teardown()
    await service_pool.shutdown()
```
#### Modified: Task Completion
```python
async def process_task(task_id: str):
    service = None
    try:
        service = await pool.acquire()
        # ... processing ...
    finally:
        if service:
            await pool.release(service)
        await cleanup_task_resources(task_id)
```
## Configuration Changes
### Added Settings
```yaml
memory:
  gpu_threshold_warning: 0.8    # 80% usage
  gpu_threshold_critical: 0.95  # 95% usage
  model_idle_timeout: 300       # 5 minutes
  enable_memory_monitor: true
  monitor_interval: 10          # seconds
pool:
  max_services_per_device: 2
  queue_timeout: 60             # seconds
concurrency:
  max_predictions: 2
  max_batch_size: 10
```
## Breaking Changes
None - All changes are backward compatible optimizations.
## Migration Path
1. Deploy new code with default settings (no config changes needed)
2. Monitor memory metrics via new endpoints
3. Tune parameters based on workload
4. Enable selective processing if needed


@@ -0,0 +1,225 @@
# Spec Delta: task-management
## Changes to Task Management Specification
### 1. Task Resource Management
#### Modified: Task Creation
```python
class TaskManager:
    def create_task(self, request: TaskCreateRequest) -> Task:
        """Create a task with resource estimation."""
        task = Task(...)
        task.estimated_memory_mb = self._estimate_memory(request)
        task.assigned_device = self._select_device(task.estimated_memory_mb)
        return task
```
#### Added: Resource Tracking
```python
class Task(BaseModel):
    # Existing fields...

    # New resource tracking fields
    estimated_memory_mb: Optional[int] = None
    actual_memory_mb: Optional[int] = None
    assigned_device: Optional[str] = None
    service_instance_id: Optional[str] = None
    resource_cleanup_completed: bool = False
```
### 2. Task Execution
#### Modified: Task Router
```python
@router.post("/tasks/{task_id}/start")
async def start_task(task_id: str, params: TaskStartRequest):
    # Old approach - creates a new service per task:
    # service = OCRService(device=device)

    # New approach - uses a pooled service:
    service = await service_pool.acquire(device=params.device)
    try:
        result = await service.process(task_id, params)
    finally:
        await service_pool.release(service)
```
#### Added: Task Queue Management
```python
class TaskQueue:
    """Priority queue for task execution."""

    def add_task(self, task: Task, priority: int = 0):
        """Add a task to the queue with the given priority."""
        ...
    def get_next_task(self, device: str) -> Optional[Task]:
        """Get the next task for a specific device."""
        ...
    def requeue_task(self, task: Task):
        """Re-add a failed task with lower priority."""
        ...
```
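
A minimal in-memory sketch of this interface (a `heapq`-backed queue per device; the tie-breaking counter and the requeue priority of -1 are illustrative choices, not the implemented values):

```python
import heapq
import itertools
from typing import Dict, List, Optional

class TaskQueue:
    """Priority queue for task execution, one heap per device."""

    def __init__(self):
        self._heaps: Dict[str, List] = {}
        self._counter = itertools.count()  # FIFO tie-breaker for equal priorities

    def add_task(self, task: Task, priority: int = 0) -> None:
        heap = self._heaps.setdefault(task.assigned_device or "GPU:0", [])
        # heapq is a min-heap, so negate priority to pop the highest priority first
        heapq.heappush(heap, (-priority, next(self._counter), task))

    def get_next_task(self, device: str) -> Optional[Task]:
        heap = self._heaps.get(device)
        return heapq.heappop(heap)[2] if heap else None

    def requeue_task(self, task: Task) -> None:
        self.add_task(task, priority=-1)  # failed tasks go to the back
```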
### 3. Background Task Processing
#### Modified: Background Task Wrapper
```python
async def process_document_task(task_id: str, background_tasks: BackgroundTasks):
    """Enhanced background task with cleanup."""
    # Register a cleanup callback that runs on success or failure
    def cleanup():
        asyncio.create_task(cleanup_task_resources(task_id))

    background_tasks.add_task(
        _process_with_cleanup,
        task_id,
        on_complete=cleanup,
        on_error=cleanup,
    )
```
#### Added: Task Resource Cleanup
```python
async def cleanup_task_resources(task_id: str):
    """Release all resources associated with a task."""
    # - Clear task-specific caches
    # - Release temporary files
    # - Update resource tracking
    # - Log cleanup completion
    ...
```
### 4. Task Monitoring
#### Added: Task Metrics Endpoint
```python
@router.get("/tasks/metrics")
async def get_task_metrics():
return {
"active_tasks": {...},
"queued_tasks": {...},
"memory_by_device": {...},
"pool_utilization": {...},
"average_wait_time": ...
}
```
#### Added: Task Health Checks
```python
@router.get("/tasks/{task_id}/health")
async def get_task_health(task_id: str):
    return {
        "status": "...",
        "memory_usage_mb": ...,
        "processing_time_s": ...,
        "device": "...",
        "warnings": [...],
    }
```
### 5. Error Handling
#### Added: Memory-Based Error Recovery
```python
class TaskErrorHandler:
    async def handle_oom_error(self, task: Task):
        """Handle out-of-memory errors."""
        # - Log memory state at failure
        # - Attempt CPU fallback if configured
        # - Requeue with reduced batch size
        # - Alert monitoring system
        ...
```
#### Modified: Task Failure Reasons
```python
class TaskFailureReason(Enum):
    # Existing reasons...

    # New memory-related reasons
    OUT_OF_MEMORY = "out_of_memory"
    POOL_EXHAUSTED = "pool_exhausted"
    DEVICE_UNAVAILABLE = "device_unavailable"
    MEMORY_LIMIT_EXCEEDED = "memory_limit_exceeded"
```
### 6. Task Lifecycle Events
#### Added: Resource Events
```python
class TaskEvent(Enum):
    # Existing events...

    # New resource events
    RESOURCE_ACQUIRED = "resource_acquired"
    RESOURCE_RELEASED = "resource_released"
    MEMORY_WARNING = "memory_warning"
    CLEANUP_STARTED = "cleanup_started"
    CLEANUP_COMPLETED = "cleanup_completed"
```
#### Added: Event Handlers
```python
async def on_task_resource_acquired(task_id: str, resource: Dict):
    """Log and track resource acquisition."""
    ...

async def on_task_cleanup_completed(task_id: str):
    """Verify cleanup and update task status."""
    ...
```
## Database Schema Changes
### Task Table Updates
```sql
ALTER TABLE tasks ADD COLUMN estimated_memory_mb INTEGER;
ALTER TABLE tasks ADD COLUMN actual_memory_mb INTEGER;
ALTER TABLE tasks ADD COLUMN assigned_device VARCHAR(50);
ALTER TABLE tasks ADD COLUMN service_instance_id VARCHAR(100);
ALTER TABLE tasks ADD COLUMN resource_cleanup_completed BOOLEAN DEFAULT FALSE;
```
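
If the project uses Alembic for migrations (an assumption; the migration tool is not specified here), the column additions above might look like this sketch:

```python
import sqlalchemy as sa
from alembic import op

def upgrade():
    """Add resource tracking columns to the tasks table."""
    op.add_column("tasks", sa.Column("estimated_memory_mb", sa.Integer(), nullable=True))
    op.add_column("tasks", sa.Column("actual_memory_mb", sa.Integer(), nullable=True))
    op.add_column("tasks", sa.Column("assigned_device", sa.String(50), nullable=True))
    op.add_column("tasks", sa.Column("service_instance_id", sa.String(100), nullable=True))
    op.add_column("tasks", sa.Column("resource_cleanup_completed", sa.Boolean(),
                                     server_default=sa.text("FALSE")))
```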
### New Tables
```sql
CREATE TABLE task_metrics (
    id SERIAL PRIMARY KEY,
    task_id VARCHAR(36) REFERENCES tasks(id),
    timestamp TIMESTAMP,
    memory_usage_mb INTEGER,
    device VARCHAR(50),
    processing_stage VARCHAR(100)
);

CREATE TABLE task_events (
    id SERIAL PRIMARY KEY,
    task_id VARCHAR(36) REFERENCES tasks(id),
    event_type VARCHAR(50),
    timestamp TIMESTAMP,
    details JSONB
);
```
## Configuration Changes
### Added Task Settings
```yaml
tasks:
  max_queue_size: 100
  queue_timeout_seconds: 300
  enable_priority_queue: true
  enable_resource_tracking: true
  cleanup_timeout_seconds: 30
  retry:
    max_attempts: 3
    backoff_multiplier: 2
    memory_reduction_factor: 0.5
```
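
A sketch of how the retry settings might drive recovery (assuming a `settings` object mirroring the YAML above; `process_task`, the delay formula, and the batch-size floor are illustrative):

```python
import asyncio

async def retry_task(task, settings, attempt: int):
    """Back off exponentially and shrink the batch before each retry."""
    if attempt >= settings.max_attempts:
        raise RuntimeError(f"Task {task.id} failed after {attempt} attempts")
    # backoff_multiplier=2 gives delays of 1s, 2s, 4s, ... (attempt counted from 0)
    await asyncio.sleep(settings.backoff_multiplier ** attempt)
    # memory_reduction_factor=0.5 halves the batch on every retry, floored at 1 page
    task.batch_size = max(1, int(task.batch_size * settings.memory_reduction_factor))
    await process_task(task.id)
```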
## Breaking Changes
None - All changes maintain backward compatibility.
## Migration Requirements
1. Run database migrations to add new columns
2. Deploy updated task router code
3. Configure pool settings based on hardware
4. Enable monitoring endpoints
5. Test cleanup hooks in staging environment


@@ -0,0 +1,587 @@
# Design Document: Enhanced Memory Management
## Architecture Overview
The enhanced memory management system introduces three core components that work together to prevent OOM crashes and optimize resource utilization:
```
┌─────────────────────────────────────────────────────────────┐
│ Task Router │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Request → Queue → Acquire Service → Process → Release │ │
│ └──────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ OCRServicePool │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │Service 1│ │Service 2│ │Service 3│ │Service 4│ │
│ │ GPU:0 │ │ GPU:0 │ │ GPU:1 │ │ CPU │ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ ModelManager │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Models: {id → (instance, ref_count, last_used)} │ │
│ │ Timeout Monitor → Unload Idle Models │ │
│ └──────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ MemoryGuard │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Monitor: GPU/CPU Memory Usage │ │
│ │ Actions: Warn → Throttle → Fallback → Emergency │ │
│ └──────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
```
## Component Design
### 1. ModelManager
**Purpose**: Centralized model lifecycle management with reference counting and idle timeout.
**Key Design Decisions**:
- **Singleton Pattern**: One ModelManager instance per application
- **Reference Counting**: Track active users of each model
- **LRU Cache**: Evict least recently used models under memory pressure
- **Lazy Loading**: Load models only when first requested
**Implementation**:
```python
class ModelManager:
    def __init__(self, config: ModelConfig):
        self.models: Dict[str, ModelEntry] = {}
        self.lock = asyncio.Lock()
        self.config = config
        self._start_timeout_monitor()

    async def load_model(self, model_id: str, params: Dict) -> Model:
        async with self.lock:
            if model_id in self.models:
                entry = self.models[model_id]
                entry.ref_count += 1
                entry.last_used = time.time()
                return entry.model
            # Check memory before loading
            if not await self.memory_guard.check_memory(params['estimated_memory']):
                await self._evict_idle_models()
            model = await self._create_model(model_id, params)
            self.models[model_id] = ModelEntry(
                model=model,
                ref_count=1,
                last_used=time.time(),
            )
            return model
```
### 2. OCRServicePool
**Purpose**: Manage a pool of OCRService instances to prevent duplicate model loading.
**Key Design Decisions**:
- **Per-Device Pools**: Separate pool for each GPU/CPU device
- **Semaphore Control**: Limit concurrent usage per service
- **Queue Management**: FIFO queue with timeout for waiting requests
- **Health Monitoring**: Periodic health checks on pooled services
**Implementation**:
```python
class OCRServicePool:
    def __init__(self, config: PoolConfig):
        self.pools: Dict[str, List[OCRService]] = {}
        self.semaphores: Dict[str, asyncio.Semaphore] = {}
        self.queues: Dict[str, asyncio.Queue] = {}
        self._initialize_pools()

    async def acquire(self, device: str = "GPU:0") -> OCRService:
        # Try to get an available service from the pool
        if device in self.pools and self.pools[device]:
            for service in self.pools[device]:
                if await service.try_acquire():
                    return service
        # Queue the request if the pool is exhausted
        return await self._wait_for_service(device)
```
### 3. MemoryGuard
**Purpose**: Monitor memory usage and trigger preventive actions.
**Key Design Decisions**:
- **Multi-Backend Support**: paddle.device.cuda, pynvml, torch as fallbacks
- **Threshold System**: Warning (80%), Critical (95%), Emergency (98%)
- **Predictive Allocation**: Estimate memory before operations
- **Progressive Actions**: Warn → Throttle → CPU Fallback → Reject
**Implementation**:
```python
class MemoryGuard:
    def __init__(self, config: MemoryConfig):
        self.config = config
        self.backend = self._detect_backend()
        self._start_monitor()

    async def check_memory(self, required_mb: int = 0) -> bool:
        stats = await self.get_memory_stats()
        available = stats['gpu_free_mb']
        if available < required_mb:
            return False
        usage_ratio = stats['gpu_used_ratio']
        if usage_ratio > self.config.critical_threshold:
            await self._trigger_emergency_cleanup()
            return False
        if usage_ratio > self.config.warning_threshold:
            await self._trigger_warning()
        return True
```
## Memory Optimization Strategies
### 1. PP-StructureV3 Specific Optimizations
**Problem**: PP-StructureV3 is permanently exempted from unloading (lines 255-267).
**Solution**:
```python
# Remove the exemption
def should_unload_model(model_id: str) -> bool:
    # Old: if model_id == "ppstructure_v3": return False
    # New: apply the same rules to all models
    return True

# Add proper cleanup
def unload_ppstructure_v3(engine: PPStructureV3):
    engine.table_engine = None
    engine.text_detector = None
    engine.text_recognizer = None
    paddle.device.cuda.empty_cache()
```
### 2. Batch Processing for Large Documents
**Strategy**: Process documents in configurable batches to limit memory usage.
```python
async def process_large_document(doc_path: Path, batch_size: int = 10):
    total_pages = get_page_count(doc_path)
    for start_idx in range(0, total_pages, batch_size):
        end_idx = min(start_idx + batch_size, total_pages)
        # Process one batch of pages
        batch_results = await process_pages(doc_path, start_idx, end_idx)
        # Force cleanup between batches
        paddle.device.cuda.empty_cache()
        gc.collect()
        yield batch_results
```
### 3. Selective Feature Disabling
**Strategy**: Allow disabling memory-intensive features when under pressure.
```python
class AdaptiveProcessing:
    def __init__(self):
        self.features = {
            'charts': True,
            'formulas': True,
            'tables': True,
            'layout': True,
        }

    async def adapt_to_memory(self, available_mb: int):
        if available_mb < 1000:
            self.features['charts'] = False
            self.features['formulas'] = False
        if available_mb < 500:
            self.features['tables'] = False
```
## Concurrency Management
### 1. Semaphore-Based Limiting
```python
import asyncio

# Global semaphores
prediction_semaphore = asyncio.Semaphore(2)  # max 2 concurrent predictions
processing_semaphore = asyncio.Semaphore(4)  # max 4 concurrent OCR tasks

async def predict_with_structure(image, params=None):
    async with prediction_semaphore:
        # Memory check before prediction
        required_mb = estimate_prediction_memory(image.shape)
        if not await memory_guard.check_memory(required_mb):
            raise MemoryError("Insufficient memory for prediction")
        return await pp_structure.predict(image, params)
```
### 2. Queue-Based Task Distribution
```python
class TaskDistributor:
    def __init__(self):
        self.queues = {
            'high': asyncio.Queue(maxsize=10),
            'normal': asyncio.Queue(maxsize=50),
            'low': asyncio.Queue(maxsize=100),
        }

    async def distribute_task(self, task: Task):
        priority = self._calculate_priority(task)
        queue = self.queues[priority]
        try:
            await asyncio.wait_for(
                queue.put(task),
                timeout=self.config.queue_timeout,
            )
        except asyncio.TimeoutError:
            raise QueueFullError(f"Queue {priority} is full")
```
## Monitoring and Metrics
### 1. Memory Metrics Collection
```python
class MemoryMetrics:
    def __init__(self):
        self.history = deque(maxlen=1000)
        self.alerts = []

    async def collect(self):
        stats = {
            'timestamp': time.time(),
            'gpu_used_mb': get_gpu_memory_used(),
            'gpu_free_mb': get_gpu_memory_free(),
            'cpu_used_mb': get_cpu_memory_used(),
            'models_loaded': len(model_manager.models),
            'active_tasks': len(active_tasks),
            'pool_utilization': get_pool_utilization(),
        }
        self.history.append(stats)
        await self._check_alerts(stats)
```
### 2. Monitoring Dashboard Endpoints
```python
@router.get("/admin/memory/stats")
async def get_memory_stats():
return {
'current': memory_metrics.get_current(),
'history': memory_metrics.get_history(minutes=5),
'alerts': memory_metrics.get_active_alerts(),
'recommendations': memory_optimizer.get_recommendations()
}
@router.post("/admin/memory/gc")
async def trigger_garbage_collection():
"""Manual garbage collection trigger"""
results = await memory_manager.force_cleanup()
return {'freed_mb': results['freed'], 'models_unloaded': results['models']}
```
## Error Recovery
### 1. OOM Recovery Strategy
```python
class OOMRecovery:
    async def recover(self, error: Exception, task: Task):
        logger.error(f"OOM detected for task {task.id}: {error}")
        # Step 1: emergency cleanup
        await self.emergency_cleanup()
        # Step 2: try CPU fallback
        if self.config.enable_cpu_fallback:
            task.device = "CPU"
            return await self.retry_on_cpu(task)
        # Step 3: reduce batch size and retry
        if task.batch_size > 1:
            task.batch_size = max(1, task.batch_size // 2)
            return await self.retry_with_reduced_batch(task)
        # Step 4: fail gracefully
        await self.mark_task_failed(task, "Insufficient memory")
```
### 2. Service Recovery
```python
class ServiceRecovery:
    async def restart_service(self, service_id: str):
        """Restart a failed service."""
        # Kill the existing process
        await self.kill_service_process(service_id)
        # Clear the service's memory
        await self.clear_service_cache(service_id)
        # Restart with fresh state
        new_service = await self.create_service(service_id)
        await self.pool.replace_service(service_id, new_service)
```
## Testing Strategy
### 1. Memory Leak Detection
```python
@pytest.mark.memory
async def test_no_memory_leak():
    initial_memory = get_memory_usage()
    # Process 100 tasks
    for _ in range(100):
        task = create_test_task()
        await process_task(task)
    # Force cleanup
    await cleanup_all()
    gc.collect()
    final_memory = get_memory_usage()
    leak = final_memory - initial_memory
    assert leak < 100  # max 100 MB leak tolerance
```
### 2. Stress Testing
```python
@pytest.mark.stress
async def test_concurrent_load():
    tasks = [create_large_task() for _ in range(50)]
    # The system should handle this gracefully without OOM
    results = await asyncio.gather(
        *[process_task(t) for t in tasks],
        return_exceptions=True,
    )
    # Some tasks may fail, but the system should remain stable
    successful = sum(1 for r in results if not isinstance(r, Exception))
    assert successful > 0
    assert await health_check() == "healthy"
```
## Performance Targets
| Metric | Current | Target | Improvement |
|--------|---------|---------|------------|
| Memory per task | 2-4 GB | 0.5-1 GB | 75% reduction |
| Concurrent tasks | 1-2 | 4-8 | 4x increase |
| Model load time | 30-60s | 5-10s (cached) | 6x faster |
| OOM crashes/day | 5-10 | 0-1 | 90% reduction |
| Service uptime | 4-8 hours | 24+ hours | 3x improvement |
## Rollout Plan
### Phase 1: Foundation (Week 1)
- Implement ModelManager
- Integrate with existing OCRService
- Add basic memory monitoring
### Phase 2: Pooling (Week 2)
- Implement OCRServicePool
- Update task router
- Add concurrency limits
### Phase 3: Optimization (Week 3)
- Add MemoryGuard
- Implement adaptive processing
- Add batch processing
### Phase 4: Hardening (Week 4)
- Stress testing
- Performance tuning
- Documentation and monitoring
## Configuration Settings Reference
All memory management settings are defined in `backend/app/core/config.py` under the `Settings` class; a sketch of the declaration style follows the tables below.
### Memory Thresholds
| Setting | Type | Default | Description |
|---------|------|---------|-------------|
| `memory_warning_threshold` | float | 0.80 | GPU memory usage ratio (0-1) to trigger warning alerts |
| `memory_critical_threshold` | float | 0.95 | GPU memory ratio to start throttling operations |
| `memory_emergency_threshold` | float | 0.98 | GPU memory ratio to trigger emergency cleanup |
### Memory Monitoring
| Setting | Type | Default | Description |
|---------|------|---------|-------------|
| `memory_check_interval_seconds` | int | 30 | Background check interval for memory monitoring |
| `enable_memory_alerts` | bool | True | Enable/disable memory threshold alerts |
| `gpu_memory_limit_mb` | int | 6144 | Maximum GPU memory to use (MB) |
| `gpu_memory_reserve_mb` | int | 512 | Memory reserved for CUDA overhead |
### Model Lifecycle Management
| Setting | Type | Default | Description |
|---------|------|---------|-------------|
| `enable_model_lifecycle_management` | bool | True | Use ModelManager for model lifecycle |
| `model_idle_timeout_seconds` | int | 300 | Unload models after idle time |
| `pp_structure_idle_timeout_seconds` | int | 300 | Unload PP-StructureV3 after idle |
| `structure_model_memory_mb` | int | 2000 | Estimated memory for PP-StructureV3 |
| `ocr_model_memory_mb` | int | 500 | Estimated memory per OCR language model |
| `enable_lazy_model_loading` | bool | True | Load models on demand |
| `auto_unload_unused_models` | bool | True | Auto-unload unused language models |
### Service Pool Configuration
| Setting | Type | Default | Description |
|---------|------|---------|-------------|
| `enable_service_pool` | bool | True | Use OCRServicePool |
| `max_services_per_device` | int | 1 | Max OCRService instances per GPU |
| `max_total_services` | int | 2 | Max total OCRService instances |
| `service_acquire_timeout_seconds` | float | 300.0 | Timeout for acquiring service from pool |
| `max_queue_size` | int | 50 | Max pending tasks per device queue |
### Concurrency Control
| Setting | Type | Default | Description |
|---------|------|---------|-------------|
| `max_concurrent_predictions` | int | 2 | Max concurrent PP-StructureV3 predictions |
| `max_concurrent_pages` | int | 2 | Max pages processed concurrently |
| `inference_batch_size` | int | 1 | Batch size for inference |
| `enable_batch_processing` | bool | True | Enable batch processing for large docs |
### Recovery Settings
| Setting | Type | Default | Description |
|---------|------|---------|-------------|
| `enable_cpu_fallback` | bool | True | Fall back to CPU when GPU memory low |
| `enable_emergency_cleanup` | bool | True | Auto-cleanup on memory pressure |
| `enable_worker_restart` | bool | False | Restart workers on OOM (requires supervisor) |
### Feature Flags
| Setting | Type | Default | Description |
|---------|------|---------|-------------|
| `enable_chart_recognition` | bool | True | Enable chart/diagram recognition |
| `enable_formula_recognition` | bool | True | Enable math formula recognition |
| `enable_table_recognition` | bool | True | Enable table structure recognition |
| `enable_seal_recognition` | bool | True | Enable seal/stamp recognition |
| `enable_text_recognition` | bool | True | Enable general text recognition |
| `enable_memory_optimization` | bool | True | Enable memory optimizations |
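A sketch of how a few of these settings might be declared (assuming a pydantic `BaseSettings` subclass, which would also provide the environment-variable overrides described next; names and defaults mirror the tables above):
```python
from pydantic import BaseSettings  # pydantic v1-style settings class

class Settings(BaseSettings):
    # Memory thresholds (ratios of total GPU memory)
    memory_warning_threshold: float = 0.80
    memory_critical_threshold: float = 0.95
    memory_emergency_threshold: float = 0.98

    # Model lifecycle
    enable_model_lifecycle_management: bool = True
    model_idle_timeout_seconds: int = 300

    # Service pool and concurrency
    enable_service_pool: bool = True
    max_services_per_device: int = 1
    max_concurrent_predictions: int = 2

    class Config:
        env_file = ".env"  # uppercase environment variables override defaults
```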
### Environment Variable Override
All settings can be overridden via environment variables. The format is uppercase with underscores:
```bash
# Example .env file
MEMORY_WARNING_THRESHOLD=0.75
MEMORY_CRITICAL_THRESHOLD=0.90
MAX_CONCURRENT_PREDICTIONS=1
GPU_MEMORY_LIMIT_MB=4096
ENABLE_CPU_FALLBACK=true
```
### Recommended Configurations
#### RTX 4060 8GB (Default)
```bash
GPU_MEMORY_LIMIT_MB=6144
MAX_CONCURRENT_PREDICTIONS=2
MAX_CONCURRENT_PAGES=2
INFERENCE_BATCH_SIZE=1
```
#### RTX 3090 24GB
```bash
GPU_MEMORY_LIMIT_MB=20480
MAX_CONCURRENT_PREDICTIONS=4
MAX_CONCURRENT_PAGES=4
INFERENCE_BATCH_SIZE=2
```
#### CPU-Only Mode
```bash
FORCE_CPU_MODE=true
MAX_CONCURRENT_PREDICTIONS=1
ENABLE_CPU_FALLBACK=false
```
## Prometheus Metrics
The system exports Prometheus-format metrics via the `PrometheusMetrics` class. Available metrics (a sketch of the export format follows the lists):
### GPU Metrics
- `tool_ocr_memory_gpu_total_bytes` - Total GPU memory
- `tool_ocr_memory_gpu_used_bytes` - Used GPU memory
- `tool_ocr_memory_gpu_free_bytes` - Free GPU memory
- `tool_ocr_memory_gpu_utilization_ratio` - GPU utilization (0-1)
### Model Metrics
- `tool_ocr_memory_models_loaded_total` - Number of loaded models
- `tool_ocr_memory_models_memory_bytes` - Total memory used by models
- `tool_ocr_memory_model_ref_count{model_id}` - Reference count per model
### Prediction Metrics
- `tool_ocr_memory_predictions_active` - Currently active predictions
- `tool_ocr_memory_predictions_queue_depth` - Predictions waiting in queue
- `tool_ocr_memory_predictions_total` - Total predictions processed (counter)
- `tool_ocr_memory_predictions_timeouts_total` - Total prediction timeouts (counter)
### Pool Metrics
- `tool_ocr_memory_pool_services_total` - Total services in pool
- `tool_ocr_memory_pool_services_available` - Available services
- `tool_ocr_memory_pool_services_in_use` - Services in use
- `tool_ocr_memory_pool_acquisitions_total` - Total acquisitions (counter)
### Recovery Metrics
- `tool_ocr_memory_recovery_count_total` - Total recovery attempts
- `tool_ocr_memory_recovery_in_cooldown` - In cooldown (0/1)
- `tool_ocr_memory_recovery_cooldown_remaining_seconds` - Remaining cooldown
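A minimal sketch of the text exposition format these metric names imply (hand-rolled for illustration rather than using `prometheus_client`; the `stats` keys are assumed to come from a collector like `MemoryMetrics` above):
```python
def render_prometheus_metrics(stats: dict) -> str:
    """Render a few of the gauges above in Prometheus text exposition format."""
    mb = 1024 * 1024  # the collector reports megabytes; the metric names say bytes
    lines = [
        "# TYPE tool_ocr_memory_gpu_used_bytes gauge",
        f"tool_ocr_memory_gpu_used_bytes {stats['gpu_used_mb'] * mb}",
        "# TYPE tool_ocr_memory_gpu_free_bytes gauge",
        f"tool_ocr_memory_gpu_free_bytes {stats['gpu_free_mb'] * mb}",
        "# TYPE tool_ocr_memory_models_loaded_total gauge",
        f"tool_ocr_memory_models_loaded_total {stats['models_loaded']}",
    ]
    return "\n".join(lines) + "\n"
```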
## Memory Dump API
The `MemoryDumper` class provides debugging capabilities:
```python
from app.services.memory_manager import get_memory_dumper

dumper = get_memory_dumper()

# Create a memory dump
dump = dumper.create_dump(include_python_objects=True)

# Get the dump as a dictionary for JSON serialization
dump_dict = dumper.to_dict(dump)

# Compare two dumps to detect memory growth
comparison = dumper.compare_dumps(dump1, dump2)
```
Memory dumps include:
- GPU/CPU memory usage
- Loaded models and reference counts
- Active predictions and queue state
- Service pool statistics
- Recovery manager state
- Python GC statistics
- Large Python objects (optional)


@@ -0,0 +1,77 @@
# Change: Enhanced Memory Management for OCR Services
## Why
The current OCR service architecture suffers from critical memory management issues that lead to GPU memory exhaustion, service instability, and degraded performance under load:
1. **Memory Leaks**: PP-StructureV3 models are permanently exempted from unloading (lines 255-267), causing VRAM to remain occupied indefinitely.
2. **Instance Proliferation**: Each task creates a new OCRService instance (tasks.py lines 44-65), leading to duplicate model loading and memory fragmentation.
3. **Inadequate Memory Monitoring**: `check_gpu_memory()` always returns True in Paddle-only environments, providing no actual memory protection.
4. **Uncontrolled Concurrency**: No limits on simultaneous PP-StructureV3 predictions, causing memory spikes.
5. **No Resource Cleanup**: Tasks complete without releasing GPU memory, leading to accumulated memory usage.
These issues cause service crashes, require frequent restarts, and prevent scaling to handle multiple concurrent requests.
## What Changes
### 1. Model Lifecycle Management
- **NEW**: `ModelManager` class to handle model loading/unloading with reference counting
- **NEW**: Idle timeout mechanism for PP-StructureV3 (same as language models)
- **NEW**: Explicit `teardown()` method for end-of-flow cleanup
- **MODIFIED**: OCRService to use managed model instances
### 2. Service Singleton Pattern
- **NEW**: `OCRServicePool` to manage OCRService instances (one per GPU/device)
- **NEW**: Queue-based task distribution with concurrency limits
- **MODIFIED**: Task router to use pooled services instead of creating new instances
### 3. Enhanced Memory Monitoring
- **NEW**: `MemoryGuard` class using paddle.device.cuda memory APIs
- **NEW**: Support for pynvml/torch as fallback memory query methods
- **NEW**: Memory threshold configuration (warning/critical levels)
- **MODIFIED**: Processing logic to degrade gracefully when memory is low
### 4. Concurrency Control
- **NEW**: Semaphore-based limits for PP-StructureV3 predictions
- **NEW**: Configuration to disable/delay chart/formula/table analysis
- **NEW**: Batch processing mode for large documents
### 5. Active Memory Management
- **NEW**: Background memory monitor thread with metrics collection
- **NEW**: Automatic cache clearing when thresholds exceeded
- **NEW**: Model unloading based on LRU policy
- **NEW**: Worker process restart capability when memory cannot be recovered
### 6. Cleanup Hooks
- **NEW**: Global shutdown handlers for graceful cleanup
- **NEW**: Task completion callbacks to release resources
- **MODIFIED**: Background task wrapper to ensure cleanup on success/failure
## Impact
**Affected specs**:
- `ocr-processing` - Model management and processing flow
- `task-management` - Task execution and resource management
**Affected code**:
- `backend/app/services/ocr_service.py` - Major refactoring for memory management
- `backend/app/routers/tasks.py` - Use service pool instead of new instances
- `backend/app/core/config.py` - New memory management settings
- `backend/app/services/memory_manager.py` - NEW file
- `backend/app/services/service_pool.py` - NEW file
**Breaking changes**: None - All changes are internal optimizations
**Migration**: Existing deployments will benefit immediately with no configuration changes required. Optional tuning parameters available for optimization.
## Testing Requirements
1. **Memory leak tests** - Verify models are properly unloaded
2. **Concurrency tests** - Validate semaphore limits work correctly
3. **Stress tests** - Ensure system degrades gracefully under memory pressure
4. **Integration tests** - Verify pooled services work correctly
5. **Performance benchmarks** - Measure memory usage improvements


@@ -0,0 +1,104 @@
# Memory Management Specification
## ADDED Requirements
### Requirement: Model Manager
The system SHALL provide a ModelManager class that manages model lifecycle with reference counting and idle timeout mechanisms.
#### Scenario: Loading a model
GIVEN a request to load a model
WHEN the model is not already loaded
THEN the ModelManager creates a new instance and sets reference count to 1
#### Scenario: Reusing loaded model
GIVEN a model is already loaded
WHEN another request for the same model arrives
THEN the ModelManager returns the existing instance and increments reference count
#### Scenario: Unloading idle model
GIVEN a model with zero reference count
WHEN the idle timeout period expires
THEN the ModelManager unloads the model and frees memory
### Requirement: Service Pool
The system SHALL implement an OCRServicePool that manages a pool of OCRService instances with one instance per GPU/CPU device.
#### Scenario: Acquiring service from pool
GIVEN a task needs processing
WHEN a service is requested from the pool
THEN the pool returns an available service or queues the request if all services are busy
#### Scenario: Releasing service to pool
GIVEN a task has completed processing
WHEN the service is released
THEN the service becomes available for other tasks in the pool
### Requirement: Memory Monitoring
The system SHALL continuously monitor GPU and CPU memory usage and trigger preventive actions based on configurable thresholds.
#### Scenario: Memory warning threshold
GIVEN memory usage reaches 80% (warning threshold)
WHEN a new task is requested
THEN the system logs a warning and may defer non-critical operations
#### Scenario: Memory critical threshold
GIVEN memory usage reaches 95% (critical threshold)
WHEN a new task is requested
THEN the system attempts CPU fallback or rejects the task
### Requirement: Concurrency Control
The system SHALL limit concurrent PP-StructureV3 predictions using semaphores to prevent memory exhaustion.
#### Scenario: Concurrent prediction limit
GIVEN the maximum concurrent predictions is set to 2
WHEN 2 predictions are already running
THEN additional prediction requests wait in queue until a slot becomes available
### Requirement: Resource Cleanup
The system SHALL ensure all resources are properly cleaned up after task completion or failure.
#### Scenario: Successful task cleanup
GIVEN a task completes successfully
WHEN the task finishes
THEN all allocated memory, temporary files, and model references are released
#### Scenario: Failed task cleanup
GIVEN a task fails with an error
WHEN the error handler runs
THEN cleanup is performed in the finally block regardless of failure reason
## MODIFIED Requirements
### Requirement: OCR Service Instantiation
The OCR service instantiation SHALL use pooled instances instead of creating new instances for each task.
#### Scenario: Task using pooled service
GIVEN a new OCR task arrives
WHEN the task starts processing
THEN it acquires a service from the pool instead of creating a new instance
### Requirement: PP-StructureV3 Model Management
The PP-StructureV3 model SHALL be subject to the same lifecycle management as other models, removing its permanent exemption from unloading.
#### Scenario: PP-StructureV3 unloading
GIVEN PP-StructureV3 has been idle for the configured timeout
WHEN memory pressure is detected
THEN the model can be unloaded to free memory
### Requirement: Task Resource Tracking
Tasks SHALL track their resource usage including estimated and actual memory consumption.
#### Scenario: Task memory tracking
GIVEN a task is processing
WHEN memory metrics are collected
THEN the task records both estimated and actual memory usage for analysis
## REMOVED Requirements
### Requirement: Permanent Model Loading
The requirement for PP-StructureV3 to remain permanently loaded SHALL be removed.
#### Scenario: Dynamic model loading
GIVEN the system starts
WHEN no tasks are using PP-StructureV3
THEN the model is not loaded until first use


@@ -0,0 +1,176 @@
# Tasks for Enhanced Memory Management
## Section 1: Model Lifecycle Management (Priority: Critical)
### 1.1 Create ModelManager class
- [x] Design ModelManager interface with load/unload/get methods
- [x] Implement reference counting for model instances
- [x] Add idle timeout tracking with configurable thresholds
- [x] Create teardown() method for explicit cleanup
- [x] Add logging for model lifecycle events
### 1.2 Integrate PP-StructureV3 with ModelManager
- [x] Remove permanent exemption from unloading (lines 255-267)
- [x] Wrap PP-StructureV3 in managed model wrapper
- [x] Implement lazy loading on first access
- [x] Add unload capability with cache clearing
- [x] Test model reload after unload
## Section 2: Service Singleton Pattern (Priority: Critical)
### 2.1 Create OCRServicePool
- [x] Design pool interface with acquire/release methods
- [x] Implement per-device instance management
- [x] Add queue-based task distribution
- [x] Implement concurrency limits via semaphores
- [x] Add health check for pooled instances
### 2.2 Refactor task router
- [x] Replace OCRService() instantiation with pool.acquire()
- [x] Add proper release in finally blocks
- [x] Handle pool exhaustion gracefully
- [x] Add metrics for pool utilization
- [x] Update error handling for pooled services
## Section 3: Enhanced Memory Monitoring (Priority: High)
### 3.1 Create MemoryGuard class
- [x] Implement paddle.device.cuda memory queries
- [x] Add pynvml integration as fallback
- [x] Add torch memory query support
- [x] Create configurable threshold system
- [x] Implement memory prediction for operations
### 3.2 Integrate memory checks
- [x] Replace existing check_gpu_memory implementation
- [x] Add pre-operation memory checks
- [x] Implement CPU fallback when memory low
- [x] Add memory usage logging
- [x] Create memory pressure alerts
## Section 4: Concurrency Control (Priority: High)
### 4.1 Implement prediction semaphores
- [x] Add semaphore for PP-StructureV3.predict
- [x] Configure max concurrent predictions
- [x] Add queue for waiting predictions
- [x] Implement timeout handling
- [x] Add metrics for queue depth
### 4.2 Add selective processing
- [x] Create config for disabling chart/formula/table
- [x] Implement batch processing for large documents
- [x] Add progressive loading for multi-page docs
- [x] Create priority queue for operations
- [x] Test memory savings with selective processing
## Section 5: Active Memory Management (Priority: Medium)
### 5.1 Create memory monitor thread
- [x] Implement background monitoring loop
- [x] Add periodic memory metrics collection
- [x] Create threshold-based triggers
- [x] Implement automatic cache clearing
- [x] Add LRU-based model unloading
### 5.2 Add recovery mechanisms
- [x] Implement emergency memory release
- [x] Add worker process restart capability (RecoveryManager)
- [x] Create memory dump for debugging
- [x] Add cooldown period after recovery
- [x] Test recovery under various scenarios
## Section 6: Cleanup Hooks (Priority: Medium)
### 6.1 Implement shutdown handlers
- [x] Add FastAPI shutdown event handler
- [x] Create signal handlers (SIGTERM, SIGINT)
- [x] Implement graceful model unloading
- [x] Add connection draining
- [x] Test shutdown sequence
### 6.2 Add task cleanup
- [x] Wrap background tasks with cleanup
- [x] Add success/failure callbacks
- [x] Implement resource release on completion
- [x] Add cleanup verification logging
- [x] Test cleanup in error scenarios
## Section 7: Configuration & Settings (Priority: Low)
### 7.1 Add memory settings to config
- [x] Define memory threshold parameters
- [x] Add model timeout settings
- [x] Configure pool sizes
- [x] Add feature flags for new behavior
- [x] Document all settings
### 7.2 Create monitoring dashboard
- [x] Add memory metrics endpoint
- [x] Create pool status endpoint
- [x] Add model lifecycle stats
- [x] Implement health check endpoint
- [x] Add Prometheus metrics export
## Section 8: Testing & Documentation (Priority: High)
### 8.1 Create comprehensive tests
- [x] Unit tests for ModelManager
- [x] Integration tests for OCRServicePool
- [x] Memory leak detection tests
- [x] Stress tests with concurrent requests
- [x] Performance benchmarks
### 8.2 Documentation
- [ ] Document memory management architecture
- [ ] Create tuning guide
- [ ] Add troubleshooting section
- [ ] Document monitoring setup
- [ ] Create migration guide
---
**Total Tasks**: 80
**Completed**: 75
**Remaining**: 5 (Section 8.2 Documentation only)
**Progress**: ~94%
**Critical Path Status**: Sections 1 through 8.1 are complete (foundation, memory monitoring, prediction semaphores, batch processing, recovery, signal handlers, configuration, Prometheus metrics, and comprehensive tests are in place)
## Implementation Summary
### Files Created
- `backend/app/services/memory_manager.py` - ModelManager, MemoryGuard, MemoryConfig, PredictionSemaphore, BatchProcessor, ProgressiveLoader, PriorityOperationQueue, RecoveryManager
- `backend/app/services/service_pool.py` - OCRServicePool, PoolConfig
- `backend/tests/services/test_memory_manager.py` - Unit tests for memory management (57 tests)
- `backend/tests/services/test_service_pool.py` - Unit tests for service pool (18 tests)
- `backend/tests/services/test_ocr_memory_integration.py` - Integration tests for memory check patterns (10 tests)
### Files Modified
- `backend/app/core/config.py` - Added memory management configuration settings
- `backend/app/services/ocr_service.py` - Removed PP-StructureV3 exemption, added unload capability, integrated MemoryGuard for pre-operation checks and CPU fallback, added PredictionSemaphore for concurrent prediction control
- `backend/app/services/pp_structure_enhanced.py` - Added PredictionSemaphore control for predict calls
- `backend/app/routers/tasks.py` - Refactored to use service pool
- `backend/app/main.py` - Added startup/shutdown handlers, signal handlers (SIGTERM/SIGINT), connection draining, recovery manager shutdown
### New Components Added (Sections 4.2-8)
- `BatchProcessor` - Memory-aware batch processing for large documents with priority support
- `ProgressiveLoader` - Progressive page loading with lookahead and automatic cleanup
- `PriorityOperationQueue` - Priority queue with timeout and cancellation support
- `RecoveryManager` - Memory recovery with cooldown period and attempt limits
- `MemoryDumper` - Memory dump creation for debugging with history and comparison
- `PrometheusMetrics` - Prometheus-format metrics export for monitoring
- Signal handlers for graceful shutdown (SIGTERM, SIGINT)
- Connection draining for clean shutdown
### New Test Classes Added (Section 8.1)
- `TestModelReloadAfterUnload` - Tests for model reload after unload
- `TestSelectiveProcessingMemorySavings` - Tests for memory savings with selective processing
- `TestRecoveryScenarios` - Tests for recovery under various scenarios
- `TestShutdownSequence` - Tests for shutdown sequence
- `TestCleanupInErrorScenarios` - Tests for cleanup in error scenarios
- `TestMemoryLeakDetection` - Tests for memory leak detection
- `TestStressConcurrentRequests` - Stress tests with concurrent requests
- `TestPerformanceBenchmarks` - Performance benchmark tests
- `TestMemoryDumper` - Tests for MemoryDumper class
- `TestPrometheusMetrics` - Tests for PrometheusMetrics class