""" Unit tests for DirectExtractionEngine service. Tests cover: - Text extraction accuracy - Structure preservation (headers, lists, sections) - Coordinate extraction - Table detection - Image extraction """ import pytest from pathlib import Path import fitz from PIL import Image import io from app.services.direct_extraction_engine import DirectExtractionEngine from app.models.unified_document import ( ElementType, ProcessingTrack, UnifiedDocument ) class TestDirectExtractionEngine: """Test suite for DirectExtractionEngine.""" @pytest.fixture def engine(self): """Create engine with default settings.""" return DirectExtractionEngine() @pytest.fixture def engine_no_tables(self): """Create engine with table detection disabled.""" return DirectExtractionEngine(enable_table_detection=False) @pytest.fixture def engine_no_images(self): """Create engine with image extraction disabled.""" return DirectExtractionEngine(enable_image_extraction=False) # ===== Text Extraction Tests ===== def test_extract_simple_text(self, engine, temp_dir): """Test basic text extraction from PDF.""" pdf_path = temp_dir / "simple.pdf" doc = fitz.open() page = doc.new_page() test_text = "Hello World! This is a test document." page.insert_text((50, 50), test_text, fontsize=12) doc.save(str(pdf_path)) doc.close() result = engine.extract(pdf_path) assert isinstance(result, UnifiedDocument) assert len(result.pages) == 1 assert result.metadata.processing_track == ProcessingTrack.DIRECT # Check text was extracted text_elements = [e for e in result.pages[0].elements if e.is_text] assert len(text_elements) > 0 # Verify text content all_text = " ".join(e.content for e in text_elements if isinstance(e.content, str)) assert "Hello World" in all_text def test_extract_multiline_text(self, engine, temp_dir): """Test extraction of multiple lines of text.""" pdf_path = temp_dir / "multiline.pdf" doc = fitz.open() page = doc.new_page() lines = [ "First line of text", "Second line of text", "Third line of text" ] y = 50 for line in lines: page.insert_text((50, y), line, fontsize=12) y += 20 doc.save(str(pdf_path)) doc.close() result = engine.extract(pdf_path) text_elements = [e for e in result.pages[0].elements if e.is_text] all_text = " ".join(e.content for e in text_elements if isinstance(e.content, str)) for line in lines: assert line in all_text def test_extract_chinese_text(self, engine, temp_dir): """Test extraction of Chinese text.""" pdf_path = temp_dir / "chinese.pdf" doc = fitz.open() page = doc.new_page() # Use a font that supports Chinese if available chinese_text = "這是中文測試文字" try: page.insert_text((50, 50), chinese_text, fontsize=12) doc.save(str(pdf_path)) doc.close() result = engine.extract(pdf_path) assert len(result.pages) == 1 except Exception: # Skip if font not available doc.close() pytest.skip("Chinese font not available for test") def test_extract_multiple_pages(self, engine, temp_dir): """Test extraction from multi-page PDF.""" pdf_path = temp_dir / "multipage.pdf" doc = fitz.open() for i in range(5): page = doc.new_page() page.insert_text((50, 50), f"This is page {i + 1}", fontsize=12) doc.save(str(pdf_path)) doc.close() result = engine.extract(pdf_path) assert len(result.pages) == 5 for i, page in enumerate(result.pages): assert page.page_number == i + 1 # ===== Coordinate Extraction Tests ===== def test_bounding_box_accuracy(self, engine, temp_dir): """Test that bounding boxes are extracted correctly.""" pdf_path = temp_dir / "bbox.pdf" doc = fitz.open() page = doc.new_page() # Insert text at known position x, y = 100, 200 page.insert_text((x, y), "Positioned Text", fontsize=12) doc.save(str(pdf_path)) doc.close() result = engine.extract(pdf_path) text_elements = [e for e in result.pages[0].elements if e.is_text] assert len(text_elements) > 0 # Check that bbox is near the insertion point element = text_elements[0] assert element.bbox is not None assert element.bbox.x0 >= 0 assert element.bbox.y0 >= 0 assert element.bbox.x1 > element.bbox.x0 assert element.bbox.y1 > element.bbox.y0 def test_page_dimensions(self, engine, temp_dir): """Test that page dimensions are extracted correctly.""" pdf_path = temp_dir / "dimensions.pdf" doc = fitz.open() # Create A4 page (595.28 x 841.89 points) page = doc.new_page(width=595.28, height=841.89) page.insert_text((50, 50), "Test", fontsize=12) doc.save(str(pdf_path)) doc.close() result = engine.extract(pdf_path) dimensions = result.pages[0].dimensions assert abs(dimensions.width - 595.28) < 1 assert abs(dimensions.height - 841.89) < 1 # ===== Structure Preservation Tests ===== def test_detect_title_by_font_size(self, engine, temp_dir): """Test title detection based on font size.""" pdf_path = temp_dir / "title.pdf" doc = fitz.open() page = doc.new_page() # Large text for title page.insert_text((50, 50), "Document Title", fontsize=24) # Normal text for content page.insert_text((50, 100), "This is regular content.", fontsize=12) doc.save(str(pdf_path)) doc.close() result = engine.extract(pdf_path) elements = result.pages[0].elements # Should have extracted text elements text_elements = [e for e in elements if e.is_text] assert len(text_elements) >= 1 # Check that elements with larger fonts have different types or metadata # Note: Title detection depends on font size thresholds and may vary font_sizes = [] for e in text_elements: if e.style and e.style.font_size: font_sizes.append(e.style.font_size) # Should have captured different font sizes if len(font_sizes) >= 2: assert max(font_sizes) > min(font_sizes) def test_detect_list_items(self, engine, temp_dir): """Test detection of list items.""" pdf_path = temp_dir / "list.pdf" doc = fitz.open() page = doc.new_page() y = 50 list_items = [ "• First item", "• Second item", "• Third item" ] for item in list_items: page.insert_text((50, y), item, fontsize=12) y += 20 doc.save(str(pdf_path)) doc.close() result = engine.extract(pdf_path) list_elements = [e for e in result.pages[0].elements if e.type == ElementType.LIST_ITEM] # Should detect list items assert len(list_elements) >= 1 def test_detect_headers_footers(self, engine, temp_dir): """Test header/footer detection by position.""" pdf_path = temp_dir / "header_footer.pdf" doc = fitz.open() page = doc.new_page() # Header at top page.insert_text((50, 30), "Document Header", fontsize=10) # Content in middle page.insert_text((50, 400), "Main content of the document.", fontsize=12) # Footer at bottom page.insert_text((50, 800), "Page 1", fontsize=10) doc.save(str(pdf_path)) doc.close() result = engine.extract(pdf_path) elements = result.pages[0].elements # Check for header/footer metadata headers = [e for e in elements if e.metadata.get('is_page_header')] footers = [e for e in elements if e.metadata.get('is_page_footer')] # At least one should be detected assert len(headers) + len(footers) >= 1 def test_section_hierarchy(self, engine, temp_dir): """Test section hierarchy building.""" pdf_path = temp_dir / "sections.pdf" doc = fitz.open() page = doc.new_page() # Create hierarchical headers page.insert_text((50, 50), "Main Title", fontsize=20) page.insert_text((50, 100), "Section 1", fontsize=16) page.insert_text((50, 150), "Subsection 1.1", fontsize=14) page.insert_text((50, 200), "Content text", fontsize=12) doc.save(str(pdf_path)) doc.close() result = engine.extract(pdf_path) elements = result.pages[0].elements # Check for section levels headers = [e for e in elements if e.type in [ElementType.TITLE, ElementType.HEADER]] levels = [e.metadata.get('section_level') for e in headers if e.metadata.get('section_level')] if levels: # Should have multiple levels assert len(set(levels)) >= 1 # ===== Table Detection Tests ===== def test_detect_simple_table(self, engine, temp_dir): """Test detection of a simple table.""" pdf_path = temp_dir / "table.pdf" doc = fitz.open() page = doc.new_page() # Create a simple table layout table_data = [ ["Name", "Age", "City"], ["Alice", "30", "Tokyo"], ["Bob", "25", "Paris"] ] y = 100 for row in table_data: x = 50 for cell in row: page.insert_text((x, y), cell, fontsize=10) x += 100 y += 20 doc.save(str(pdf_path)) doc.close() result = engine.extract(pdf_path) table_elements = [e for e in result.pages[0].elements if e.type == ElementType.TABLE] # Table detection may or may not succeed depending on layout # Just verify the extraction completed without error assert result.pages[0].elements is not None def test_table_detection_disabled(self, engine_no_tables, temp_dir): """Test that table detection can be disabled.""" pdf_path = temp_dir / "no_table.pdf" doc = fitz.open() page = doc.new_page() # Create table-like layout for i in range(3): page.insert_text((50 + i*100, 100), f"Col{i}", fontsize=10) page.insert_text((50 + i*100, 120), f"Val{i}", fontsize=10) doc.save(str(pdf_path)) doc.close() result = engine_no_tables.extract(pdf_path) # With table detection disabled, should not find tables table_elements = [e for e in result.pages[0].elements if e.type == ElementType.TABLE] assert len(table_elements) == 0 # ===== Image Extraction Tests ===== def test_extract_embedded_image(self, engine, temp_dir): """Test extraction of embedded images.""" pdf_path = temp_dir / "with_image.pdf" doc = fitz.open() page = doc.new_page() # Create and embed an image img = Image.new('RGB', (100, 100), color='red') img_bytes = io.BytesIO() img.save(img_bytes, format='PNG') img_bytes.seek(0) # Insert image into PDF rect = fitz.Rect(100, 100, 300, 300) page.insert_image(rect, stream=img_bytes.read()) doc.save(str(pdf_path)) doc.close() # Create output directory for images output_dir = temp_dir / "images" result = engine.extract(pdf_path, output_dir) image_elements = [e for e in result.pages[0].elements if e.type == ElementType.IMAGE] assert len(image_elements) >= 1 # Check image has bbox if image_elements: img_elem = image_elements[0] assert img_elem.bbox is not None def test_image_extraction_disabled(self, engine_no_images, temp_dir): """Test that image extraction can be disabled.""" pdf_path = temp_dir / "no_image.pdf" doc = fitz.open() page = doc.new_page() # Add image img = Image.new('RGB', (50, 50), color='blue') img_bytes = io.BytesIO() img.save(img_bytes, format='PNG') img_bytes.seek(0) rect = fitz.Rect(100, 100, 200, 200) page.insert_image(rect, stream=img_bytes.read()) doc.save(str(pdf_path)) doc.close() result = engine_no_images.extract(pdf_path) image_elements = [e for e in result.pages[0].elements if e.type == ElementType.IMAGE] assert len(image_elements) == 0 # ===== Metadata Tests ===== def test_extract_pdf_metadata(self, engine, temp_dir): """Test extraction of PDF metadata.""" pdf_path = temp_dir / "metadata.pdf" doc = fitz.open() # Set metadata doc.set_metadata({ "title": "Test Document", "author": "Test Author", "subject": "Testing" }) page = doc.new_page() page.insert_text((50, 50), "Content", fontsize=12) doc.save(str(pdf_path)) doc.close() result = engine.extract(pdf_path) assert result.metadata.title == "Test Document" assert result.metadata.author == "Test Author" assert result.metadata.subject == "Testing" def test_processing_track_is_direct(self, engine, temp_dir): """Test that processing track is set to DIRECT.""" pdf_path = temp_dir / "track.pdf" doc = fitz.open() page = doc.new_page() page.insert_text((50, 50), "Test", fontsize=12) doc.save(str(pdf_path)) doc.close() result = engine.extract(pdf_path) assert result.metadata.processing_track == ProcessingTrack.DIRECT def test_confidence_is_perfect(self, engine, temp_dir): """Test that direct extraction has confidence 1.0.""" pdf_path = temp_dir / "confidence.pdf" doc = fitz.open() page = doc.new_page() page.insert_text((50, 50), "High confidence text", fontsize=12) doc.save(str(pdf_path)) doc.close() result = engine.extract(pdf_path) text_elements = [e for e in result.pages[0].elements if e.is_text] for element in text_elements: assert element.confidence == 1.0 # ===== Style Extraction Tests ===== def test_extract_font_info(self, engine, temp_dir): """Test extraction of font information.""" pdf_path = temp_dir / "fonts.pdf" doc = fitz.open() page = doc.new_page() page.insert_text((50, 50), "Normal text", fontsize=12) page.insert_text((50, 80), "Large text", fontsize=20) doc.save(str(pdf_path)) doc.close() result = engine.extract(pdf_path) text_elements = [e for e in result.pages[0].elements if e.is_text and e.style] # Should have style information assert len(text_elements) > 0 # Check font sizes are different font_sizes = [e.style.font_size for e in text_elements if e.style.font_size] if len(font_sizes) >= 2: assert max(font_sizes) > min(font_sizes) # ===== Error Handling Tests ===== def test_nonexistent_file(self, engine, temp_dir): """Test handling of non-existent file.""" pdf_path = temp_dir / "nonexistent.pdf" result = engine.extract(pdf_path) # Should return document with errors assert result.processing_errors is not None assert len(result.processing_errors) > 0 def test_corrupted_pdf(self, engine, temp_dir): """Test handling of corrupted PDF.""" pdf_path = temp_dir / "corrupted.pdf" # Create invalid PDF file pdf_path.write_bytes(b"This is not a valid PDF file") result = engine.extract(pdf_path) # Should return document with errors assert result.processing_errors is not None def test_empty_pdf_page(self, engine, temp_dir): """Test handling of PDF with empty pages.""" pdf_path = temp_dir / "empty_page.pdf" doc = fitz.open() doc.new_page() # Empty page doc.save(str(pdf_path)) doc.close() result = engine.extract(pdf_path) assert len(result.pages) == 1 # May or may not have elements # ===== Link Extraction Tests ===== def test_extract_hyperlinks(self, engine, temp_dir): """Test extraction of hyperlinks.""" pdf_path = temp_dir / "links.pdf" doc = fitz.open() page = doc.new_page() # Add text and link page.insert_text((50, 50), "Click here", fontsize=12) # Add hyperlink link_rect = fitz.Rect(50, 40, 120, 60) page.insert_link({ 'kind': fitz.LINK_URI, 'from': link_rect, 'uri': 'https://example.com' }) doc.save(str(pdf_path)) doc.close() result = engine.extract(pdf_path) reference_elements = [e for e in result.pages[0].elements if e.type == ElementType.REFERENCE] assert len(reference_elements) >= 1 # Check URI is extracted if reference_elements: link = reference_elements[0] assert link.content.get('uri') == 'https://example.com' # ===== Performance Tests ===== def test_large_document_performance(self, engine, temp_dir): """Test extraction performance on larger document.""" import time pdf_path = temp_dir / "large.pdf" doc = fitz.open() # Create 20 pages with content for i in range(20): page = doc.new_page() for j in range(10): page.insert_text((50, 50 + j*20), f"Page {i+1} Line {j+1}: Lorem ipsum dolor sit amet", fontsize=10) doc.save(str(pdf_path)) doc.close() start_time = time.time() result = engine.extract(pdf_path) elapsed = time.time() - start_time assert len(result.pages) == 20 assert elapsed < 30 # Should complete within 30 seconds # Check processing time is recorded assert result.metadata.processing_time > 0