OCR/backend/app/services/direct_extraction_engine.py
egg 108784a270 fix: resolve table/image overlap and missing images in Direct track PDF generation
This commit fixes two critical rendering issues in Direct track PDF generation
that were reported by the user after the span-based rendering fixes.

## Issue 1: Table Text Overlap

**Problem**: Tables rendered with duplicate text overlaid on top of them, because
DirectExtractionEngine extracts table content twice: as TABLE elements (with
structure) and as separate TEXT elements (individual text blocks).
PDFGeneratorService rendered both, producing the overlap.

**Solution**: Implemented an area-based overlap-filtering mechanism

**Changes**:
- Added `_is_element_inside_regions()` method in PDFGeneratorService
  - Uses overlap ratio detection (50% threshold) instead of strict containment
  - Handles cases where text blocks are larger than detected regions
  - Algorithm: filters element if ≥50% of its area overlaps with table/image bbox

- Modified `_generate_direct_track_pdf()` to:
  - Collect exclusion regions (tables + images) before rendering
  - Check each text/list element for overlap before drawing
  - Skip elements that significantly overlap an exclusion region (see the Python sketch under Technical Details)

**Evidence**:
- Test case: "PRODUCT DESCRIPTION" text block overlaps the table by 74.5%
- File size reduced by 545 bytes (-3.8%) from filtered elements
- E2E tests passed: test_2_4_1_simple_tables, test_2_4_2_complex_tables
- User confirmed: "The table issue looks fixed now" ✓

## Issue 2: Missing Images

**Problem**: Images did not render in generated PDFs because `extract()` was
called without the `output_dir` parameter, so extracted images were never saved
to the filesystem and the elements' content lacked a `saved_path`.

**Solution**: Auto-create a default output directory for image extraction

**Changes**:
- Modified `DirectExtractionEngine.extract()` to:
  - Auto-create `storage/results/{document_id}/` when output_dir is not provided (sketched below)
  - Ensure images are always saved when enable_image_extraction=True
  - Use a short UUID (8 chars) for cleaner directory names
  - Maintain backward compatibility (existing calls still work)
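
The gist of the change, condensed from the `extract()` implementation in the file below:

```python
# Inside DirectExtractionEngine.extract(), before pages are processed:
if output_dir is None and self.enable_image_extraction:
    # Default location: storage/results/{document_id}/ (document_id = uuid4()[:8])
    output_dir = Path("storage/results") / document_id
    output_dir.mkdir(parents=True, exist_ok=True)
```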

**Evidence**:
- Image extraction: 2/2 images saved to storage/results/
- Image files: 5,320 + 4,945 = 10,265 bytes total
- PDF file size: 13,627 → 26,643 bytes (+13,016 bytes, +95.5%)
- PyMuPDF verification: 2 images embedded in page 1
- E2E tests passed: test_1_3_2_direct_track_image_rendering, test_1_3_3_verify_image_paths

## Technical Details

**Overlap Filtering Algorithm**:
```
For each text/list element:
  For each table/image region:
    Calculate overlap_area = intersection(element_bbox, region_bbox)
    Calculate overlap_ratio = overlap_area / element_area
    If overlap_ratio ≥ 0.5: SKIP element (inside region)
```
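
The same check as a minimal Python sketch, assuming bboxes are `(x0, y0, x1, y1)` tuples in page coordinates. The shipped implementation is `_is_element_inside_regions()` in pdf_generator_service.py; this standalone version is illustrative:

```python
def is_inside_regions(elem_bbox, regions, threshold=0.5):
    """True if >= threshold of the element's area overlaps any region."""
    ex0, ey0, ex1, ey1 = elem_bbox
    elem_area = max(ex1 - ex0, 0.0) * max(ey1 - ey0, 0.0)
    if elem_area == 0.0:
        return False
    for rx0, ry0, rx1, ry1 in regions:
        # Width/height of the intersection rectangle (0 when disjoint)
        iw = max(0.0, min(ex1, rx1) - max(ex0, rx0))
        ih = max(0.0, min(ey1, ry1) - max(ey0, ry0))
        if (iw * ih) / elem_area >= threshold:
            return True
    return False
```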

**Key Advantages**:
- Area-based vs strict containment (handles larger text blocks)
- Configurable threshold (default 50%, adjustable if needed)
- Preserves reading order and layout
- No breaking changes to existing code

## Test Results

**E2E Test Suite**: 6/8 passed (2 OCR track timeouts unrelated to these fixes)
- test_1_3_2_direct_track_image_rendering
- test_1_3_3_verify_image_paths
- test_2_4_1_simple_tables
- test_2_4_2_complex_tables
- test_4_4_1_compare_direct_with_original

**File Size Evidence**:
- Text-only (no images): 13,627 bytes
- With images (both fixes): 26,643 bytes
- Difference: +13,016 bytes (+95.5%) confirming image inclusion

**Visual Quality**:
- Tables render without text overlay ✓
- Images embedded correctly (2/2) ✓
- Text outside regions still renders ✓
- No duplicate rendering ✓

## Files Changed

- backend/app/services/pdf_generator_service.py
  - Added _is_element_inside_regions() (lines 592-642)
  - Modified _generate_direct_track_pdf() (lines 697-766)

- backend/app/services/direct_extraction_engine.py
  - Modified extract() (lines 78-84)

- backend/tests/e2e/TEST_RESULTS_FINAL_FIX.md
  - Comprehensive test documentation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-24 16:31:28 +08:00


"""
Direct Extraction Engine using PyMuPDF
Handles direct text and structure extraction from editable PDFs without OCR.
This provides much faster processing and perfect accuracy for documents with
extractable text.
"""
import os
import logging
import fitz # PyMuPDF
import uuid
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any, Union
from datetime import datetime
import re
from ..models.unified_document import (
UnifiedDocument, DocumentElement, Page, DocumentMetadata,
BoundingBox, StyleInfo, TableData, TableCell, Dimensions,
ElementType, ProcessingTrack
)
logger = logging.getLogger(__name__)
class DirectExtractionEngine:
    """
    Engine for direct text extraction from editable PDFs using PyMuPDF.

    This engine provides:
    - Fast text extraction with exact positioning
    - Font and style information preservation
    - Table structure detection
    - Image extraction with coordinates
    - Hyperlink and annotation extraction
    """

    def __init__(self,
                 enable_table_detection: bool = True,
                 enable_image_extraction: bool = True,
                 min_table_rows: int = 2,
                 min_table_cols: int = 2):
        """
        Initialize the extraction engine.

        Args:
            enable_table_detection: Whether to detect and extract tables
            enable_image_extraction: Whether to extract images
            min_table_rows: Minimum rows for table detection
            min_table_cols: Minimum columns for table detection
        """
        self.enable_table_detection = enable_table_detection
        self.enable_image_extraction = enable_image_extraction
        self.min_table_rows = min_table_rows
        self.min_table_cols = min_table_cols
    def extract(self,
                file_path: Path,
                output_dir: Optional[Path] = None) -> UnifiedDocument:
        """
        Extract content from PDF file to UnifiedDocument format.

        Args:
            file_path: Path to PDF file
            output_dir: Optional directory to save extracted images.
                If not provided, creates a temporary directory in
                storage/results/{document_id}/

        Returns:
            UnifiedDocument with extracted content
        """
        start_time = datetime.now()
        document_id = str(uuid.uuid4())[:8]  # Short ID for cleaner paths
        try:
            doc = fitz.open(str(file_path))

            # If no output_dir is provided, create a default directory for image extraction
            if output_dir is None and self.enable_image_extraction:
                # Create temporary directory in storage/results
                default_output_dir = Path("storage/results") / document_id
                default_output_dir.mkdir(parents=True, exist_ok=True)
                output_dir = default_output_dir
                logger.debug(f"Created default output directory: {output_dir}")

            # Extract document metadata
            metadata = self._extract_metadata(file_path, doc, start_time)

            # Extract pages
            pages = []
            for page_num in range(len(doc)):
                logger.info(f"Extracting page {page_num + 1}/{len(doc)}")
                page = self._extract_page(
                    doc[page_num],
                    page_num + 1,
                    document_id,
                    output_dir
                )
                pages.append(page)

            doc.close()

            # Calculate processing time
            processing_time = (datetime.now() - start_time).total_seconds()
            metadata.processing_time = processing_time
            logger.info(f"Direct extraction completed in {processing_time:.2f}s")

            return UnifiedDocument(
                document_id=document_id,
                metadata=metadata,
                pages=pages
            )
        except Exception as e:
            logger.error(f"Error during direct extraction: {e}")
            # Return partial result with error information
            processing_time = (datetime.now() - start_time).total_seconds()
            if 'metadata' not in locals():
                metadata = DocumentMetadata(
                    filename=file_path.name,
                    file_type="pdf",
                    file_size=file_path.stat().st_size if file_path.exists() else 0,
                    created_at=datetime.now(),
                    processing_track=ProcessingTrack.DIRECT,
                    processing_time=processing_time
                )
            return UnifiedDocument(
                document_id=document_id,
                metadata=metadata,
                pages=pages if 'pages' in locals() else [],
                processing_errors=[{
                    "error": str(e),
                    "type": type(e).__name__
                }]
            )
    def _extract_metadata(self,
                          file_path: Path,
                          doc: fitz.Document,
                          start_time: datetime) -> DocumentMetadata:
        """Extract document metadata"""
        pdf_metadata = doc.metadata
        return DocumentMetadata(
            filename=file_path.name,
            file_type="pdf",
            file_size=file_path.stat().st_size,
            created_at=start_time,
            processing_track=ProcessingTrack.DIRECT,
            processing_time=0.0,  # Will be updated later
            title=pdf_metadata.get("title"),
            author=pdf_metadata.get("author"),
            subject=pdf_metadata.get("subject"),
            keywords=pdf_metadata.get("keywords", "").split(",") if pdf_metadata.get("keywords") else None,
            producer=pdf_metadata.get("producer"),
            creator=pdf_metadata.get("creator"),
            creation_date=self._parse_pdf_date(pdf_metadata.get("creationDate")),
            modification_date=self._parse_pdf_date(pdf_metadata.get("modDate"))
        )

    def _parse_pdf_date(self, date_str: str) -> Optional[datetime]:
        """Parse PDF date string to datetime"""
        if not date_str:
            return None
        try:
            # PDF date format: D:YYYYMMDDHHmmSSOHH'mm
            # Example: D:20240101120000+09'00
            if date_str.startswith("D:"):
                date_str = date_str[2:]
            # Extract just the date/time part (first 14 characters)
            if len(date_str) >= 14:
                date_part = date_str[:14]
                return datetime.strptime(date_part, "%Y%m%d%H%M%S")
        except Exception:
            pass
        return None
    def _extract_page(self,
                      page: fitz.Page,
                      page_num: int,
                      document_id: str,
                      output_dir: Optional[Path]) -> Page:
        """Extract content from a single page"""
        elements = []
        element_counter = 0

        # Get page dimensions
        rect = page.rect
        dimensions = Dimensions(
            width=rect.width,
            height=rect.height,
            dpi=72  # PDF standard DPI
        )

        # Extract text blocks with formatting (sort=True for reading order)
        text_dict = page.get_text("dict", sort=True)
        for block_idx, block in enumerate(text_dict.get("blocks", [])):
            if block.get("type") == 0:  # Text block
                element = self._process_text_block(
                    block, page_num, element_counter
                )
                if element:
                    elements.append(element)
                    element_counter += 1

        # Extract tables (if enabled)
        if self.enable_table_detection:
            try:
                # Try native table detection (PyMuPDF 1.23.0+)
                tables = page.find_tables()
                for table_idx, table in enumerate(tables):
                    element = self._process_native_table(
                        table, page_num, element_counter
                    )
                    if element:
                        elements.append(element)
                        element_counter += 1
            except AttributeError:
                # Fall back to positional table detection
                logger.debug("Native table detection not available, using positional detection")
                table_elements = self._detect_tables_by_position(page, page_num, element_counter)
                elements.extend(table_elements)
                element_counter += len(table_elements)

        # Extract images (if enabled)
        if self.enable_image_extraction:
            image_elements = self._extract_images(
                page, page_num, document_id, element_counter, output_dir
            )
            elements.extend(image_elements)
            element_counter += len(image_elements)

        # Extract hyperlinks
        links = page.get_links()
        for link_idx, link in enumerate(links):
            # Create a link annotation element if it has a URI
            if link.get("uri"):
                from_rect = link.get("from")
                if from_rect:
                    element = DocumentElement(
                        element_id=f"link_{page_num}_{element_counter}",
                        type=ElementType.REFERENCE,
                        content={"uri": link["uri"], "type": "hyperlink"},
                        bbox=BoundingBox(
                            x0=from_rect.x0,
                            y0=from_rect.y0,
                            x1=from_rect.x1,
                            y1=from_rect.y1
                        ),
                        metadata={"link_type": "external" if link["uri"].startswith("http") else "internal"}
                    )
                    elements.append(element)
                    element_counter += 1

        # Extract vector graphics (as metadata)
        drawings = page.get_drawings()
        if drawings:
            logger.debug(f"Page {page_num} contains {len(drawings)} vector drawing commands")

        # PyMuPDF's sort=True already provides good reading order for multi-column
        # layouts (top-to-bottom, left-to-right within each row), so we don't re-sort.
        # NOTE: If sort=True is not used in get_text(), uncomment the line below:
        # elements = self._sort_elements_for_reading_order(elements, dimensions)

        # Post-process elements for header/footer detection and structure
        elements = self._detect_headers_footers(elements, dimensions)
        elements = self._build_section_hierarchy(elements)
        elements = self._build_nested_lists(elements)

        return Page(
            page_number=page_num,
            elements=elements,
            dimensions=dimensions,
            metadata={
                "has_drawings": len(drawings) > 0,
                "drawing_count": len(drawings),
                "link_count": len(links)
            }
        )
    def _sort_elements_for_reading_order(self, elements: List[DocumentElement], dimensions: Dimensions) -> List[DocumentElement]:
        """
        Sort elements by reading order, handling multi-column layouts.

        For multi-column layouts (e.g., two-column documents), this ensures
        elements are ordered correctly: top-to-bottom, then left-to-right
        within each row.

        Args:
            elements: List of document elements
            dimensions: Page dimensions

        Returns:
            Sorted list of elements in reading order
        """
        if not elements:
            return elements

        # Detect if the page has a multi-column layout
        text_elements = [e for e in elements if e.bbox and e.is_text]
        if len(text_elements) < 3:
            # Too few elements to determine layout; just sort by Y position
            return sorted(elements, key=lambda e: (e.bbox.y0 if e.bbox else 0, e.bbox.x0 if e.bbox else 0))

        # Cluster x-positions to detect columns
        x_positions = [e.bbox.x0 for e in text_elements]
        columns = self._detect_columns(x_positions, dimensions.width)
        if len(columns) <= 1:
            # Single-column layout - simple top-to-bottom sort
            logger.debug("Detected single-column layout")
            return sorted(elements, key=lambda e: (e.bbox.y0 if e.bbox else 0, e.bbox.x0 if e.bbox else 0))

        logger.debug(f"Detected {len(columns)}-column layout at x positions: {[f'{x:.1f}' for x in columns]}")

        # Multi-column layout - use newspaper-style reading order
        # (complete left column, then right column, etc.).
        # This is more appropriate for technical documents and data sheets.
        element_data = []
        for elem in elements:
            if not elem.bbox:
                element_data.append((elem, 0, 0))
                continue
            # Find which column this element belongs to
            col_idx = 0
            min_dist = float('inf')
            for i, col_x in enumerate(columns):
                dist = abs(elem.bbox.x0 - col_x)
                if dist < min_dist:
                    min_dist = dist
                    col_idx = i
            element_data.append((elem, col_idx, elem.bbox.y0))

        # Sort by column first, then Y position within the column.
        # This gives newspaper-style reading: complete column 1, then column 2, etc.
        element_data.sort(key=lambda x: (x[1], x[2]))
        logger.debug("Using newspaper-style column reading order (column by column, top to bottom)")
        return [e[0] for e in element_data]
    def _detect_columns(self, x_positions: List[float], page_width: float) -> List[float]:
        """
        Detect column positions from x-coordinates of text elements.

        Args:
            x_positions: List of x-coordinates (left edges of text)
            page_width: Page width in points

        Returns:
            List of column x-positions (sorted left to right)
        """
        if not x_positions:
            return []

        # Cluster x-positions to find column starts
        # (k-means-like approach: find groups of x-positions)
        threshold = page_width * 0.15  # 15% of page width as clustering threshold
        sorted_x = sorted(set(x_positions))
        if not sorted_x:
            return []

        clusters = [[sorted_x[0]]]
        for x in sorted_x[1:]:
            # Check if x belongs to the current cluster
            cluster_center = sum(clusters[-1]) / len(clusters[-1])
            if abs(x - cluster_center) < threshold:
                clusters[-1].append(x)
            else:
                # Start a new cluster
                clusters.append([x])

        # Average x position of each cluster (column start)
        column_positions = [sum(cluster) / len(cluster) for cluster in clusters]

        # Filter out columns that are too close to each other
        min_column_width = page_width * 0.2  # Columns must be at least 20% of page width apart
        filtered_columns = [column_positions[0]]
        for col_x in column_positions[1:]:
            if col_x - filtered_columns[-1] >= min_column_width:
                filtered_columns.append(col_x)
        return filtered_columns
    def _detect_headers_footers(self, elements: List[DocumentElement], dimensions: Dimensions) -> List[DocumentElement]:
        """Detect and mark header/footer elements based on page position"""
        page_height = dimensions.height
        header_threshold = page_height * 0.1  # Top 10% of page
        footer_threshold = page_height * 0.9  # Bottom 10% of page
        for elem in elements:
            # Skip non-text elements
            if not elem.is_text:
                continue
            # Check if element is in the header region
            if elem.bbox.y1 <= header_threshold:
                # Only mark as header if it's short text
                if isinstance(elem.content, str) and len(elem.content) < 200:
                    elem.type = ElementType.HEADER
                    elem.metadata['is_page_header'] = True
            # Check if element is in the footer region
            elif elem.bbox.y0 >= footer_threshold:
                # Short text in footer region
                if isinstance(elem.content, str) and len(elem.content) < 200:
                    elem.type = ElementType.FOOTER
                    elem.metadata['is_page_footer'] = True
        return elements
    def _build_section_hierarchy(self, elements: List[DocumentElement]) -> List[DocumentElement]:
        """Build hierarchical section structure based on font sizes"""
        # Collect all headers with their font sizes
        headers = []
        for elem in elements:
            if elem.type in [ElementType.TITLE, ElementType.HEADER]:
                # Get average font size from style
                font_size = 12.0  # Default
                if elem.style and elem.style.font_size:
                    font_size = elem.style.font_size
                headers.append((elem, font_size))
        if not headers:
            return elements

        # Sort headers by font size to determine hierarchy levels
        font_sizes = sorted(set(size for _, size in headers), reverse=True)
        size_to_level = {size: level for level, size in enumerate(font_sizes, 1)}

        # Assign section levels to headers
        for elem, font_size in headers:
            level = size_to_level.get(font_size, 1)
            elem.metadata['section_level'] = level
            elem.metadata['font_size'] = font_size

        # Build parent-child relationships between headers
        header_stack = []  # Stack of (element, level)
        for elem, font_size in headers:
            level = elem.metadata['section_level']
            # Pop headers at the same or deeper level (equal or smaller font)
            while header_stack and header_stack[-1][1] >= level:
                header_stack.pop()
            # Set parent header
            if header_stack:
                parent = header_stack[-1][0]
                elem.metadata['parent_section'] = parent.element_id
                if 'child_sections' not in parent.metadata:
                    parent.metadata['child_sections'] = []
                parent.metadata['child_sections'].append(elem.element_id)
            header_stack.append((elem, level))

        # Link content to the nearest preceding header
        current_header = None
        for elem in elements:
            if elem.type in [ElementType.TITLE, ElementType.HEADER]:
                current_header = elem
            elif current_header and elem.type not in [ElementType.HEADER, ElementType.FOOTER]:
                elem.metadata['section_id'] = current_header.element_id
        return elements
    def _build_nested_lists(self, elements: List[DocumentElement]) -> List[DocumentElement]:
        """Build nested list structure from flat list items"""
        # Group list items
        list_items = [e for e in elements if e.type == ElementType.LIST_ITEM]
        if not list_items:
            return elements

        # Sort by position (top to bottom)
        list_items.sort(key=lambda e: (e.bbox.y0, e.bbox.x0))

        # Detect indentation levels based on x position
        x_positions = [item.bbox.x0 for item in list_items]
        if not x_positions:
            return elements
        min_x = min(x_positions)
        indent_unit = 20  # Typical indent size in points

        # Assign nesting levels
        for item in list_items:
            indent = item.bbox.x0 - min_x
            level = int(indent / indent_unit)
            item.metadata['list_level'] = level

        # Build parent-child relationships
        item_stack = []  # Stack of (element, level)
        for item in list_items:
            level = item.metadata.get('list_level', 0)
            # Pop items at the same or deeper level
            while item_stack and item_stack[-1][1] >= level:
                item_stack.pop()
            # Set parent
            if item_stack:
                parent = item_stack[-1][0]
                item.metadata['parent_item'] = parent.element_id
                if 'children' not in parent.metadata:
                    parent.metadata['children'] = []
                parent.metadata['children'].append(item.element_id)
                # Also add to the actual children list
                parent.children.append(item)
            item_stack.append((item, level))
        return elements
    def _process_text_block(self, block: Dict, page_num: int, counter: int) -> Optional[DocumentElement]:
        """Process a text block into a DocumentElement"""
        # Calculate block bounding box
        bbox_data = block.get("bbox", [0, 0, 0, 0])
        bbox = BoundingBox(
            x0=bbox_data[0],
            y0=bbox_data[1],
            x1=bbox_data[2],
            y1=bbox_data[3]
        )

        # Extract text content and span information
        text_parts = []
        styles = []
        span_children = []  # Span-level children for inline styling
        span_counter = 0
        for line in block.get("lines", []):
            for span in line.get("spans", []):
                text = span.get("text", "")
                if text:
                    text_parts.append(text)
                    # Extract style information
                    style = StyleInfo(
                        font_name=span.get("font"),
                        font_size=span.get("size"),
                        font_weight="bold" if span.get("flags", 0) & 2**4 else "normal",
                        font_style="italic" if span.get("flags", 0) & 2**1 else "normal",
                        text_color=span.get("color")
                    )
                    styles.append(style)
                    # Create span child element for inline styling
                    span_bbox_data = span.get("bbox", bbox_data)
                    span_bbox = BoundingBox(
                        x0=span_bbox_data[0],
                        y0=span_bbox_data[1],
                        x1=span_bbox_data[2],
                        y1=span_bbox_data[3]
                    )
                    span_element = DocumentElement(
                        element_id=f"span_{page_num}_{counter}_{span_counter}",
                        type=ElementType.TEXT,  # Spans are always text
                        content=text,
                        bbox=span_bbox,
                        style=style,
                        confidence=1.0,
                        metadata={"span_index": span_counter}
                    )
                    span_children.append(span_element)
                    span_counter += 1

        if not text_parts:
            return None
        full_text = "".join(text_parts)

        # Determine element type based on content and style
        element_type = self._infer_element_type(full_text, styles)

        # Use the first span's style as the block style
        block_style = styles[0] if styles else None  # Could be improved with style merging

        return DocumentElement(
            element_id=f"text_{page_num}_{counter}",
            type=element_type,
            content=full_text,
            bbox=bbox,
            style=block_style,
            confidence=1.0,  # Direct extraction has perfect confidence
            children=span_children  # Span children enable inline styling
        )
    def _infer_element_type(self, text: str, styles: List[StyleInfo]) -> ElementType:
        """Infer element type based on text content and styling"""
        text_lower = text.lower().strip()

        # Short text with a large font might be a title/header
        if len(text_lower) < 100 and styles:
            avg_size = sum(s.font_size or 12 for s in styles) / len(styles)
            if avg_size > 16:
                return ElementType.TITLE
            elif avg_size > 14:
                return ElementType.HEADER

        # Check for list patterns (bullet or digit followed by whitespace)
        if re.match(r'^[\d•·▪▫◦‣]\s', text_lower):
            return ElementType.LIST_ITEM

        # Check for page numbers
        if re.match(r'^page\s+\d+|^\d+\s*$|^-\s*\d+\s*-$', text_lower):
            return ElementType.PAGE_NUMBER

        # Check for footnote patterns, e.g. "[1]" or "1)"
        if re.match(r'^\[\d+\]|^\d+\)', text_lower):
            return ElementType.FOOTNOTE

        # Default to paragraph for longer text, plain text for shorter
        return ElementType.PARAGRAPH if len(text) > 150 else ElementType.TEXT
    def _process_native_table(self, table, page_num: int, counter: int) -> Optional[DocumentElement]:
        """Process a natively detected table"""
        try:
            # Extract table data
            data = table.extract()
            if not data or len(data) < self.min_table_rows:
                return None

            # Get table bounding box
            bbox_data = table.bbox
            bbox = BoundingBox(
                x0=bbox_data[0],
                y0=bbox_data[1],
                x1=bbox_data[2],
                y1=bbox_data[3]
            )

            # Create table cells
            cells = []
            for row_idx, row in enumerate(data):
                for col_idx, cell_text in enumerate(row):
                    if cell_text:
                        cells.append(TableCell(
                            row=row_idx,
                            col=col_idx,
                            content=str(cell_text)
                        ))

            # Create table data
            table_data = TableData(
                rows=len(data),
                cols=max(len(row) for row in data) if data else 0,
                cells=cells,
                headers=data[0] if data else None  # Assume the first row is the header
            )
            return DocumentElement(
                element_id=f"table_{page_num}_{counter}",
                type=ElementType.TABLE,
                content=table_data,
                bbox=bbox,
                confidence=1.0
            )
        except Exception as e:
            logger.error(f"Error processing native table: {e}")
            return None
    def _detect_tables_by_position(self, page: fitz.Page, page_num: int, counter: int) -> List[DocumentElement]:
        """Detect tables by analyzing text positioning"""
        tables = []

        # Get all words with positions; each word is
        # (x0, y0, x1, y1, "word", block_no, line_no, word_no)
        words = page.get_text("words")
        if not words:
            return tables

        # Group words by approximate row (y-coordinate)
        rows = {}
        for word in words:
            y = round(word[1] / 5) * 5  # Round to nearest 5 points
            if y not in rows:
                rows[y] = []
            rows[y].append({
                'x0': word[0],
                'y0': word[1],
                'x1': word[2],
                'y1': word[3],
                'text': word[4],
                'block': word[5] if len(word) > 5 else 0
            })

        # Sort rows by y-coordinate
        sorted_rows = sorted(rows.items(), key=lambda x: x[0])

        # Find potential tables (consecutive rows with multiple columns)
        current_table_rows = []
        tables_found = []
        for y, words_in_row in sorted_rows:
            words_in_row.sort(key=lambda w: w['x0'])
            if len(words_in_row) >= self.min_table_cols:
                # Check if this could be a table row
                x_positions = [w['x0'] for w in words_in_row]
                # Check for somewhat regular spacing
                if self._has_regular_spacing(x_positions):
                    current_table_rows.append((y, words_in_row))
                else:
                    # End the current table if one exists
                    if len(current_table_rows) >= self.min_table_rows:
                        tables_found.append(current_table_rows)
                    current_table_rows = []
            else:
                # End the current table if one exists
                if len(current_table_rows) >= self.min_table_rows:
                    tables_found.append(current_table_rows)
                current_table_rows = []
        # Don't forget the last table
        if len(current_table_rows) >= self.min_table_rows:
            tables_found.append(current_table_rows)

        # Convert detected tables to DocumentElements
        for table_idx, table_rows in enumerate(tables_found):
            if not table_rows:
                continue

            # Calculate table bounding box
            all_words = []
            for _, words in table_rows:
                all_words.extend(words)
            min_x = min(w['x0'] for w in all_words)
            min_y = min(w['y0'] for w in all_words)
            max_x = max(w['x1'] for w in all_words)
            max_y = max(w['y1'] for w in all_words)
            bbox = BoundingBox(x0=min_x, y0=min_y, x1=max_x, y1=max_y)

            # Create table cells
            cells = []
            for row_idx, (y, words) in enumerate(table_rows):
                # Group words into columns
                columns = self._group_into_columns(words, table_rows)
                for col_idx, col_text in enumerate(columns):
                    if col_text:
                        cells.append(TableCell(
                            row=row_idx,
                            col=col_idx,
                            content=col_text
                        ))

            # Create table data
            table_data = TableData(
                rows=len(table_rows),
                cols=max(len(self._group_into_columns(words, table_rows))
                         for _, words in table_rows),
                cells=cells
            )
            element = DocumentElement(
                element_id=f"table_{page_num}_{counter + table_idx}",
                type=ElementType.TABLE,
                content=table_data,
                bbox=bbox,
                confidence=0.8,  # Lower confidence for positional detection
                metadata={"detection_method": "positional"}
            )
            tables.append(element)
        return tables
    def _has_regular_spacing(self, x_positions: List[float], tolerance: float = 0.3) -> bool:
        """Check if x positions have somewhat regular spacing"""
        if len(x_positions) < 3:
            return False
        spacings = [x_positions[i + 1] - x_positions[i] for i in range(len(x_positions) - 1)]
        avg_spacing = sum(spacings) / len(spacings)
        # Check if spacings are within tolerance of the average
        for spacing in spacings:
            if abs(spacing - avg_spacing) > avg_spacing * tolerance:
                return False
        return True

    def _group_into_columns(self, words: List[Dict], all_rows: List) -> List[str]:
        """Group words into columns based on x-position"""
        if not words:
            return []

        # Find common column positions across all rows
        all_x_positions = []
        for _, row_words in all_rows:
            all_x_positions.extend([w['x0'] for w in row_words])

        # Cluster x-positions to find columns
        column_positions = self._cluster_positions(all_x_positions)

        # Assign words to columns
        columns = [""] * len(column_positions)
        for word in words:
            # Find the closest column
            closest_col = 0
            min_dist = float('inf')
            for col_idx, col_x in enumerate(column_positions):
                dist = abs(word['x0'] - col_x)
                if dist < min_dist:
                    min_dist = dist
                    closest_col = col_idx
            if columns[closest_col]:
                columns[closest_col] += " " + word['text']
            else:
                columns[closest_col] = word['text']
        return columns

    def _cluster_positions(self, positions: List[float], threshold: float = 20) -> List[float]:
        """Cluster positions to find common columns"""
        if not positions:
            return []
        sorted_pos = sorted(positions)
        clusters = [[sorted_pos[0]]]
        for pos in sorted_pos[1:]:
            # Check if position belongs to the current cluster
            if pos - clusters[-1][-1] < threshold:
                clusters[-1].append(pos)
            else:
                clusters.append([pos])
        # Return the average position of each cluster
        return [sum(cluster) / len(cluster) for cluster in clusters]
    def _extract_images(self,
                        page: fitz.Page,
                        page_num: int,
                        document_id: str,
                        counter: int,
                        output_dir: Optional[Path]) -> List[DocumentElement]:
        """Extract images from page"""
        elements = []
        image_list = page.get_images()
        for img_idx, img in enumerate(image_list):
            try:
                xref = img[0]
                # Get image position(s)
                img_rects = page.get_image_rects(xref)
                if not img_rects:
                    continue
                rect = img_rects[0]  # Use the first occurrence
                bbox = BoundingBox(
                    x0=rect.x0,
                    y0=rect.y0,
                    x1=rect.x1,
                    y1=rect.y1
                )

                # Extract image data
                pix = fitz.Pixmap(page.parent, xref)
                image_data = {
                    "width": pix.width,
                    "height": pix.height,
                    "colorspace": pix.colorspace.name if pix.colorspace else "unknown",
                    "xref": xref
                }

                # Save the image if an output directory is provided
                if output_dir:
                    output_dir.mkdir(parents=True, exist_ok=True)
                    image_filename = f"{document_id}_p{page_num}_img{img_idx}.png"
                    image_path = output_dir / image_filename
                    pix.save(str(image_path))
                    image_data["saved_path"] = str(image_path)
                    logger.debug(f"Saved image to {image_path}")

                element = DocumentElement(
                    element_id=f"image_{page_num}_{counter + img_idx}",
                    type=ElementType.IMAGE,
                    content=image_data,
                    bbox=bbox,
                    confidence=1.0,
                    metadata={
                        "image_index": img_idx,
                        "xref": xref
                    }
                )
                elements.append(element)
                pix = None  # Free memory
            except Exception as e:
                logger.error(f"Error extracting image {img_idx}: {e}")
        return elements
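

# Illustrative usage sketch (not part of the original file): how the engine
# above might be invoked. The sample PDF path is an assumption.
if __name__ == "__main__":
    engine = DirectExtractionEngine(enable_image_extraction=True)
    # output_dir omitted: images are auto-saved under storage/results/{document_id}/
    document = engine.extract(Path("samples/datasheet.pdf"))
    print(document.document_id, len(document.pages))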