Backend:
- Add hybrid image extraction for Direct track (inline image blocks)
- Add render_inline_image_regions() fallback when OCR doesn't find images
- Add check_document_for_missing_images() for detecting missing images
- Add memory management system (MemoryGuard, ModelManager, ServicePool)
- Update pdf_generator_service to handle HYBRID processing track
- Add ElementType.LOGO for logo extraction

Frontend:
- Fix PDF viewer re-rendering issues with memoization
- Add TaskNotFound component and useTaskValidation hook
- Disable StrictMode due to react-pdf incompatibility
- Fix task detail and results page loading states

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
388 lines
12 KiB
Python
388 lines
12 KiB
Python
"""
Tests for OCR Service Pool

Tests OCRServicePool functionality including acquire, release, and concurrency.
"""
|
|
|
|
import pytest
|
|
import threading
|
|
import time
|
|
from unittest.mock import Mock, patch, MagicMock
|
|
import sys
|
|
|
|
# Mock paddle before importing service_pool to avoid import errors
# when paddle is not installed in the test environment.
# NOTE: the stub is registered unconditionally, so it also replaces any
# 'paddle' entry already present in sys.modules for the rest of the process.
paddle_mock = MagicMock()

# Report a CPU-only environment: no CUDA build, no devices, no memory in use.
paddle_mock.is_compiled_with_cuda.return_value = False
paddle_mock.device.cuda.device_count.return_value = 0
paddle_mock.device.cuda.memory_allocated.return_value = 0
paddle_mock.device.cuda.memory_reserved.return_value = 0
paddle_mock.device.cuda.empty_cache = MagicMock()

# Must happen before `app.services.service_pool` is imported below.
sys.modules['paddle'] = paddle_mock
|
|
|
|
from app.services.service_pool import (
|
|
OCRServicePool,
|
|
PooledService,
|
|
PoolConfig,
|
|
ServiceState,
|
|
get_service_pool,
|
|
shutdown_service_pool,
|
|
)
|
|
|
|
|
|
class TestPoolConfig:
    """Tests for PoolConfig class"""

    def test_default_values(self):
        """A PoolConfig built without arguments exposes the expected defaults."""
        config = PoolConfig()
        assert config.max_services_per_device == 1
        assert config.max_total_services == 2
        assert config.acquire_timeout_seconds == 300.0
        assert config.max_queue_size == 50
        assert config.max_consecutive_errors == 3

    def test_custom_values(self):
        """Keyword arguments override the corresponding configuration fields."""
        config = PoolConfig(
            max_services_per_device=2,
            max_total_services=4,
            acquire_timeout_seconds=60.0,
        )
        assert config.max_services_per_device == 2
        assert config.max_total_services == 4
        assert config.acquire_timeout_seconds == 60.0
|
|
|
|
|
|
class TestPooledService:
    """Tests for PooledService class"""

    def test_creation(self):
        """A new PooledService keeps its service/device and starts idle.

        Checks that the wrapper stores the exact service object and device
        string it was given, and that state and counters start at
        AVAILABLE / 0 / 0.
        """
        mock_service = Mock()
        pooled = PooledService(
            service=mock_service,
            device="GPU:0",
        )
        assert pooled.service is mock_service
        assert pooled.device == "GPU:0"
        assert pooled.state == ServiceState.AVAILABLE
        assert pooled.use_count == 0
        assert pooled.error_count == 0
|
|
|
|
|
|
class TestOCRServicePool:
    """Tests for OCRServicePool class

    Each test seeds the pool with Mock-backed PooledService entries (see
    ``_add_mock_service``) before calling ``acquire``.
    """

    def setup_method(self):
        """Reset singleton before each test"""
        shutdown_service_pool()
        OCRServicePool._instance = None
        # Install a fresh class-level lock together with the cleared instance.
        OCRServicePool._lock = threading.Lock()

    def teardown_method(self):
        """Cleanup after each test"""
        shutdown_service_pool()
        OCRServicePool._instance = None

    @staticmethod
    def _add_mock_service(pool, device="GPU:0", service=None):
        """Append a PooledService wrapping a mock to ``pool.services[device]``.

        Args:
            pool: The OCRServicePool under test; ``device`` must already be a
                key of ``pool.services``.
            device: Device key the service is registered under.
            service: Service object to wrap; a fresh ``Mock()`` when omitted.

        Returns:
            The PooledService that was appended.
        """
        if service is None:
            service = Mock()
        pooled_service = PooledService(service=service, device=device)
        pool.services[device].append(pooled_service)
        return pooled_service

    def test_singleton_pattern(self):
        """Test that OCRServicePool is a singleton"""
        pool1 = OCRServicePool()
        pool2 = OCRServicePool()
        assert pool1 is pool2
        pool1.shutdown()

    def test_initialize_device(self):
        """Test device initialization"""
        config = PoolConfig()
        pool = OCRServicePool(config)

        # Default device should be initialized
        assert "GPU:0" in pool.services
        assert "GPU:0" in pool.semaphores

        # Test adding new device
        pool._initialize_device("GPU:1")
        assert "GPU:1" in pool.services
        assert "GPU:1" in pool.semaphores

        pool.shutdown()

    def test_acquire_creates_service(self):
        """Test that acquire hands out a service marked as in use.

        The pool is pre-populated with one mock service, so this checks the
        state and use_count bookkeeping of the first acquisition.
        """
        config = PoolConfig(max_services_per_device=1)
        pool = OCRServicePool(config)

        # Pre-populate with a mock service
        mock_service = Mock()
        mock_service.process = Mock()
        mock_service.get_gpu_status = Mock()
        self._add_mock_service(pool, service=mock_service)

        pooled = pool.acquire(device="GPU:0", timeout=5.0)
        assert pooled is not None
        assert pooled.state == ServiceState.IN_USE
        assert pooled.use_count == 1

        pool.shutdown()

    def test_acquire_reuses_available_service(self):
        """Test that acquire reuses available services"""
        config = PoolConfig(max_services_per_device=1)
        pool = OCRServicePool(config)

        # Pre-populate with a mock service
        self._add_mock_service(pool)

        # First acquire
        pooled1 = pool.acquire(device="GPU:0")
        service_id = id(pooled1.service)
        pool.release(pooled1)

        # Second acquire should get the same service
        pooled2 = pool.acquire(device="GPU:0")
        assert id(pooled2.service) == service_id
        assert pooled2.use_count == 2

        pool.shutdown()

    def test_release_makes_service_available(self):
        """Test that release makes service available again"""
        config = PoolConfig()
        pool = OCRServicePool(config)

        # Pre-populate with a mock service
        self._add_mock_service(pool)

        pooled = pool.acquire(device="GPU:0")
        assert pooled.state == ServiceState.IN_USE

        pool.release(pooled)
        assert pooled.state == ServiceState.AVAILABLE

        pool.shutdown()

    def test_release_with_error(self):
        """Test that release with error increments error count"""
        config = PoolConfig(max_consecutive_errors=3)
        pool = OCRServicePool(config)

        # Pre-populate with a mock service
        self._add_mock_service(pool)

        pooled = pool.acquire(device="GPU:0")
        pool.release(pooled, error=Exception("Test error"))

        # One error is below the configured limit of 3.
        assert pooled.error_count == 1
        assert pooled.state == ServiceState.AVAILABLE

        pool.shutdown()

    def test_release_marks_unhealthy_after_errors(self):
        """Test that service is marked unhealthy after too many errors"""
        config = PoolConfig(max_consecutive_errors=2)
        pool = OCRServicePool(config)

        # Pre-populate with a mock service
        self._add_mock_service(pool)

        pooled = pool.acquire(device="GPU:0")
        pool.release(pooled, error=Exception("Error 1"))

        pooled = pool.acquire(device="GPU:0")
        pool.release(pooled, error=Exception("Error 2"))

        # The second consecutive error reaches the configured limit of 2.
        assert pooled.state == ServiceState.UNHEALTHY
        assert pooled.error_count == 2

        pool.shutdown()

    def test_acquire_context_manager(self):
        """Test context manager for acquire/release"""
        config = PoolConfig()
        pool = OCRServicePool(config)

        # Pre-populate with a mock service
        self._add_mock_service(pool)

        with pool.acquire_context(device="GPU:0") as pooled:
            assert pooled is not None
            assert pooled.state == ServiceState.IN_USE

        # After context, service should be available
        assert pooled.state == ServiceState.AVAILABLE

        pool.shutdown()

    def test_acquire_context_manager_with_error(self):
        """Test context manager releases on error"""
        config = PoolConfig()
        pool = OCRServicePool(config)

        # Pre-populate with a mock service
        self._add_mock_service(pool)

        with pytest.raises(ValueError):
            with pool.acquire_context(device="GPU:0") as pooled:
                raise ValueError("Test error")

        # The failure is recorded, and a single error (below the default
        # limit of 3) leaves the service available.
        assert pooled.error_count == 1
        assert pooled.state == ServiceState.AVAILABLE

        pool.shutdown()

    def test_acquire_timeout(self):
        """Test that acquire times out when no service available"""
        config = PoolConfig(
            max_services_per_device=1,
            max_total_services=1,
        )
        pool = OCRServicePool(config)

        # Pre-populate with a mock service
        self._add_mock_service(pool)

        # Acquire the only service
        pooled1 = pool.acquire(device="GPU:0")
        assert pooled1 is not None

        # Try to acquire another - should timeout
        pooled2 = pool.acquire(device="GPU:0", timeout=0.5)
        assert pooled2 is None

        pool.shutdown()

    def test_get_pool_stats(self):
        """Test pool statistics"""
        config = PoolConfig()
        pool = OCRServicePool(config)

        # Pre-populate with a mock service
        self._add_mock_service(pool)

        # Acquire a service
        pooled = pool.acquire(device="GPU:0")

        stats = pool.get_pool_stats()
        assert stats["total_services"] == 1
        assert stats["in_use_services"] == 1
        assert stats["available_services"] == 0
        assert stats["metrics"]["total_acquisitions"] == 1

        pool.release(pooled)

        stats = pool.get_pool_stats()
        assert stats["available_services"] == 1
        assert stats["metrics"]["total_releases"] == 1

        pool.shutdown()

    def test_health_check(self):
        """Test health check functionality"""
        config = PoolConfig()
        pool = OCRServicePool(config)

        # Pre-populate with a mock service
        mock_service = Mock()
        mock_service.process = Mock()
        mock_service.get_gpu_status = Mock()
        self._add_mock_service(pool, service=mock_service)

        # Acquire and release to update use_count
        pooled = pool.acquire(device="GPU:0")
        pool.release(pooled)

        health = pool.health_check()
        assert health["healthy"] is True
        assert len(health["services"]) == 1
        assert health["services"][0]["responsive"] is True

        pool.shutdown()

    def test_concurrent_acquire(self):
        """Test concurrent service acquisition

        Four workers compete for two pre-populated services; every worker is
        expected to obtain one within its acquire timeout.
        """
        config = PoolConfig(
            max_services_per_device=2,
            max_total_services=2,
        )
        pool = OCRServicePool(config)

        # Pre-populate with 2 mock services
        for _ in range(2):
            self._add_mock_service(pool)

        results = []

        def worker(worker_id):
            pooled = pool.acquire(device="GPU:0", timeout=5.0, task_id=f"task_{worker_id}")
            if pooled:
                # Release in `finally` so a failing worker cannot strand the
                # service and starve the workers still waiting.
                try:
                    results.append((worker_id, pooled))
                    time.sleep(0.1)  # Simulate work
                finally:
                    pool.release(pooled)

        # Daemon threads plus a bounded join keep a stuck worker from hanging
        # the whole test run.
        threads = [
            threading.Thread(target=worker, args=(i,), daemon=True)
            for i in range(4)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join(timeout=30.0)

        assert not any(t.is_alive() for t in threads)

        # All workers should have acquired a service
        assert len(results) == 4

        pool.shutdown()
|
|
|
|
|
|
class TestGetServicePool:
    """Tests for get_service_pool helper function"""

    def setup_method(self):
        """Reset singleton before each test"""
        shutdown_service_pool()
        OCRServicePool._instance = None

    def teardown_method(self):
        """Cleanup after each test"""
        shutdown_service_pool()
        OCRServicePool._instance = None

    def test_get_service_pool_creates_singleton(self):
        """Test that get_service_pool creates a singleton"""
        pool1 = get_service_pool()
        pool2 = get_service_pool()
        assert pool1 is pool2
        shutdown_service_pool()

    def test_shutdown_service_pool(self):
        """Test shutdown_service_pool cleans up"""
        # Create a pool only so that there is something to shut down.
        get_service_pool()
        shutdown_service_pool()

        # Should be able to create new pool
        new_pool = get_service_pool()
        assert new_pool._initialized is True
        shutdown_service_pool()
|
|
|
|
|
|
if __name__ == "__main__":
    # Allow running this file directly; propagate pytest's exit code so a
    # failing run is reported as a failing process.
    raise SystemExit(pytest.main([__file__, "-v"]))
|