OCR/openspec/changes/archive/2025-11-26-enhance-memory-management/delta-task-management.md
egg a227311b2d chore: archive enhance-memory-management proposal (75/80 tasks)
Archive incomplete proposal for later continuation.
OCR processing has known quality issues to be addressed in future work.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-26 16:10:45 +08:00


# Spec Delta: task-management
## Changes to Task Management Specification
### 1. Task Resource Management
#### Modified: Task Creation
```python
class TaskManager:
    def create_task(self, request: TaskCreateRequest) -> Task:
        """Create a task with resource estimation."""
        task = Task(...)
        task.estimated_memory_mb = self._estimate_memory(request)
        task.assigned_device = self._select_device(task.estimated_memory_mb)
        return task
```
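The spec names `_estimate_memory` and `_select_device` without defining them. The following is a minimal sketch of one way they might behave, assuming a linear per-page cost model and a simple "fits in free GPU memory" rule. The constants, the standalone function names, and the `free_gpu_mb` parameter are all hypothetical and not part of the spec.

```python
from typing import Optional

# Hypothetical constants; the spec does not define the estimation model.
BASE_MODEL_MB = 1500  # assumed fixed cost of the loaded OCR models
PER_PAGE_MB = 50      # assumed incremental cost per document page


def estimate_memory_mb(page_count: int) -> int:
    """Rough linear estimate: fixed model cost plus a per-page cost."""
    return BASE_MODEL_MB + PER_PAGE_MB * max(page_count, 0)


def select_device(estimated_mb: int, free_gpu_mb: Optional[int]) -> str:
    """Pick the GPU only when the estimate fits in free GPU memory."""
    if free_gpu_mb is not None and estimated_mb <= free_gpu_mb:
        return "cuda"
    return "cpu"
```

Passing `free_gpu_mb=None` models a host without a usable GPU, so the task is assigned to the CPU regardless of its estimate.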
#### Added: Resource Tracking
```python
class Task(BaseModel):
    # Existing fields...

    # New resource tracking fields
    estimated_memory_mb: Optional[int] = None
    actual_memory_mb: Optional[int] = None
    assigned_device: Optional[str] = None
    service_instance_id: Optional[str] = None
    resource_cleanup_completed: bool = False
```
### 2. Task Execution
#### Modified: Task Router
```python
@router.post("/tasks/{task_id}/start")
async def start_task(task_id: str, params: TaskStartRequest):
    # Old approach - creates a new service per request
    # service = OCRService(device=device)

    # New approach - uses a pooled service
    service = await service_pool.acquire(device=params.device)
    try:
        result = await service.process(task_id, params)
    finally:
        await service_pool.release(service)
```
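The router above relies on a `service_pool` with `acquire` and `release` methods, which this delta does not define. As a rough illustration of the acquire/release contract, here is a minimal sketch built on `asyncio.Queue`. `FakeService`, the `timeout` parameter, and the per-device queue layout are assumptions made for the example, not the project's actual pool.

```python
import asyncio
from typing import Dict, List


class FakeService:
    """Hypothetical stand-in for the real OCR service."""

    def __init__(self, device: str) -> None:
        self.device = device


class ServicePool:
    """Fixed-size pool: acquire() waits until a service for the device is free."""

    def __init__(self, services: List[FakeService]) -> None:
        # One queue of idle services per device name.
        self._idle: Dict[str, asyncio.Queue] = {}
        for svc in services:
            if svc.device not in self._idle:
                self._idle[svc.device] = asyncio.Queue()
            self._idle[svc.device].put_nowait(svc)

    async def acquire(self, device: str, timeout: float = 5.0) -> FakeService:
        if device not in self._idle:
            raise LookupError(f"no services configured for device {device!r}")
        # Raises asyncio.TimeoutError when the pool stays exhausted.
        return await asyncio.wait_for(self._idle[device].get(), timeout)

    async def release(self, service: FakeService) -> None:
        self._idle[service.device].put_nowait(service)
```

A timeout on `acquire` is one place where a `POOL_EXHAUSTED` failure reason could be raised, although the spec does not say where that mapping happens.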
#### Added: Task Queue Management
```python
class TaskQueue:
    """Priority queue for task execution"""

    def add_task(self, task: Task, priority: int = 0):
        """Add task to queue with priority"""

    def get_next_task(self, device: str) -> Optional[Task]:
        """Get next task for specific device"""

    def requeue_task(self, task: Task):
        """Re-add failed task with lower priority"""
```
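The interface above leaves the queue's ordering rules open. A minimal sketch using `heapq` follows, assuming that a lower number means "served first", that ties are broken in FIFO order, and that requeueing adds a penalty of 1. `QueuedTask` is a hypothetical stand-in for the spec's `Task` model.

```python
import heapq
import itertools
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class QueuedTask:
    """Hypothetical stand-in for the spec's Task model."""
    id: str
    assigned_device: str
    priority: int = 0


class TaskQueue:
    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, QueuedTask]] = []
        self._counter = itertools.count()  # unique tie-breaker, keeps FIFO order

    def add_task(self, task: QueuedTask, priority: int = 0) -> None:
        task.priority = priority
        heapq.heappush(self._heap, (priority, next(self._counter), task))

    def get_next_task(self, device: str) -> Optional[QueuedTask]:
        """Pop the best-priority task for `device`, leaving other tasks queued."""
        skipped = []
        found = None
        while self._heap:
            entry = heapq.heappop(self._heap)
            if entry[2].assigned_device == device:
                found = entry[2]
                break
            skipped.append(entry)
        for entry in skipped:  # put back tasks that belong to other devices
            heapq.heappush(self._heap, entry)
        return found

    def requeue_task(self, task: QueuedTask) -> None:
        """Re-add a failed task one priority step lower (assumed penalty of 1)."""
        self.add_task(task, task.priority + 1)
```

Scanning the heap for a matching device is linear in the worst case; with the configured `max_queue_size` of 100 that is unlikely to matter, but one heap per device would avoid it.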
### 3. Background Task Processing
#### Modified: Background Task Wrapper
```python
async def process_document_task(task_id: str, background_tasks: BackgroundTasks):
    """Enhanced background task with cleanup"""

    # Register cleanup callback
    def cleanup():
        asyncio.create_task(cleanup_task_resources(task_id))

    background_tasks.add_task(
        _process_with_cleanup,
        task_id,
        on_complete=cleanup,
        on_error=cleanup,
    )
```
#### Added: Task Resource Cleanup
```python
async def cleanup_task_resources(task_id: str):
    """Release all resources associated with task"""
    # Planned steps (not yet specified in detail):
    # - Clear task-specific caches
    # - Release temporary files
    # - Update resource tracking
    # - Log cleanup completion
```
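The wrapper `_process_with_cleanup` is referenced above but not defined in this delta. One way to guarantee that cleanup runs on both success and failure is a `try`/`finally` that awaits the cleanup coroutine directly, bounded by a timeout. The sketch below assumes that shape; the function name, its parameters, and the 30-second default (chosen to match `cleanup_timeout_seconds` in the configuration section) are illustrative only. Awaiting the cleanup directly also avoids depending on an already running event loop, which `asyncio.create_task` requires.

```python
import asyncio
from typing import Awaitable, Callable


async def process_with_cleanup(
    task_id: str,
    process: Callable[[str], Awaitable[str]],
    cleanup: Callable[[str], Awaitable[None]],
    cleanup_timeout_s: float = 30.0,
) -> str:
    """Run `process`, then always run `cleanup`, bounded by a timeout."""
    try:
        return await process(task_id)
    finally:
        try:
            await asyncio.wait_for(cleanup(task_id), cleanup_timeout_s)
        except asyncio.TimeoutError:
            # Cleanup took too long; a real system would log and flag the task.
            pass
```

If `process` raises, the exception still propagates to the caller after cleanup has been attempted, so failure reporting is unaffected.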
### 4. Task Monitoring
#### Added: Task Metrics Endpoint
```python
@router.get("/tasks/metrics")
async def get_task_metrics():
    return {
        "active_tasks": {...},
        "queued_tasks": {...},
        "memory_by_device": {...},
        "pool_utilization": {...},
        "average_wait_time": ...
    }
```
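The response shape above is left elided. To illustrate how a few of these fields could be derived from in-memory task records, here is a small sketch. `TaskRecord`, its fields, and the decision to report plain counts for `active_tasks` and `queued_tasks` are assumptions; the spec may intend richer structures, and `pool_utilization` is omitted because it depends on the pool implementation.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class TaskRecord:
    """Hypothetical per-task record used only for this example."""
    status: str  # "active" or "queued"
    device: str
    memory_mb: int
    wait_time_s: Optional[float] = None  # None while still waiting


def summarize(tasks: List[TaskRecord]) -> Dict:
    """Aggregate task records into a metrics payload."""
    memory_by_device: Dict[str, int] = defaultdict(int)
    for t in tasks:
        if t.status == "active":
            memory_by_device[t.device] += t.memory_mb
    waits = [t.wait_time_s for t in tasks if t.wait_time_s is not None]
    return {
        "active_tasks": sum(t.status == "active" for t in tasks),
        "queued_tasks": sum(t.status == "queued" for t in tasks),
        "memory_by_device": dict(memory_by_device),
        "average_wait_time": sum(waits) / len(waits) if waits else None,
    }
```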
#### Added: Task Health Checks
```python
@router.get("/tasks/{task_id}/health")
async def get_task_health(task_id: str):
    return {
        "status": "...",
        "memory_usage_mb": ...,
        "processing_time_s": ...,
        "device": "...",
        "warnings": [...]
    }
```
### 5. Error Handling
#### Added: Memory-Based Error Recovery
```python
class TaskErrorHandler:
    async def handle_oom_error(self, task: Task):
        """Handle out-of-memory errors"""
        # Planned steps (not yet specified in detail):
        # - Log memory state at failure
        # - Attempt CPU fallback if configured
        # - Requeue with reduced batch size
        # - Alert monitoring system
```
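The recovery steps listed above do not say in which order the fallbacks apply. The sketch below shows one possible decision rule, assuming the batch size is shrunk first and CPU fallback is used only once the batch cannot shrink further. `RetryPlan`, `plan_oom_retry`, and all parameter names are hypothetical; the defaults mirror `max_attempts` and `memory_reduction_factor` from the configuration section.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RetryPlan:
    """Hypothetical description of how to rerun a task after an OOM error."""
    device: str
    batch_size: int


def plan_oom_retry(
    device: str,
    batch_size: int,
    attempt: int,
    max_attempts: int = 3,
    memory_reduction_factor: float = 0.5,
    cpu_fallback: bool = True,
) -> Optional[RetryPlan]:
    """Return the next retry plan, or None when the task should fail."""
    if attempt >= max_attempts:
        return None
    reduced = int(batch_size * memory_reduction_factor)
    if reduced >= 1:
        # Assumed first choice: same device, smaller batch.
        return RetryPlan(device=device, batch_size=reduced)
    if cpu_fallback and device != "cpu":
        # Batch cannot shrink further; move to CPU with the smallest batch.
        return RetryPlan(device="cpu", batch_size=1)
    return None
```

A `None` result is the point where the task would be marked failed with `OUT_OF_MEMORY`.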
#### Modified: Task Failure Reasons
```python
class TaskFailureReason(Enum):
    # Existing reasons...

    # New memory-related reasons
    OUT_OF_MEMORY = "out_of_memory"
    POOL_EXHAUSTED = "pool_exhausted"
    DEVICE_UNAVAILABLE = "device_unavailable"
    MEMORY_LIMIT_EXCEEDED = "memory_limit_exceeded"
```
### 6. Task Lifecycle Events
#### Added: Resource Events
```python
class TaskEvent(Enum):
    # Existing events...

    # New resource events
    RESOURCE_ACQUIRED = "resource_acquired"
    RESOURCE_RELEASED = "resource_released"
    MEMORY_WARNING = "memory_warning"
    CLEANUP_STARTED = "cleanup_started"
    CLEANUP_COMPLETED = "cleanup_completed"
```
#### Added: Event Handlers
```python
async def on_task_resource_acquired(task_id: str, resource: Dict):
    """Log and track resource acquisition"""


async def on_task_cleanup_completed(task_id: str):
    """Verify cleanup and update status"""
```
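How events reach these handlers is not described in the delta. A minimal sketch of an in-process registry is shown below, assuming every handler takes `(task_id, details)` and handlers run sequentially. `subscribe`, `emit`, and the uniform handler signature are hypothetical, and the enum is trimmed to two members for brevity.

```python
import asyncio
from collections import defaultdict
from enum import Enum
from typing import Awaitable, Callable, DefaultDict, Dict, List


class TaskEvent(Enum):
    # Subset of the events listed in the spec, repeated for self-containment.
    RESOURCE_ACQUIRED = "resource_acquired"
    CLEANUP_COMPLETED = "cleanup_completed"


Handler = Callable[[str, Dict], Awaitable[None]]
_handlers: DefaultDict[TaskEvent, List[Handler]] = defaultdict(list)


def subscribe(event: TaskEvent, handler: Handler) -> None:
    """Register `handler` to be called whenever `event` is emitted."""
    _handlers[event].append(handler)


async def emit(event: TaskEvent, task_id: str, details: Dict) -> int:
    """Call every handler for `event` in order; return how many ran."""
    for handler in _handlers[event]:
        await handler(task_id, details)
    return len(_handlers[event])
```

The same `emit` call is a natural place to persist a row to the `task_events` table, with `details` stored in its JSONB column.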
## Database Schema Changes
### Task Table Updates
```sql
ALTER TABLE tasks ADD COLUMN estimated_memory_mb INTEGER;
ALTER TABLE tasks ADD COLUMN actual_memory_mb INTEGER;
ALTER TABLE tasks ADD COLUMN assigned_device VARCHAR(50);
ALTER TABLE tasks ADD COLUMN service_instance_id VARCHAR(100);
ALTER TABLE tasks ADD COLUMN resource_cleanup_completed BOOLEAN DEFAULT FALSE;
```
### New Tables
```sql
CREATE TABLE task_metrics (
    id SERIAL PRIMARY KEY,
    task_id VARCHAR(36) REFERENCES tasks(id),
    timestamp TIMESTAMP,
    memory_usage_mb INTEGER,
    device VARCHAR(50),
    processing_stage VARCHAR(100)
);

CREATE TABLE task_events (
    id SERIAL PRIMARY KEY,
    task_id VARCHAR(36) REFERENCES tasks(id),
    event_type VARCHAR(50),
    timestamp TIMESTAMP,
    details JSONB
);
```
## Configuration Changes
### Added Task Settings
```yaml
tasks:
  max_queue_size: 100
  queue_timeout_seconds: 300
  enable_priority_queue: true
  enable_resource_tracking: true
  cleanup_timeout_seconds: 30
  retry:
    max_attempts: 3
    backoff_multiplier: 2
    memory_reduction_factor: 0.5
```
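To make the retry settings concrete, the sketch below expands them into a per-attempt schedule, assuming exponential backoff and a resource scale that compounds by `memory_reduction_factor` on each attempt. The `base_delay_s` parameter is hypothetical (the configuration defines no base delay), and counting the first run as attempt 1 is also an assumption.

```python
from typing import List, Tuple


def retry_schedule(
    max_attempts: int = 3,
    backoff_multiplier: float = 2,
    memory_reduction_factor: float = 0.5,
    base_delay_s: float = 1.0,  # hypothetical; not defined in the config
) -> List[Tuple[int, float, float]]:
    """Return (attempt, delay_seconds, resource_scale) for each attempt."""
    schedule = []
    for i in range(max_attempts):
        delay = base_delay_s * backoff_multiplier ** i
        scale = memory_reduction_factor ** i
        schedule.append((i + 1, delay, scale))
    return schedule


# With the defaults above:
# [(1, 1.0, 1.0), (2, 2.0, 0.5), (3, 4.0, 0.25)]
print(retry_schedule())
```

Under these assumptions the third attempt runs with a quarter of the original batch size or memory budget, whichever quantity the scale is applied to.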
## Breaking Changes
None - All changes maintain backward compatibility.
## Migration Requirements
1. Run database migrations to add the new `tasks` columns and create the `task_metrics` and `task_events` tables
2. Deploy updated task router code
3. Configure pool settings based on hardware
4. Enable monitoring endpoints
5. Test cleanup hooks in staging environment