- Enable PyMuPDF sort=True for correct reading order in multi-column PDFs
- Add column detection utilities (_sort_elements_for_reading_order, _detect_columns)
- Preserve extraction order in PDF generation instead of re-sorting by Y position
- Fix StyleInfo field names (font_name, font_size, text_color instead of font, size, color)
- Fix Page.dimensions access (was incorrectly accessing Page.width directly)
- Implement row-by-row reading order (top-to-bottom, left-to-right within each row)

This fixes the issue where multi-column PDFs (e.g., technical data sheets) had
incorrect element ordering, with the title appearing at position 12 instead of first.
PyMuPDF's built-in sort=True parameter provides optimal reading order for most
multi-column layouts without requiring custom column detection.

Resolves: Multi-column layout reading order issue reported by user
Affects: Direct track PDF extraction and generation (Task 8)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

"""
|
||
Direct Extraction Engine using PyMuPDF
|
||
|
||
Handles direct text and structure extraction from editable PDFs without OCR.
|
||
This provides much faster processing and perfect accuracy for documents with
|
||
extractable text.
|
||
"""
|
||
|
||
import os
|
||
import logging
|
||
import fitz # PyMuPDF
|
||
import uuid
|
||
from pathlib import Path
|
||
from typing import Dict, List, Optional, Tuple, Any, Union
|
||
from datetime import datetime
|
||
import re
|
||
|
||
from ..models.unified_document import (
|
||
UnifiedDocument, DocumentElement, Page, DocumentMetadata,
|
||
BoundingBox, StyleInfo, TableData, TableCell, Dimensions,
|
||
ElementType, ProcessingTrack
|
||
)
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class DirectExtractionEngine:
    """
    Engine for direct text extraction from editable PDFs using PyMuPDF.

    This engine provides:
    - Fast text extraction with exact positioning
    - Font and style information preservation
    - Table structure detection
    - Image extraction with coordinates
    - Hyperlink and annotation extraction
    """

    def __init__(self,
                 enable_table_detection: bool = True,
                 enable_image_extraction: bool = True,
                 min_table_rows: int = 2,
                 min_table_cols: int = 2):
        """
        Initialize the extraction engine.

        Args:
            enable_table_detection: Whether to detect and extract tables
            enable_image_extraction: Whether to extract images
            min_table_rows: Minimum rows for table detection
            min_table_cols: Minimum columns for table detection
        """
        self.enable_table_detection = enable_table_detection
        self.enable_image_extraction = enable_image_extraction
        self.min_table_rows = min_table_rows
        self.min_table_cols = min_table_cols

    def extract(self,
                file_path: Path,
                output_dir: Optional[Path] = None) -> UnifiedDocument:
        """
        Extract content from a PDF file into the UnifiedDocument format.

        Args:
            file_path: Path to PDF file
            output_dir: Optional directory to save extracted images

        Returns:
            UnifiedDocument with extracted content
        """
        start_time = datetime.now()
        document_id = str(uuid.uuid4())

        try:
            doc = fitz.open(str(file_path))

            # Extract document metadata
            metadata = self._extract_metadata(file_path, doc, start_time)

            # Extract pages
            pages = []
            for page_num in range(len(doc)):
                logger.info(f"Extracting page {page_num + 1}/{len(doc)}")
                page = self._extract_page(
                    doc[page_num],
                    page_num + 1,
                    document_id,
                    output_dir
                )
                pages.append(page)

            doc.close()

            # Calculate processing time
            processing_time = (datetime.now() - start_time).total_seconds()
            metadata.processing_time = processing_time

            logger.info(f"Direct extraction completed in {processing_time:.2f}s")

            return UnifiedDocument(
                document_id=document_id,
                metadata=metadata,
                pages=pages
            )

        except Exception as e:
            logger.error(f"Error during direct extraction: {e}")
            # Return a partial result with error information
            processing_time = (datetime.now() - start_time).total_seconds()

            if 'metadata' not in locals():
                metadata = DocumentMetadata(
                    filename=file_path.name,
                    file_type="pdf",
                    file_size=file_path.stat().st_size if file_path.exists() else 0,
                    created_at=datetime.now(),
                    processing_track=ProcessingTrack.DIRECT,
                    processing_time=processing_time
                )

            return UnifiedDocument(
                document_id=document_id,
                metadata=metadata,
                pages=pages if 'pages' in locals() else [],
                processing_errors=[{
                    "error": str(e),
                    "type": type(e).__name__
                }]
            )

    def _extract_metadata(self,
                          file_path: Path,
                          doc: fitz.Document,
                          start_time: datetime) -> DocumentMetadata:
        """Extract document metadata"""
        pdf_metadata = doc.metadata

        return DocumentMetadata(
            filename=file_path.name,
            file_type="pdf",
            file_size=file_path.stat().st_size,
            created_at=start_time,
            processing_track=ProcessingTrack.DIRECT,
            processing_time=0.0,  # Will be updated later
            title=pdf_metadata.get("title"),
            author=pdf_metadata.get("author"),
            subject=pdf_metadata.get("subject"),
            keywords=pdf_metadata.get("keywords", "").split(",") if pdf_metadata.get("keywords") else None,
            producer=pdf_metadata.get("producer"),
            creator=pdf_metadata.get("creator"),
            creation_date=self._parse_pdf_date(pdf_metadata.get("creationDate")),
            modification_date=self._parse_pdf_date(pdf_metadata.get("modDate"))
        )

    def _parse_pdf_date(self, date_str: Optional[str]) -> Optional[datetime]:
        """Parse a PDF date string to datetime"""
        if not date_str:
            return None

        try:
            # PDF date format: D:YYYYMMDDHHmmSSOHH'mm
            # Example: D:20240101120000+09'00
            if date_str.startswith("D:"):
                date_str = date_str[2:]

            # Extract just the date/time part (first 14 characters)
            if len(date_str) >= 14:
                date_part = date_str[:14]
                return datetime.strptime(date_part, "%Y%m%d%H%M%S")
        except ValueError:
            pass

        return None

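    # A quick sketch of the expected behavior (values are illustrative):
    #
    #   engine._parse_pdf_date("D:20240101120000+09'00")
    #   # -> datetime(2024, 1, 1, 12, 0, 0); the timezone offset is ignored
    #   engine._parse_pdf_date("not a date")  # -> None
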
    def _extract_page(self,
                      page: fitz.Page,
                      page_num: int,
                      document_id: str,
                      output_dir: Optional[Path]) -> Page:
        """Extract content from a single page"""
        elements = []
        element_counter = 0

        # Get page dimensions
        rect = page.rect
        dimensions = Dimensions(
            width=rect.width,
            height=rect.height,
            dpi=72  # PDF standard DPI
        )

        # Extract text blocks with formatting (sort=True for reading order)
        text_dict = page.get_text("dict", sort=True)
        for block in text_dict.get("blocks", []):
            if block.get("type") == 0:  # Text block
                element = self._process_text_block(
                    block, page_num, element_counter
                )
                if element:
                    elements.append(element)
                    element_counter += 1

        # Extract tables (if enabled)
        if self.enable_table_detection:
            try:
                # Try native table detection (PyMuPDF 1.23.0+)
                tables = page.find_tables()
                for table in tables:
                    element = self._process_native_table(
                        table, page_num, element_counter
                    )
                    if element:
                        elements.append(element)
                        element_counter += 1
            except AttributeError:
                # Fall back to positional table detection
                logger.debug("Native table detection not available, using positional detection")
                table_elements = self._detect_tables_by_position(page, page_num, element_counter)
                elements.extend(table_elements)
                element_counter += len(table_elements)

        # Extract images (if enabled)
        if self.enable_image_extraction:
            image_elements = self._extract_images(
                page, page_num, document_id, element_counter, output_dir
            )
            elements.extend(image_elements)
            element_counter += len(image_elements)

        # Extract hyperlinks
        links = page.get_links()
        for link in links:
            # Create a link annotation element if it has a URI
            if link.get("uri"):
                from_rect = link.get("from")
                if from_rect:
                    element = DocumentElement(
                        element_id=f"link_{page_num}_{element_counter}",
                        type=ElementType.REFERENCE,
                        content={"uri": link["uri"], "type": "hyperlink"},
                        bbox=BoundingBox(
                            x0=from_rect.x0,
                            y0=from_rect.y0,
                            x1=from_rect.x1,
                            y1=from_rect.y1
                        ),
                        metadata={"link_type": "external" if link["uri"].startswith("http") else "internal"}
                    )
                    elements.append(element)
                    element_counter += 1

        # Extract vector graphics (as metadata)
        drawings = page.get_drawings()
        if drawings:
            logger.debug(f"Page {page_num} contains {len(drawings)} vector drawing commands")

        # PyMuPDF's sort=True already provides good reading order for multi-column layouts
        # (top-to-bottom, left-to-right within each row). We don't need to re-sort.
        # NOTE: If sort=True is not used in get_text(), uncomment the line below:
        # elements = self._sort_elements_for_reading_order(elements, dimensions)

        # Post-process elements for header/footer detection and structure
        elements = self._detect_headers_footers(elements, dimensions)
        elements = self._build_section_hierarchy(elements)
        elements = self._build_nested_lists(elements)

        return Page(
            page_number=page_num,
            elements=elements,
            dimensions=dimensions,
            metadata={
                "has_drawings": len(drawings) > 0,
                "drawing_count": len(drawings),
                "link_count": len(links)
            }
        )

    def _sort_elements_for_reading_order(self, elements: List[DocumentElement], dimensions: Dimensions) -> List[DocumentElement]:
        """
        Sort elements by reading order, handling multi-column layouts.

        Single-column pages are sorted top-to-bottom (left-to-right as a
        tie-breaker). When multiple columns are detected, elements are ordered
        newspaper-style: the full left column first, then the next column, and
        so on.

        Args:
            elements: List of document elements
            dimensions: Page dimensions

        Returns:
            Sorted list of elements in reading order
        """
        if not elements:
            return elements

        # Detect whether the page has a multi-column layout
        text_elements = [e for e in elements if e.bbox and e.is_text]
        if len(text_elements) < 3:
            # Too few elements to determine layout, just sort by Y position
            return sorted(elements, key=lambda e: (e.bbox.y0 if e.bbox else 0, e.bbox.x0 if e.bbox else 0))

        # Cluster x-positions to detect columns
        x_positions = [e.bbox.x0 for e in text_elements]
        columns = self._detect_columns(x_positions, dimensions.width)

        if len(columns) <= 1:
            # Single-column layout - simple top-to-bottom sort
            logger.debug("Detected single-column layout")
            return sorted(elements, key=lambda e: (e.bbox.y0 if e.bbox else 0, e.bbox.x0 if e.bbox else 0))

        logger.debug(f"Detected {len(columns)}-column layout at x positions: {[f'{x:.1f}' for x in columns]}")

        # Multi-column layout - use newspaper-style reading order
        # (complete left column, then right column, etc.)
        # This is more appropriate for technical documents and data sheets
        element_data = []
        for elem in elements:
            if not elem.bbox:
                element_data.append((elem, 0, 0))
                continue

            # Find which column this element belongs to
            col_idx = 0
            min_dist = float('inf')
            for i, col_x in enumerate(columns):
                dist = abs(elem.bbox.x0 - col_x)
                if dist < min_dist:
                    min_dist = dist
                    col_idx = i

            element_data.append((elem, col_idx, elem.bbox.y0))

        # Sort by column first, then Y position within the column.
        # This gives newspaper-style reading: complete column 1, then column 2, etc.
        element_data.sort(key=lambda x: (x[1], x[2]))

        logger.debug("Using newspaper-style column reading order (column by column, top to bottom)")
        return [e[0] for e in element_data]

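    # A minimal sketch of the resulting order (hypothetical bounding boxes on
    # a two-column page): elements at x ~ 72 form column 1 and elements at
    # x ~ 320 form column 2, so the sort yields
    #   [(col 1, y=100), (col 1, y=200), (col 2, y=100), (col 2, y=200)]
    # rather than the strict top-to-bottom order
    #   [(col 1, y=100), (col 2, y=100), (col 1, y=200), (col 2, y=200)].
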
    def _detect_columns(self, x_positions: List[float], page_width: float) -> List[float]:
        """
        Detect column positions from x-coordinates of text elements.

        Args:
            x_positions: List of x-coordinates (left edges of text)
            page_width: Page width in points

        Returns:
            List of column x-positions (sorted left to right)
        """
        if not x_positions:
            return []

        # Cluster x-positions to find column starts using a simple
        # single-pass clustering over the sorted positions
        threshold = page_width * 0.15  # 15% of page width as clustering threshold

        sorted_x = sorted(set(x_positions))
        clusters = [[sorted_x[0]]]

        for x in sorted_x[1:]:
            # Check if x belongs to the current cluster
            cluster_center = sum(clusters[-1]) / len(clusters[-1])
            if abs(x - cluster_center) < threshold:
                clusters[-1].append(x)
            else:
                # Start a new cluster
                clusters.append([x])

        # Average x position of each cluster (column start)
        column_positions = [sum(cluster) / len(cluster) for cluster in clusters]

        # Filter out columns that are too close to each other
        min_column_width = page_width * 0.2  # Columns must be at least 20% of page width apart
        filtered_columns = [column_positions[0]]
        for col_x in column_positions[1:]:
            if col_x - filtered_columns[-1] >= min_column_width:
                filtered_columns.append(col_x)

        return filtered_columns

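    # Illustrative call (hypothetical values for a 612 pt wide Letter page):
    #
    #   engine._detect_columns([72.0, 73.5, 74.2, 320.0, 321.8], 612.0)
    #   # -> [~73.2, ~320.9]: the two clusters are more than 20% of the
    #   #    page width apart, so both survive the final filter.
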
    def _detect_headers_footers(self, elements: List[DocumentElement], dimensions: Dimensions) -> List[DocumentElement]:
        """Detect and mark header/footer elements based on page position"""
        page_height = dimensions.height
        header_threshold = page_height * 0.1  # Top 10% of page
        footer_threshold = page_height * 0.9  # Bottom 10% of page

        for elem in elements:
            # Skip non-text elements and elements without a bounding box
            if not elem.is_text or not elem.bbox:
                continue

            # Check if element is in the header region
            if elem.bbox.y1 <= header_threshold:
                # Only mark as header if it's short text
                if isinstance(elem.content, str) and len(elem.content) < 200:
                    elem.type = ElementType.HEADER
                    elem.metadata['is_page_header'] = True

            # Check if element is in the footer region
            elif elem.bbox.y0 >= footer_threshold:
                # Short text in footer region
                if isinstance(elem.content, str) and len(elem.content) < 200:
                    elem.type = ElementType.FOOTER
                    elem.metadata['is_page_footer'] = True

        return elements

    def _build_section_hierarchy(self, elements: List[DocumentElement]) -> List[DocumentElement]:
        """Build hierarchical section structure based on font sizes"""
        # Collect all headers with their font sizes
        headers = []
        for elem in elements:
            if elem.type in [ElementType.TITLE, ElementType.HEADER]:
                # Get the font size from the style, falling back to a default
                font_size = 12.0  # Default
                if elem.style and elem.style.font_size:
                    font_size = elem.style.font_size
                headers.append((elem, font_size))

        if not headers:
            return elements

        # Sort header font sizes to determine hierarchy levels
        # (largest font -> level 1, next largest -> level 2, ...)
        font_sizes = sorted(set(size for _, size in headers), reverse=True)
        size_to_level = {size: level for level, size in enumerate(font_sizes, 1)}

        # Assign section levels to headers
        for elem, font_size in headers:
            level = size_to_level.get(font_size, 1)
            elem.metadata['section_level'] = level
            elem.metadata['font_size'] = font_size

        # Build parent-child relationships between headers
        header_stack = []  # Stack of (element, level)
        for elem, font_size in headers:
            level = elem.metadata['section_level']

            # Pop headers at the same or a deeper level (equal or smaller font)
            while header_stack and header_stack[-1][1] >= level:
                header_stack.pop()

            # Set parent header
            if header_stack:
                parent = header_stack[-1][0]
                elem.metadata['parent_section'] = parent.element_id
                if 'child_sections' not in parent.metadata:
                    parent.metadata['child_sections'] = []
                parent.metadata['child_sections'].append(elem.element_id)

            header_stack.append((elem, level))

        # Link content to the nearest preceding header
        current_header = None
        for elem in elements:
            if elem.type in [ElementType.TITLE, ElementType.HEADER]:
                current_header = elem
            elif current_header and elem.type not in [ElementType.HEADER, ElementType.FOOTER]:
                elem.metadata['section_id'] = current_header.element_id

        return elements

    def _build_nested_lists(self, elements: List[DocumentElement]) -> List[DocumentElement]:
        """Build nested list structure from flat list items"""
        # Group list items (only items with a bounding box can be nested)
        list_items = [e for e in elements if e.type == ElementType.LIST_ITEM and e.bbox]
        if not list_items:
            return elements

        # Sort by position (top to bottom)
        list_items.sort(key=lambda e: (e.bbox.y0, e.bbox.x0))

        # Detect indentation levels based on x position
        x_positions = [item.bbox.x0 for item in list_items]
        min_x = min(x_positions)
        indent_unit = 20  # Typical indent size in points

        # Assign nesting levels
        for item in list_items:
            indent = item.bbox.x0 - min_x
            level = int(indent / indent_unit)
            item.metadata['list_level'] = level

        # Build parent-child relationships
        item_stack = []  # Stack of (element, level)
        for item in list_items:
            level = item.metadata.get('list_level', 0)

            # Pop items at the same or a deeper level
            while item_stack and item_stack[-1][1] >= level:
                item_stack.pop()

            # Set parent
            if item_stack:
                parent = item_stack[-1][0]
                item.metadata['parent_item'] = parent.element_id
                if 'children' not in parent.metadata:
                    parent.metadata['children'] = []
                parent.metadata['children'].append(item.element_id)
                # Also add to the actual children list
                parent.children.append(item)

            item_stack.append((item, level))

        return elements

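    # Sketch of the level assignment (hypothetical x positions, 20 pt indent
    # unit): items at x = 72, 92, 112 get list_level 0, 1, 2, so the item at
    # x = 92 becomes a child of the item at x = 72, and so on down the stack.
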
    def _process_text_block(self, block: Dict, page_num: int, counter: int) -> Optional[DocumentElement]:
        """Process a text block into a DocumentElement"""
        # Calculate block bounding box
        bbox_data = block.get("bbox", [0, 0, 0, 0])
        bbox = BoundingBox(
            x0=bbox_data[0],
            y0=bbox_data[1],
            x1=bbox_data[2],
            y1=bbox_data[3]
        )

        # Extract text content and span information
        text_parts = []
        styles = []
        span_children = []  # Store span-level children for inline styling
        span_counter = 0

        for line in block.get("lines", []):
            for span in line.get("spans", []):
                text = span.get("text", "")
                if text:
                    text_parts.append(text)

                    # Extract style information
                    style = StyleInfo(
                        font_name=span.get("font"),
                        font_size=span.get("size"),
                        font_weight="bold" if span.get("flags", 0) & 2**4 else "normal",
                        font_style="italic" if span.get("flags", 0) & 2**1 else "normal",
                        text_color=span.get("color")
                    )
                    styles.append(style)

                    # Create span child element for inline styling
                    span_bbox_data = span.get("bbox", bbox_data)
                    span_bbox = BoundingBox(
                        x0=span_bbox_data[0],
                        y0=span_bbox_data[1],
                        x1=span_bbox_data[2],
                        y1=span_bbox_data[3]
                    )

                    span_element = DocumentElement(
                        element_id=f"span_{page_num}_{counter}_{span_counter}",
                        type=ElementType.TEXT,  # Spans are always text
                        content=text,
                        bbox=span_bbox,
                        style=style,
                        confidence=1.0,
                        metadata={"span_index": span_counter}
                    )
                    span_children.append(span_element)
                    span_counter += 1

        if not text_parts:
            return None

        full_text = "".join(text_parts)

        # Determine element type based on content and style
        element_type = self._infer_element_type(full_text, styles)

        # Use the first span's style for the block
        # (could be improved by merging the dominant styles)
        block_style = styles[0] if styles else None

        return DocumentElement(
            element_id=f"text_{page_num}_{counter}",
            type=element_type,
            content=full_text,
            bbox=bbox,
            style=block_style,
            confidence=1.0,  # Direct extraction has perfect confidence
            children=span_children  # Store span children for inline styling
        )

    def _infer_element_type(self, text: str, styles: List[StyleInfo]) -> ElementType:
        """Infer element type based on text content and styling"""
        text_lower = text.lower().strip()

        # Short text with a large font is likely a title or header
        if len(text_lower) < 100 and styles:
            avg_size = sum(s.font_size or 12 for s in styles) / len(styles)
            if avg_size > 16:
                return ElementType.TITLE
            elif avg_size > 14:
                return ElementType.HEADER

        # Check for list patterns (bullets or numbered items like "1." / "2)")
        if re.match(r'^(?:\d+[.)]|[•·▪▫◦‣⁃])\s', text_lower):
            return ElementType.LIST_ITEM

        # Check for page numbers
        if re.match(r'^page\s+\d+|^\d+\s*$|^-\s*\d+\s*-$', text_lower):
            return ElementType.PAGE_NUMBER

        # Check for footnote patterns like "[1]" or "1)"
        if re.match(r'^\[\d+\]|^\d+\)', text_lower):
            return ElementType.FOOTNOTE

        # Default to paragraph for longer text, plain text for shorter
        return ElementType.PARAGRAPH if len(text) > 150 else ElementType.TEXT

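    # Rough behavior sketch (hypothetical inputs, default 12 pt styles):
    #
    #   "• First point"     -> ElementType.LIST_ITEM
    #   "Page 3"            -> ElementType.PAGE_NUMBER
    #   "[1] See appendix"  -> ElementType.FOOTNOTE
    #   "Short note"        -> ElementType.TEXT
    #   (text over 150 characters) -> ElementType.PARAGRAPH
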
    def _process_native_table(self, table, page_num: int, counter: int) -> Optional[DocumentElement]:
        """Process a natively detected table"""
        try:
            # Extract table data
            data = table.extract()
            if not data or len(data) < self.min_table_rows:
                return None

            # Get table bounding box
            bbox_data = table.bbox
            bbox = BoundingBox(
                x0=bbox_data[0],
                y0=bbox_data[1],
                x1=bbox_data[2],
                y1=bbox_data[3]
            )

            # Create table cells (skipping empty cells)
            cells = []
            for row_idx, row in enumerate(data):
                for col_idx, cell_text in enumerate(row):
                    if cell_text:
                        cells.append(TableCell(
                            row=row_idx,
                            col=col_idx,
                            content=str(cell_text)
                        ))

            # Create table data
            table_data = TableData(
                rows=len(data),
                cols=max(len(row) for row in data),
                cells=cells,
                headers=data[0]  # Assume the first row is the header
            )

            return DocumentElement(
                element_id=f"table_{page_num}_{counter}",
                type=ElementType.TABLE,
                content=table_data,
                bbox=bbox,
                confidence=1.0
            )

        except Exception as e:
            logger.error(f"Error processing native table: {e}")
            return None

    def _detect_tables_by_position(self, page: fitz.Page, page_num: int, counter: int) -> List[DocumentElement]:
        """Detect tables by analyzing text positioning"""
        tables = []

        # Get all words with positions
        words = page.get_text("words")  # Returns (x0, y0, x1, y1, "word", block_no, line_no, word_no)

        if not words:
            return tables

        # Group words by approximate row (y-coordinate)
        rows = {}
        for word in words:
            y = round(word[1] / 5) * 5  # Round to the nearest 5 points
            if y not in rows:
                rows[y] = []
            rows[y].append({
                'x0': word[0],
                'y0': word[1],
                'x1': word[2],
                'y1': word[3],
                'text': word[4],
                'block': word[5] if len(word) > 5 else 0
            })

        # Sort rows by y-coordinate
        sorted_rows = sorted(rows.items(), key=lambda x: x[0])

        # Find potential tables (consecutive rows with multiple columns)
        current_table_rows = []
        tables_found = []

        for y, words_in_row in sorted_rows:
            words_in_row.sort(key=lambda w: w['x0'])

            # A table row needs enough words with somewhat regular spacing
            is_table_row = (
                len(words_in_row) >= self.min_table_cols
                and self._has_regular_spacing([w['x0'] for w in words_in_row])
            )

            if is_table_row:
                current_table_rows.append((y, words_in_row))
            else:
                # End the current table if one is in progress
                if len(current_table_rows) >= self.min_table_rows:
                    tables_found.append(current_table_rows)
                current_table_rows = []

        # Don't forget the last table
        if len(current_table_rows) >= self.min_table_rows:
            tables_found.append(current_table_rows)

        # Convert detected tables to DocumentElements
        for table_idx, table_rows in enumerate(tables_found):
            if not table_rows:
                continue

            # Calculate table bounding box
            all_words = []
            for _, row_words in table_rows:
                all_words.extend(row_words)

            min_x = min(w['x0'] for w in all_words)
            min_y = min(w['y0'] for w in all_words)
            max_x = max(w['x1'] for w in all_words)
            max_y = max(w['y1'] for w in all_words)

            bbox = BoundingBox(x0=min_x, y0=min_y, x1=max_x, y1=max_y)

            # Create table cells
            cells = []
            for row_idx, (y, row_words) in enumerate(table_rows):
                # Group words into columns
                columns = self._group_into_columns(row_words, table_rows)
                for col_idx, col_text in enumerate(columns):
                    if col_text:
                        cells.append(TableCell(
                            row=row_idx,
                            col=col_idx,
                            content=col_text
                        ))

            # Create table data
            table_data = TableData(
                rows=len(table_rows),
                cols=max(len(self._group_into_columns(row_words, table_rows))
                         for _, row_words in table_rows),
                cells=cells
            )

            element = DocumentElement(
                element_id=f"table_{page_num}_{counter + table_idx}",
                type=ElementType.TABLE,
                content=table_data,
                bbox=bbox,
                confidence=0.8,  # Lower confidence for positional detection
                metadata={"detection_method": "positional"}
            )
            tables.append(element)

        return tables

    def _has_regular_spacing(self, x_positions: List[float], tolerance: float = 0.3) -> bool:
        """Check if x positions have somewhat regular spacing"""
        if len(x_positions) < 3:
            return False

        spacings = [x_positions[i + 1] - x_positions[i] for i in range(len(x_positions) - 1)]
        avg_spacing = sum(spacings) / len(spacings)

        # Every gap must be within tolerance of the average gap
        for spacing in spacings:
            if abs(spacing - avg_spacing) > avg_spacing * tolerance:
                return False

        return True

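    # Illustrative checks (hypothetical positions, default 30% tolerance):
    #
    #   engine._has_regular_spacing([72, 172, 272, 372])  # -> True  (even gaps)
    #   engine._has_regular_spacing([72, 80, 400])        # -> False (gaps differ)
    #   engine._has_regular_spacing([72, 172])            # -> False (< 3 positions)
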
    def _group_into_columns(self, words: List[Dict], all_rows: List) -> List[str]:
        """Group words into columns based on x-position"""
        if not words:
            return []

        # Find common column positions across all rows
        all_x_positions = []
        for _, row_words in all_rows:
            all_x_positions.extend([w['x0'] for w in row_words])

        # Cluster x-positions to find columns
        column_positions = self._cluster_positions(all_x_positions)

        # Assign each word to the closest column
        columns = [""] * len(column_positions)
        for word in words:
            closest_col = 0
            min_dist = float('inf')
            for col_idx, col_x in enumerate(column_positions):
                dist = abs(word['x0'] - col_x)
                if dist < min_dist:
                    min_dist = dist
                    closest_col = col_idx

            if columns[closest_col]:
                columns[closest_col] += " " + word['text']
            else:
                columns[closest_col] = word['text']

        return columns

    def _cluster_positions(self, positions: List[float], threshold: float = 20) -> List[float]:
        """Cluster positions to find common columns"""
        if not positions:
            return []

        sorted_pos = sorted(positions)
        clusters = [[sorted_pos[0]]]

        for pos in sorted_pos[1:]:
            # A position within `threshold` of the previous one joins the cluster
            if pos - clusters[-1][-1] < threshold:
                clusters[-1].append(pos)
            else:
                clusters.append([pos])

        # Return the average position of each cluster
        return [sum(cluster) / len(cluster) for cluster in clusters]

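    # Illustrative clustering (hypothetical positions, default 20 pt threshold):
    #
    #   engine._cluster_positions([70, 75, 200, 210, 400])
    #   # -> [72.5, 205.0, 400.0]: gaps under 20 pt merge into one cluster.
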
    def _extract_images(self,
                        page: fitz.Page,
                        page_num: int,
                        document_id: str,
                        counter: int,
                        output_dir: Optional[Path]) -> List[DocumentElement]:
        """Extract images from page"""
        elements = []
        image_list = page.get_images()

        for img_idx, img in enumerate(image_list):
            try:
                xref = img[0]

                # Get image position(s)
                img_rects = page.get_image_rects(xref)
                if not img_rects:
                    continue

                rect = img_rects[0]  # Use first occurrence
                bbox = BoundingBox(
                    x0=rect.x0,
                    y0=rect.y0,
                    x1=rect.x1,
                    y1=rect.y1
                )

                # Extract image data
                pix = fitz.Pixmap(page.parent, xref)
                image_data = {
                    "width": pix.width,
                    "height": pix.height,
                    "colorspace": pix.colorspace.name if pix.colorspace else "unknown",
                    "xref": xref
                }

                # Save image if an output directory was provided
                if output_dir:
                    output_dir.mkdir(parents=True, exist_ok=True)
                    # CMYK and other non-RGB pixmaps cannot be written as PNG
                    # directly; convert them to RGB first
                    if pix.n - pix.alpha > 3:
                        pix = fitz.Pixmap(fitz.csRGB, pix)
                    image_filename = f"{document_id}_p{page_num}_img{img_idx}.png"
                    image_path = output_dir / image_filename
                    pix.save(str(image_path))
                    image_data["saved_path"] = str(image_path)
                    logger.debug(f"Saved image to {image_path}")

                element = DocumentElement(
                    element_id=f"image_{page_num}_{counter + img_idx}",
                    type=ElementType.IMAGE,
                    content=image_data,
                    bbox=bbox,
                    confidence=1.0,
                    metadata={
                        "image_index": img_idx,
                        "xref": xref
                    }
                )
                elements.append(element)

                pix = None  # Free memory

            except Exception as e:
                logger.error(f"Error extracting image {img_idx}: {e}")

        return elements
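

# A minimal usage sketch (hypothetical paths; assumes this module is imported
# as part of its package so the relative model imports resolve):
#
#   from pathlib import Path
#
#   engine = DirectExtractionEngine(enable_table_detection=True)
#   document = engine.extract(Path("datasheet.pdf"), output_dir=Path("out/images"))
#   for page in document.pages:
#       for element in page.elements:
#           print(element.element_id, element.type)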