This commit is contained in:
egg
2025-12-04 18:00:37 +08:00
parent 9437387ef1
commit 8265be1741
22 changed files with 2672 additions and 196 deletions

View File

@@ -0,0 +1,43 @@
"""Debug PyMuPDF table.cells structure"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
import fitz
# Sample document: "demo_docs/edit3.pdf", three directory levels above this script.
pdf_path = Path(__file__).parent.parent.parent / "demo_docs" / "edit3.pdf"
doc = fitz.open(str(pdf_path))
try:
    # Only the first page of the document is inspected.
    page = doc[0]
    tables = page.find_tables()
    for idx, table in enumerate(tables.tables):
        # Grid dimensions as implied by the extracted text matrix.
        data = table.extract()
        num_rows = len(data)
        num_cols = max(len(row) for row in data) if data else 0
        grid_size = num_rows * num_cols
        print(f"Table {idx}:")
        print(f" table.extract() dimensions: {num_rows} rows x {num_cols} cols")
        print(f" Expected positions: {grid_size}")

        # `cells` is read defensively in case the attribute is missing.
        cell_rects = getattr(table, 'cells', None)
        if cell_rects:
            print(f" table.cells length: {len(cell_rects)}")
            none_count = sum(1 for c in cell_rects if c is None)
            actual_count = len(cell_rects) - none_count
            print(f" None cells: {none_count}")
            print(f" Actual cells: {actual_count}")
            # Check if cell_rects matches grid size
            if len(cell_rects) != grid_size:
                print(f" WARNING: cell_rects length ({len(cell_rects)}) != grid size ({grid_size})")
            # Show first few cells
            print(f" First 5 cells: {cell_rects[:5]}")
        else:
            print(" table.cells: NOT AVAILABLE")

        # Check row_count and col_count
        print(f" table.row_count: {getattr(table, 'row_count', 'N/A')}")
        print(f" table.col_count: {getattr(table, 'col_count', 'N/A')}")
finally:
    # Release the document even if table inspection raised part-way through.
    doc.close()

View File

@@ -0,0 +1,48 @@
"""Debug PyMuPDF table structure - find merge info"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
import fitz
# Sample document: "demo_docs/edit3.pdf", three directory levels above this script.
pdf_path = Path(__file__).parent.parent.parent / "demo_docs" / "edit3.pdf"
doc = fitz.open(str(pdf_path))
try:
    # Only the first page of the document is inspected.
    page = doc[0]
    tables = page.find_tables()
    for idx, table in enumerate(tables.tables):
        print(f"\nTable {idx}:")
        # Check all available attributes
        print(f" Available attributes: {[a for a in dir(table) if not a.startswith('_')]}")
        # Try to get header info
        if hasattr(table, 'header'):
            print(f" header: {table.header}")
        # Check for cells info
        cell_rects = table.cells
        print(f" cells count: {len(cell_rects)}")
        # Get the extracted data; `default=0` keeps an empty extraction from
        # raising ValueError in max().
        data = table.extract()
        num_cols = max((len(r) for r in data), default=0)
        print(f" extract() shape: {len(data)} x {num_cols}")
        # Check if there's a way to map cells to grid positions
        # Look at the pandas output which might have merge info
        try:
            df = table.to_pandas()
            print(f" pandas shape: {df.shape}")
        except Exception as e:
            # Best-effort diagnostic: report the problem and keep inspecting.
            print(f" pandas error: {e}")
        # Check the TableRow objects if available
        if hasattr(table, 'rows'):
            rows = table.rows
            print(f" rows: {len(rows)}")
            for ri, row in enumerate(rows[:3]):  # first 3 rows
                print(f" row {ri}: {len(row.cells)} cells")
                for ci, cell in enumerate(row.cells[:5]):  # first 5 cells
                    print(f" cell {ci}: bbox={cell}")
finally:
    # Release the document even if table inspection raised part-way through.
    doc.close()

View File

@@ -0,0 +1,111 @@
"""
Generate test PDF to verify Phase 1 fixes
"""
import sys
import os
from pathlib import Path
# Add backend to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from app.services.direct_extraction_engine import DirectExtractionEngine
from app.services.pdf_generator_service import PDFGeneratorService
from app.services.unified_document_exporter import UnifiedDocumentExporter
def generate_test_pdf(input_pdf: str, output_dir: Path):
    """Extract a PDF with the Direct Track engine and write JSON + layout PDF.

    Args:
        input_pdf: Path of the source PDF, as a string.
        output_dir: Directory that receives the outputs; created if missing.

    Returns:
        The path of the generated layout PDF, or None when PDF generation
        raised an exception (the traceback is printed in that case).
    """
    source = Path(input_pdf)
    output_dir.mkdir(parents=True, exist_ok=True)
    print(f"Processing: {source.name}")
    print(f"Output dir: {output_dir}")

    # Step 1: Extract with Direct Track
    extractor = DirectExtractionEngine(
        enable_table_detection=True,
        enable_image_extraction=True,
        min_image_area=200.0,  # Filter tiny images
        enable_whiteout_detection=True,
        enable_content_sanitization=True,
    )
    document = extractor.extract(source, output_dir=output_dir)

    # Print extraction stats
    print(f"\n=== Extraction Results ===")
    print(f"Document ID: {document.document_id}")
    print(f"Pages: {len(document.pages)}")

    n_tables = n_images = n_merged = n_cells = 0
    for page in document.pages:
        for element in page.elements:
            kind = element.type.value
            if kind == 'image':
                n_images += 1
                continue
            if kind != 'table':
                continue
            n_tables += 1
            table = element.content
            if not (table and hasattr(table, 'cells')):
                continue
            n_cells += len(table.cells)
            # A cell spanning more than one row or column counts as merged.
            n_merged += sum(
                1 for cell in table.cells
                if cell.row_span > 1 or cell.col_span > 1
            )

    print(f"Tables: {n_tables}")
    print(f" - Total cells: {n_cells}")
    print(f" - Merged cells: {n_merged}")
    print(f"Images: {n_images}")

    # Step 2: Export to JSON
    json_target = output_dir / f"{source.stem}_result.json"
    UnifiedDocumentExporter().export_to_json(document, json_target)
    print(f"\nJSON saved: {json_target}")

    # Step 3: Generate layout PDF
    renderer = PDFGeneratorService()
    layout_pdf = output_dir / f"{source.stem}_layout.pdf"
    try:
        renderer.generate_from_unified_document(
            unified_doc=document,
            output_path=layout_pdf,
            source_file_path=source,
        )
        print(f"PDF saved: {layout_pdf}")
    except Exception as e:
        print(f"PDF generation error: {e}")
        import traceback
        traceback.print_exc()
        return None
    else:
        return layout_pdf
if __name__ == "__main__":
    # Inputs come from "demo_docs" (three levels above this script); outputs
    # are written below "storage/test_phase1" (two levels above this script).
    backend_root = Path(__file__).parent.parent
    demo_docs = backend_root.parent / "demo_docs"
    output_base = backend_root / "storage" / "test_phase1"

    # edit3.pdf has complex tables with merging; edit.pdf is processed as
    # well for comparison. Samples that are not present are skipped silently.
    for stem in ("edit3", "edit"):
        sample = demo_docs / f"{stem}.pdf"
        if not sample.exists():
            continue
        generated = generate_test_pdf(str(sample), output_base / stem)
        if generated:
            print(f"\n✓ Test PDF generated: {generated}")

    print(f"\n=== Output Location ===")
    print(f"{output_base}")

View File

@@ -0,0 +1,285 @@
"""
Phase 1 Bug Fixes Verification Tests
Tests for:
1.1 Direct Track table cell merging
1.2 OCR Track image path preservation
1.3 Cell boxes coordinate validation
1.4 Tiny decoration image filtering
1.5 Covering image removal
"""
import sys
import os
from pathlib import Path
# Add backend to path
sys.path.insert(0, str(Path(__file__).parent.parent))
import fitz
from app.services.direct_extraction_engine import DirectExtractionEngine
from app.services.ocr_to_unified_converter import validate_cell_boxes
from app.models.unified_document import TableCell
def test_1_1_table_cell_merging():
    """Test 1.1.5: Verify edit3.pdf returns correct merged cells.

    Walks every table PyMuPDF finds in "demo_docs/edit3.pdf" and counts the
    non-None entries of ``table.cells``.

    Returns:
        True when the total cell count is between 1 and 203 (merging was
        detected), False when it is exactly 204 (every grid position treated
        as its own cell), and None when the sample PDF is missing or the
        count is otherwise inconclusive.
    """
    print("\n" + "="*60)
    print("TEST 1.1: Direct Track Table Cell Merging")
    print("="*60)
    pdf_path = Path(__file__).parent.parent.parent / "demo_docs" / "edit3.pdf"
    if not pdf_path.exists():
        print(f"SKIP: {pdf_path} not found")
        # A missing fixture is a skip, not a failure: return None like the
        # other tests in this file so the summary does not report FAIL.
        return None

    total_cells = 0
    tables_with_merging = 0
    doc = fitz.open(str(pdf_path))
    try:
        for page_num, page in enumerate(doc):
            tables = page.find_tables()
            for table_idx, table in enumerate(tables.tables):
                data = table.extract()
                cell_rects = getattr(table, 'cells', None)
                if not cell_rects:
                    continue
                num_rows = len(data)
                num_cols = max(len(row) for row in data) if data else 0
                # Count actual cells (non-None)
                none_cells = sum(1 for c in cell_rects if c is None)
                actual_cells = len(cell_rects) - none_cells
                print(f" Page {page_num}, Table {table_idx}:")
                print(f" Grid size: {num_rows} x {num_cols} = {num_rows * num_cols} positions")
                print(f" Actual cells: {actual_cells}")
                print(f" Merged positions (None): {none_cells}")
                total_cells += actual_cells
                if none_cells > 0:
                    tables_with_merging += 1
    finally:
        # Release the document even if table inspection raised.
        doc.close()

    print(f"\n Total actual cells across all tables: {total_cells}")
    print(f" Tables with merging: {tables_with_merging}")
    # According to PLAN.md, edit3.pdf should have 83 cells (not 204)
    # The presence of None values indicates merging is detected
    if 0 < total_cells < 204:
        print(" RESULT: PASS - Cell merging detected correctly")
        return True
    if total_cells == 204:
        print(" RESULT: FAIL - All cells treated as 1x1 (no merging detected)")
        return False
    print(f" RESULT: INCONCLUSIVE - {total_cells} cells found")
    return None
def test_1_3_cell_boxes_validation():
    """Test 1.3: Verify cell_boxes coordinate validation.

    Feeds `validate_cell_boxes` one set of in-range boxes and one set of
    out-of-range boxes and asserts on the 'valid', 'invalid_count' and
    'clamped_boxes' entries of the returned mapping.

    Returns:
        True when every assertion holds; an AssertionError propagates
        otherwise.
    """
    rule = "="*60
    print("\n" + rule)
    print("TEST 1.3: Cell Boxes Coordinate Validation")
    print(rule)

    # Test case 1: Valid coordinates
    in_range = [
        [10, 10, 100, 50],
        [100, 10, 200, 50],
        [10, 50, 200, 100],
    ]
    outcome = validate_cell_boxes(in_range, [0, 0, 300, 200], 300, 200)
    print(f" Valid boxes: valid={outcome['valid']}, invalid_count={outcome['invalid_count']}")
    assert outcome['valid'], "Valid boxes should pass validation"

    # Test case 2: Out of bounds coordinates
    out_of_range = [
        [-10, 10, 100, 50],  # x0 < 0
        [10, 10, 400, 50],   # x1 > page_width
        [10, 10, 100, 300],  # y1 > page_height
    ]
    outcome = validate_cell_boxes(out_of_range, [0, 0, 300, 200], 300, 200)
    print(f" Invalid boxes: valid={outcome['valid']}, invalid_count={outcome['invalid_count']}")
    assert not outcome['valid'], "Invalid boxes should fail validation"
    assert outcome['invalid_count'] == 3, "Should detect 3 invalid boxes"

    # Test case 3: Clamping (checked on the result of the out-of-bounds call)
    clamped_boxes = outcome['clamped_boxes']
    assert len(clamped_boxes) == 3, "Should return clamped boxes"
    first_clamped = clamped_boxes[0]
    assert first_clamped[0] >= 0, "Clamped x0 should be >= 0"

    print(" RESULT: PASS - Coordinate validation works correctly")
    return True
def test_1_4_tiny_image_filtering():
    """Test 1.4: Verify tiny decoration image filtering.

    Scans every image of "demo_docs/edit3.pdf" and classifies it as tiny or
    normal by the area of its first placement rectangle.

    Returns:
        True when at least one tiny image was found, None when the sample
        PDF is missing or contains no tiny images.
    """
    rule = "="*60
    print("\n" + rule)
    print("TEST 1.4: Tiny Decoration Image Filtering")
    print(rule)

    pdf_path = Path(__file__).parent.parent.parent / "demo_docs" / "edit3.pdf"
    if not pdf_path.exists():
        print(f"SKIP: {pdf_path} not found")
        return None

    min_area = 200  # Same threshold as in DirectExtractionEngine
    tiny_count = normal_count = 0

    doc = fitz.open(str(pdf_path))
    for page_num, page in enumerate(doc):
        for image_info in page.get_images():
            xref = image_info[0]
            placements = page.get_image_rects(xref)
            if not placements:
                continue
            # Only the first placement rectangle is measured.
            first = placements[0]
            area = (first.x1 - first.x0) * (first.y1 - first.y0)
            if area >= min_area:
                normal_count += 1
                continue
            tiny_count += 1
            print(f" Page {page_num}: Tiny image xref={xref}, area={area:.1f} px²")
    doc.close()

    print(f"\n Tiny images (< {min_area} px²): {tiny_count}")
    print(f" Normal images: {normal_count}")

    if tiny_count <= 0:
        print(" RESULT: INFO - No tiny images found in test file")
        return None
    print(" RESULT: PASS - Tiny images detected, will be filtered")
    return True
def test_1_5_covering_image_detection():
    """Test 1.5: Verify covering image detection.

    Runs the engine's `_preprocess_page` on every page of
    "demo_docs/edit3.pdf" and totals the entries found under the
    'covering_images' key of each result.

    Returns:
        True when at least one covering image was reported, None when the
        sample PDF is missing or nothing was reported.
    """
    rule = "="*60
    print("\n" + rule)
    print("TEST 1.5: Covering Image Detection")
    print(rule)

    pdf_path = Path(__file__).parent.parent.parent / "demo_docs" / "edit3.pdf"
    if not pdf_path.exists():
        print(f"SKIP: {pdf_path} not found")
        return None

    engine = DirectExtractionEngine(
        enable_whiteout_detection=True,
        whiteout_iou_threshold=0.8,
    )

    total_covering = 0
    doc = fitz.open(str(pdf_path))
    for page_num, page in enumerate(doc):
        preprocessed = engine._preprocess_page(page, page_num, doc)
        covering_images = preprocessed.get('covering_images', [])
        if not covering_images:
            continue
        print(f" Page {page_num}: {len(covering_images)} covering images detected")
        for img in covering_images[:3]:  # Show first 3
            rounded_bbox = [round(x, 1) for x in img.get('bbox', [])]
            print(f" - xref={img.get('xref')}, type={img.get('color_type')}, "
                  f"bbox={rounded_bbox}")
        total_covering += len(covering_images)
    doc.close()

    print(f"\n Total covering images detected: {total_covering}")
    if total_covering <= 0:
        print(" RESULT: INFO - No covering images found in test file")
        return None
    print(" RESULT: PASS - Covering images detected, will be filtered")
    return True
def test_direct_extraction_full():
    """Full integration test for Direct Track extraction.

    Extracts "demo_docs/edit3.pdf" with DirectExtractionEngine and prints
    the number of tables, tables containing merged cells, and images.

    Returns:
        True when extraction and counting finish without an exception,
        False when an exception is raised (its traceback is printed), and
        None when the sample PDF is missing.
    """
    rule = "="*60
    print("\n" + rule)
    print("INTEGRATION TEST: Direct Track Full Extraction")
    print(rule)

    pdf_path = Path(__file__).parent.parent.parent / "demo_docs" / "edit3.pdf"
    if not pdf_path.exists():
        print(f"SKIP: {pdf_path} not found")
        return None

    engine = DirectExtractionEngine(
        enable_table_detection=True,
        enable_image_extraction=True,
        min_image_area=200.0,
        enable_whiteout_detection=True,
    )

    try:
        result = engine.extract(pdf_path)  # Pass Path object, not string

        # Count elements
        table_count = image_count = merged_table_count = 0
        for page in result.pages:
            for elem in page.elements:
                kind = elem.type.value
                if kind == 'image':
                    image_count += 1
                    continue
                if kind != 'table':
                    continue
                table_count += 1
                content = elem.content
                if not (content and hasattr(content, 'cells')):
                    continue
                # A table counts once as soon as any cell spans more than
                # one row or column.
                if any(cell.row_span > 1 or cell.col_span > 1
                       for cell in content.cells):
                    merged_table_count += 1

        print(f" Document ID: {result.document_id}")
        print(f" Pages: {len(result.pages)}")
        print(f" Tables: {table_count} (with merging: {merged_table_count})")
        print(f" Images: {image_count}")
        print(" RESULT: PASS - Extraction completed successfully")
        return True
    except Exception as e:
        print(f" RESULT: FAIL - {e}")
        import traceback
        traceback.print_exc()
        return False
if __name__ == "__main__":
    print("="*60)
    print("Phase 1 Bug Fixes Verification Tests")
    print("="*60)

    # Run tests in a fixed order. Each returns True (pass), False (fail) or
    # None (skipped / informational only).
    results = {}
    results['1.1_table_merging'] = test_1_1_table_cell_merging()
    results['1.3_coord_validation'] = test_1_3_cell_boxes_validation()
    results['1.4_tiny_filtering'] = test_1_4_tiny_image_filtering()
    results['1.5_covering_detection'] = test_1_5_covering_image_detection()
    results['integration'] = test_direct_extraction_full()

    # Summary
    print("\n" + "="*60)
    print("TEST SUMMARY")
    print("="*60)
    for test_name, result in results.items():
        if result is True:
            status = "PASS"
        elif result is False:
            status = "FAIL"
        else:
            status = "SKIP/INFO"
        print(f" {test_name}: {status}")

    passed = sum(1 for r in results.values() if r is True)
    failed = sum(1 for r in results.values() if r is False)
    skipped = sum(1 for r in results.values() if r is None)
    print(f"\n Total: {passed} passed, {failed} failed, {skipped} skipped/info")

    # Signal failure to the calling shell / CI instead of always exiting 0.
    if failed:
        sys.exit(1)