Files
OCR/openspec/changes/enhance-memory-management/delta-task-management.md
egg ba8ddf2b68 feat: create OpenSpec proposal for enhanced memory management
- Create comprehensive proposal addressing OOM crashes and memory leaks
- Define 6 core areas: model lifecycle, service pooling, monitoring
- Add 58 implementation tasks across 8 sections
- Design ModelManager with reference counting and idle timeout
- Plan OCRServicePool for singleton service pattern
- Specify MemoryGuard for proactive memory monitoring
- Include concurrency controls and cleanup hooks
- Add spec deltas for ocr-processing and task-management
- Create detailed design document with architecture diagrams
- Define performance targets: 75% memory reduction, 4x concurrency

Critical improvements:
- Remove PP-StructureV3 permanent exemption from unloading
- Replace per-task OCRService instantiation with pooling
- Add real GPU memory monitoring (currently always returns True)
- Implement semaphore-based concurrency limits
- Add proper resource cleanup on task completion

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-25 15:21:32 +08:00

5.6 KiB

Spec Delta: task-management

Changes to Task Management Specification

1. Task Resource Management

Modified: Task Creation

class TaskManager:
    def create_task(self, request: TaskCreateRequest) -> Task:
        """Create task with resource estimation"""
        task = Task(...)
        task.estimated_memory_mb = self._estimate_memory(request)
        task.assigned_device = self._select_device(task.estimated_memory_mb)
        return task

Added: Resource Tracking

class Task(BaseModel):
    # Existing fields...

    # New resource tracking fields
    estimated_memory_mb: Optional[int] = None
    actual_memory_mb: Optional[int] = None
    assigned_device: Optional[str] = None
    service_instance_id: Optional[str] = None
    resource_cleanup_completed: bool = False

2. Task Execution

Modified: Task Router

@router.post("/tasks/{task_id}/start")
async def start_task(task_id: str, params: TaskStartRequest):
    # Old approach - creates new service
    # service = OCRService(device=device)

    # New approach - uses pooled service
    service = await service_pool.acquire(device=params.device)
    try:
        result = await service.process(task_id, params)
    finally:
        await service_pool.release(service)

Added: Task Queue Management

class TaskQueue:
    """Priority queue for task execution"""

    def add_task(self, task: Task, priority: int = 0):
        """Add task to queue with priority"""

    def get_next_task(self, device: str) -> Optional[Task]:
        """Get next task for specific device"""

    def requeue_task(self, task: Task):
        """Re-add failed task with lower priority"""

3. Background Task Processing

Modified: Background Task Wrapper

async def process_document_task(task_id: str, background_tasks: BackgroundTasks):
    """Enhanced background task with cleanup"""

    # Register cleanup callback
    def cleanup():
        asyncio.create_task(cleanup_task_resources(task_id))

    background_tasks.add_task(
        _process_with_cleanup,
        task_id,
        on_complete=cleanup,
        on_error=cleanup
    )

Added: Task Resource Cleanup

async def cleanup_task_resources(task_id: str):
    """Release all resources associated with task"""
    - Clear task-specific caches
    - Release temporary files
    - Update resource tracking
    - Log cleanup completion

4. Task Monitoring

Added: Task Metrics Endpoint

@router.get("/tasks/metrics")
async def get_task_metrics():
    return {
        "active_tasks": {...},
        "queued_tasks": {...},
        "memory_by_device": {...},
        "pool_utilization": {...},
        "average_wait_time": ...
    }

Added: Task Health Checks

@router.get("/tasks/{task_id}/health")
async def get_task_health(task_id: str):
    return {
        "status": "...",
        "memory_usage_mb": ...,
        "processing_time_s": ...,
        "device": "...",
        "warnings": [...]
    }

5. Error Handling

Added: Memory-Based Error Recovery

class TaskErrorHandler:
    async def handle_oom_error(self, task: Task):
        """Handle out-of-memory errors"""
        - Log memory state at failure
        - Attempt CPU fallback if configured
        - Requeue with reduced batch size
        - Alert monitoring system

Modified: Task Failure Reasons

class TaskFailureReason(Enum):
    # Existing reasons...

    # New memory-related reasons
    OUT_OF_MEMORY = "out_of_memory"
    POOL_EXHAUSTED = "pool_exhausted"
    DEVICE_UNAVAILABLE = "device_unavailable"
    MEMORY_LIMIT_EXCEEDED = "memory_limit_exceeded"

6. Task Lifecycle Events

Added: Resource Events

class TaskEvent(Enum):
    # Existing events...

    # New resource events
    RESOURCE_ACQUIRED = "resource_acquired"
    RESOURCE_RELEASED = "resource_released"
    MEMORY_WARNING = "memory_warning"
    CLEANUP_STARTED = "cleanup_started"
    CLEANUP_COMPLETED = "cleanup_completed"

Added: Event Handlers

async def on_task_resource_acquired(task_id: str, resource: Dict):
    """Log and track resource acquisition"""

async def on_task_cleanup_completed(task_id: str):
    """Verify cleanup and update status"""

Database Schema Changes

Task Table Updates

ALTER TABLE tasks ADD COLUMN estimated_memory_mb INTEGER;
ALTER TABLE tasks ADD COLUMN actual_memory_mb INTEGER;
ALTER TABLE tasks ADD COLUMN assigned_device VARCHAR(50);
ALTER TABLE tasks ADD COLUMN service_instance_id VARCHAR(100);
ALTER TABLE tasks ADD COLUMN resource_cleanup_completed BOOLEAN DEFAULT FALSE;

New Tables

CREATE TABLE task_metrics (
    id SERIAL PRIMARY KEY,
    task_id VARCHAR(36) REFERENCES tasks(id),
    timestamp TIMESTAMP,
    memory_usage_mb INTEGER,
    device VARCHAR(50),
    processing_stage VARCHAR(100)
);

CREATE TABLE task_events (
    id SERIAL PRIMARY KEY,
    task_id VARCHAR(36) REFERENCES tasks(id),
    event_type VARCHAR(50),
    timestamp TIMESTAMP,
    details JSONB
);

Configuration Changes

Added Task Settings

tasks:
  max_queue_size: 100
  queue_timeout_seconds: 300
  enable_priority_queue: true
  enable_resource_tracking: true
  cleanup_timeout_seconds: 30

  retry:
    max_attempts: 3
    backoff_multiplier: 2
    memory_reduction_factor: 0.5

Breaking Changes

None - All changes maintain backward compatibility.

Migration Requirements

  1. Run database migrations to add new columns
  2. Deploy updated task router code
  3. Configure pool settings based on hardware
  4. Enable monitoring endpoints
  5. Test cleanup hooks in staging environment