"""
|
||
Direct Extraction Engine using PyMuPDF
|
||
|
||
Handles direct text and structure extraction from editable PDFs without OCR.
|
||
This provides much faster processing and perfect accuracy for documents with
|
||
extractable text.
|
||
"""
|
||
|
||
import os
|
||
import logging
|
||
import fitz # PyMuPDF
|
||
import uuid
|
||
from pathlib import Path
|
||
from typing import Dict, List, Optional, Tuple, Any, Union
|
||
from datetime import datetime
|
||
import re
|
||
|
||
from ..models.unified_document import (
|
||
UnifiedDocument, DocumentElement, Page, DocumentMetadata,
|
||
BoundingBox, StyleInfo, TableData, TableCell, Dimensions,
|
||
ElementType, ProcessingTrack
|
||
)
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class DirectExtractionEngine:
    """
    Engine for direct text extraction from editable PDFs using PyMuPDF.

    This engine provides:
    - Fast text extraction with exact positioning
    - Font and style information preservation
    - Table structure detection
    - Image extraction with coordinates
    - Hyperlink and annotation extraction
    """

    def __init__(self,
                 enable_table_detection: bool = True,
                 enable_image_extraction: bool = True,
                 min_table_rows: int = 2,
                 min_table_cols: int = 2):
        """
        Initialize the extraction engine.

        Args:
            enable_table_detection: Whether to detect and extract tables
            enable_image_extraction: Whether to extract images
            min_table_rows: Minimum rows for table detection
            min_table_cols: Minimum columns for table detection
        """
        self.enable_table_detection = enable_table_detection
        self.enable_image_extraction = enable_image_extraction
        self.min_table_rows = min_table_rows
        self.min_table_cols = min_table_cols

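    # Usage sketch (illustrative only; the input path is an assumption):
    #
    #   engine = DirectExtractionEngine(enable_table_detection=True)
    #   unified = engine.extract(Path("storage/uploads/sample.pdf"))
    #   print(unified.metadata.processing_time, len(unified.pages))
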
    def extract(self,
                file_path: Path,
                output_dir: Optional[Path] = None) -> UnifiedDocument:
        """
        Extract content from a PDF file into the UnifiedDocument format.

        Args:
            file_path: Path to the PDF file
            output_dir: Optional directory for extracted images. If not
                provided, a default directory is created under
                storage/results/{document_id}/

        Returns:
            UnifiedDocument with the extracted content
        """
        start_time = datetime.now()
        document_id = str(uuid.uuid4())[:8]  # Short ID for cleaner paths

        try:
            doc = fitz.open(str(file_path))

            # If no output_dir was provided, create a default directory for image extraction
            if output_dir is None and self.enable_image_extraction:
                default_output_dir = Path("storage/results") / document_id
                default_output_dir.mkdir(parents=True, exist_ok=True)
                output_dir = default_output_dir
                logger.debug(f"Created default output directory: {output_dir}")

            # Extract document metadata
            metadata = self._extract_metadata(file_path, doc, start_time)

            # Extract pages
            pages = []
            for page_num in range(len(doc)):
                logger.info(f"Extracting page {page_num + 1}/{len(doc)}")
                page = self._extract_page(
                    doc[page_num],
                    page_num + 1,
                    document_id,
                    output_dir
                )
                pages.append(page)

            doc.close()

            # Record the total processing time
            processing_time = (datetime.now() - start_time).total_seconds()
            metadata.processing_time = processing_time

            logger.info(f"Direct extraction completed in {processing_time:.2f}s")

            return UnifiedDocument(
                document_id=document_id,
                metadata=metadata,
                pages=pages
            )

        except Exception as e:
            logger.error(f"Error during direct extraction: {e}")
            # Return a partial result with error information
            processing_time = (datetime.now() - start_time).total_seconds()

            if 'metadata' not in locals():
                metadata = DocumentMetadata(
                    filename=file_path.name,
                    file_type="pdf",
                    file_size=file_path.stat().st_size if file_path.exists() else 0,
                    created_at=datetime.now(),
                    processing_track=ProcessingTrack.DIRECT,
                    processing_time=processing_time
                )

            return UnifiedDocument(
                document_id=document_id,
                metadata=metadata,
                pages=pages if 'pages' in locals() else [],
                processing_errors=[{
                    "error": str(e),
                    "type": type(e).__name__
                }]
            )

    def _extract_metadata(self,
                          file_path: Path,
                          doc: fitz.Document,
                          start_time: datetime) -> DocumentMetadata:
        """Extract document metadata"""
        pdf_metadata = doc.metadata

        return DocumentMetadata(
            filename=file_path.name,
            file_type="pdf",
            file_size=file_path.stat().st_size,
            created_at=start_time,
            processing_track=ProcessingTrack.DIRECT,
            processing_time=0.0,  # Updated after extraction finishes
            title=pdf_metadata.get("title"),
            author=pdf_metadata.get("author"),
            subject=pdf_metadata.get("subject"),
            keywords=pdf_metadata.get("keywords", "").split(",") if pdf_metadata.get("keywords") else None,
            producer=pdf_metadata.get("producer"),
            creator=pdf_metadata.get("creator"),
            creation_date=self._parse_pdf_date(pdf_metadata.get("creationDate")),
            modification_date=self._parse_pdf_date(pdf_metadata.get("modDate"))
        )

    def _parse_pdf_date(self, date_str: Optional[str]) -> Optional[datetime]:
        """Parse a PDF date string into a datetime"""
        if not date_str:
            return None

        try:
            # PDF date format: D:YYYYMMDDHHmmSSOHH'mm
            # Example: D:20240101120000+09'00
            if date_str.startswith("D:"):
                date_str = date_str[2:]

            # Use just the date/time part (first 14 characters); any
            # timezone offset is ignored
            if len(date_str) >= 14:
                date_part = date_str[:14]
                return datetime.strptime(date_part, "%Y%m%d%H%M%S")
        except ValueError:
            pass

        return None

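    # Example: _parse_pdf_date("D:20240101120000+09'00") returns
    # datetime(2024, 1, 1, 12, 0); the timezone suffix is intentionally
    # discarded, and malformed or empty strings return None.
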
    def _extract_page(self,
                      page: fitz.Page,
                      page_num: int,
                      document_id: str,
                      output_dir: Optional[Path]) -> Page:
        """Extract content from a single page"""
        elements = []
        element_counter = 0

        # Page-level metadata (used for the final Page metadata and the
        # hyperlink loop below)
        drawings = page.get_drawings()
        links = page.get_links()

        # Page dimensions
        rect = page.rect
        dimensions = Dimensions(
            width=rect.width,
            height=rect.height,
            dpi=72  # PDF standard DPI
        )

        # Extract tables first (if enabled) so their regions are known
        table_bboxes = []
        if self.enable_table_detection:
            try:
                # Try native table detection (PyMuPDF 1.23.0+)
                tables = page.find_tables()
                for table_idx, table in enumerate(tables):
                    element = self._process_native_table(
                        table, page_num, element_counter
                    )
                    if element and element.bbox:
                        elements.append(element)
                        table_bboxes.append(element.bbox)
                        element_counter += 1
            except AttributeError:
                # Fall back to positional table detection
                logger.debug("Native table detection not available, using positional detection")
                table_elements = self._detect_tables_by_position(page, page_num, element_counter)
                for elem in table_elements:
                    if elem.bbox:
                        table_bboxes.append(elem.bbox)
                elements.extend(table_elements)
                element_counter += len(table_elements)

        # Extract text blocks with formatting (sort=True for reading order),
        # filtering out lines that overlap with table regions
        text_dict = page.get_text("dict", sort=True)
        for block_idx, block in enumerate(text_dict.get("blocks", [])):
            if block.get("type") == 0:  # Text block
                element = self._process_text_block(
                    block, page_num, element_counter, table_bboxes
                )
                if element:
                    elements.append(element)
                    element_counter += 1

        # Extract images (if enabled)
        if self.enable_image_extraction:
            image_elements = self._extract_images(
                page, page_num, document_id, element_counter, output_dir
            )
            elements.extend(image_elements)
            element_counter += len(image_elements)

        # Extract vector graphics (charts, diagrams) from drawing commands.
        # table_bboxes is passed so table border drawings are filtered out
        # before clustering.
        if self.enable_image_extraction:
            vector_elements = self._extract_vector_graphics(
                page, page_num, document_id, element_counter, output_dir,
                table_bboxes=table_bboxes
            )
            elements.extend(vector_elements)
            element_counter += len(vector_elements)

        # Extract hyperlinks (links were fetched above)
        for link_idx, link in enumerate(links):
            # Create a link annotation element if it has a URI
            if link.get("uri"):
                from_rect = link.get("from")
                if from_rect:
                    element = DocumentElement(
                        element_id=f"link_{page_num}_{element_counter}",
                        type=ElementType.REFERENCE,
                        content={"uri": link["uri"], "type": "hyperlink"},
                        bbox=BoundingBox(
                            x0=from_rect.x0,
                            y0=from_rect.y0,
                            x1=from_rect.x1,
                            y1=from_rect.y1
                        ),
                        metadata={"link_type": "external" if link["uri"].startswith("http") else "internal"}
                    )
                    elements.append(element)
                    element_counter += 1

        # PyMuPDF's sort=True already provides good reading order for
        # multi-column layouts (top-to-bottom, left-to-right within each
        # row), so no re-sort is needed here.
        # NOTE: If sort=True is not used in get_text(), uncomment the line below:
        # elements = self._sort_elements_for_reading_order(elements, dimensions)

        # Deduplicate: remove CHART elements that overlap with TABLE elements
        # (tables carry structured data, so they take priority over vector graphics)
        elements = self._deduplicate_table_chart_overlap(elements)

        # Post-process elements for header/footer detection and structure
        elements = self._detect_headers_footers(elements, dimensions)
        elements = self._build_section_hierarchy(elements)
        elements = self._build_nested_lists(elements)

        return Page(
            page_number=page_num,
            elements=elements,
            dimensions=dimensions,
            metadata={
                "has_drawings": len(drawings) > 0,
                "drawing_count": len(drawings),
                "link_count": len(links)
            }
        )

    def _sort_elements_for_reading_order(self, elements: List[DocumentElement], dimensions: Dimensions) -> List[DocumentElement]:
        """
        Sort elements by reading order, handling multi-column layouts.

        For multi-column layouts (e.g., two-column documents), this ensures
        elements are ordered correctly: top-to-bottom, then left-to-right
        within each row.

        Args:
            elements: List of document elements
            dimensions: Page dimensions

        Returns:
            Sorted list of elements in reading order
        """
        if not elements:
            return elements

        # Detect whether the page has a multi-column layout
        text_elements = [e for e in elements if e.bbox and e.is_text]
        if len(text_elements) < 3:
            # Too few elements to determine the layout; just sort by Y position
            return sorted(elements, key=lambda e: (e.bbox.y0 if e.bbox else 0, e.bbox.x0 if e.bbox else 0))

        # Cluster x-positions to detect columns
        x_positions = [e.bbox.x0 for e in text_elements]
        columns = self._detect_columns(x_positions, dimensions.width)

        if len(columns) <= 1:
            # Single-column layout: simple top-to-bottom sort
            logger.debug("Detected single-column layout")
            return sorted(elements, key=lambda e: (e.bbox.y0 if e.bbox else 0, e.bbox.x0 if e.bbox else 0))

        logger.debug(f"Detected {len(columns)}-column layout at x positions: {[f'{x:.1f}' for x in columns]}")

        # Multi-column layout: use newspaper-style reading order
        # (complete the left column, then the right column, etc.).
        # This is more appropriate for technical documents and data sheets.
        element_data = []
        for elem in elements:
            if not elem.bbox:
                element_data.append((elem, 0, 0))
                continue

            # Find which column this element belongs to
            col_idx = 0
            min_dist = float('inf')
            for i, col_x in enumerate(columns):
                dist = abs(elem.bbox.x0 - col_x)
                if dist < min_dist:
                    min_dist = dist
                    col_idx = i

            element_data.append((elem, col_idx, elem.bbox.y0))

        # Sort by column first, then Y position within the column.
        # This gives newspaper-style reading: complete column 1, then column 2, etc.
        element_data.sort(key=lambda x: (x[1], x[2]))

        logger.debug("Using newspaper-style column reading order (column by column, top to bottom)")
        return [e[0] for e in element_data]

    def _detect_columns(self, x_positions: List[float], page_width: float) -> List[float]:
        """
        Detect column positions from x-coordinates of text elements.

        Args:
            x_positions: List of x-coordinates (left edges of text)
            page_width: Page width in points

        Returns:
            List of column x-positions (sorted left to right)
        """
        if not x_positions:
            return []

        # Greedy 1-D clustering of x-positions to find column starts
        threshold = page_width * 0.15  # 15% of page width as the clustering threshold

        sorted_x = sorted(set(x_positions))
        if not sorted_x:
            return []

        clusters = [[sorted_x[0]]]

        for x in sorted_x[1:]:
            # Check whether x belongs to the current cluster
            cluster_center = sum(clusters[-1]) / len(clusters[-1])
            if abs(x - cluster_center) < threshold:
                clusters[-1].append(x)
            else:
                # Start a new cluster
                clusters.append([x])

        # The average x position of each cluster is the column start
        column_positions = [sum(cluster) / len(cluster) for cluster in clusters]

        # Filter out columns that are too close to each other
        min_column_width = page_width * 0.2  # Columns must be at least 20% of page width apart
        filtered_columns = [column_positions[0]]
        for col_x in column_positions[1:]:
            if col_x - filtered_columns[-1] >= min_column_width:
                filtered_columns.append(col_x)

        return filtered_columns

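    # Worked example (illustrative values): on a 612 pt wide page,
    # x_positions = [72.0, 73.5, 320.0, 321.2] cluster into [72.75, 320.6]
    # (threshold 91.8 pt = 15% of width); their separation (~248 pt) exceeds
    # the 122.4 pt (20%) minimum, so a two-column layout is reported.
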
    def _detect_headers_footers(self, elements: List[DocumentElement], dimensions: Dimensions) -> List[DocumentElement]:
        """Detect and mark header/footer elements based on page position"""
        page_height = dimensions.height
        header_threshold = page_height * 0.1  # Top 10% of page
        footer_threshold = page_height * 0.9  # Bottom 10% of page

        for elem in elements:
            # Skip non-text elements and elements without a bounding box
            if not elem.is_text or not elem.bbox:
                continue

            # Element fully inside the header region
            if elem.bbox.y1 <= header_threshold:
                # Only mark as a header if it is short text
                if isinstance(elem.content, str) and len(elem.content) < 200:
                    elem.type = ElementType.HEADER
                    elem.metadata['is_page_header'] = True

            # Element fully inside the footer region
            elif elem.bbox.y0 >= footer_threshold:
                # Short text in the footer region
                if isinstance(elem.content, str) and len(elem.content) < 200:
                    elem.type = ElementType.FOOTER
                    elem.metadata['is_page_footer'] = True

        return elements

    def _build_section_hierarchy(self, elements: List[DocumentElement]) -> List[DocumentElement]:
        """Build a hierarchical section structure based on font sizes"""
        # Collect all headers with their font sizes
        headers = []
        for elem in elements:
            if elem.type in [ElementType.TITLE, ElementType.HEADER]:
                # Get the font size from the element style
                font_size = 12.0  # Default
                if elem.style and elem.style.font_size:
                    font_size = elem.style.font_size
                headers.append((elem, font_size))

        if not headers:
            return elements

        # Sort header font sizes (largest first) to determine hierarchy levels
        font_sizes = sorted(set(size for _, size in headers), reverse=True)
        size_to_level = {size: level for level, size in enumerate(font_sizes, 1)}

        # Assign section levels to headers
        for elem, font_size in headers:
            level = size_to_level.get(font_size, 1)
            elem.metadata['section_level'] = level
            elem.metadata['font_size'] = font_size

        # Build parent-child relationships between headers
        header_stack = []  # Stack of (element, level)
        for elem, font_size in headers:
            level = elem.metadata['section_level']

            # Pop headers at the same or a deeper level (same or smaller font)
            while header_stack and header_stack[-1][1] >= level:
                header_stack.pop()

            # Set the parent header
            if header_stack:
                parent = header_stack[-1][0]
                elem.metadata['parent_section'] = parent.element_id
                if 'child_sections' not in parent.metadata:
                    parent.metadata['child_sections'] = []
                parent.metadata['child_sections'].append(elem.element_id)

            header_stack.append((elem, level))

        # Link content elements to the nearest preceding header
        current_header = None
        for elem in elements:
            if elem.type in [ElementType.TITLE, ElementType.HEADER]:
                current_header = elem
            elif current_header and elem.type not in [ElementType.HEADER, ElementType.FOOTER]:
                elem.metadata['section_id'] = current_header.element_id

        return elements

    def _build_nested_lists(self, elements: List[DocumentElement]) -> List[DocumentElement]:
        """Build a nested list structure from flat list items"""
        # Collect list items
        list_items = [e for e in elements if e.type == ElementType.LIST_ITEM]
        if not list_items:
            return elements

        # Sort by position (top to bottom)
        list_items.sort(key=lambda e: (e.bbox.y0, e.bbox.x0))

        # Detect indentation levels based on x position
        x_positions = [item.bbox.x0 for item in list_items]
        if not x_positions:
            return elements

        min_x = min(x_positions)
        indent_unit = 20  # Typical indent size in points

        # Assign nesting levels
        for item in list_items:
            indent = item.bbox.x0 - min_x
            level = int(indent / indent_unit)
            item.metadata['list_level'] = level

        # Build parent-child relationships
        item_stack = []  # Stack of (element, level)
        for item in list_items:
            level = item.metadata.get('list_level', 0)

            # Pop items at the same or a deeper level
            while item_stack and item_stack[-1][1] >= level:
                item_stack.pop()

            # Set the parent item
            if item_stack:
                parent = item_stack[-1][0]
                item.metadata['parent_item'] = parent.element_id
                if 'children' not in parent.metadata:
                    parent.metadata['children'] = []
                parent.metadata['children'].append(item.element_id)
                # Also append to the actual children list
                parent.children.append(item)

            item_stack.append((item, level))

        return elements

    def _process_text_block(self, block: Dict, page_num: int, counter: int,
                            table_bboxes: Optional[List[BoundingBox]] = None) -> Optional[DocumentElement]:
        """
        Process a text block into a DocumentElement.

        Args:
            block: Text block from PyMuPDF
            page_num: Page number
            counter: Element counter
            table_bboxes: Table bounding boxes used to filter overlapping lines

        Returns:
            DocumentElement, or None if all lines overlap with tables
        """
        if table_bboxes is None:
            table_bboxes = []

        # Extract text content and span information, filtering out lines
        # that significantly overlap with table regions
        text_parts = []
        styles = []
        span_children = []  # Span-level children for inline styling
        span_counter = 0
        valid_line_bboxes = []  # Bboxes of kept lines, for the overall bbox

        for line in block.get("lines", []):
            line_bbox_data = line.get("bbox", [0, 0, 0, 0])

            # Check whether this line overlaps with any table region
            line_overlaps_table = False
            for table_bbox in table_bboxes:
                overlap_x0 = max(line_bbox_data[0], table_bbox.x0)
                overlap_y0 = max(line_bbox_data[1], table_bbox.y0)
                overlap_x1 = min(line_bbox_data[2], table_bbox.x1)
                overlap_y1 = min(line_bbox_data[3], table_bbox.y1)

                if overlap_x0 < overlap_x1 and overlap_y0 < overlap_y1:
                    # Calculate the vertical overlap ratio
                    line_height = line_bbox_data[3] - line_bbox_data[1]
                    overlap_height = overlap_y1 - overlap_y0
                    if line_height > 0:
                        overlap_ratio = overlap_height / line_height
                        if overlap_ratio >= 0.5:  # Line significantly overlaps the table
                            line_overlaps_table = True
                            break

            if line_overlaps_table:
                continue  # Skip this line

            # Process the valid line
            valid_line_bboxes.append(line_bbox_data)

            for span in line.get("spans", []):
                text = span.get("text", "")
                if text:
                    text_parts.append(text)

                    # Extract style information
                    style = StyleInfo(
                        font_name=span.get("font"),
                        font_size=span.get("size"),
                        font_weight="bold" if span.get("flags", 0) & 2**4 else "normal",
                        font_style="italic" if span.get("flags", 0) & 2**1 else "normal",
                        text_color=span.get("color")
                    )
                    styles.append(style)

                    # Create a span child element for inline styling
                    span_bbox_data = span.get("bbox", [0, 0, 0, 0])
                    span_bbox = BoundingBox(
                        x0=span_bbox_data[0],
                        y0=span_bbox_data[1],
                        x1=span_bbox_data[2],
                        y1=span_bbox_data[3]
                    )

                    span_element = DocumentElement(
                        element_id=f"span_{page_num}_{counter}_{span_counter}",
                        type=ElementType.TEXT,  # Spans are always text
                        content=text,
                        bbox=span_bbox,
                        style=style,
                        confidence=1.0,
                        metadata={"span_index": span_counter}
                    )
                    span_children.append(span_element)
                    span_counter += 1

        if not text_parts:
            return None  # All lines overlapped with tables

        full_text = "".join(text_parts)

        # Calculate the bbox from valid lines only
        if valid_line_bboxes:
            min_x0 = min(b[0] for b in valid_line_bboxes)
            min_y0 = min(b[1] for b in valid_line_bboxes)
            max_x1 = max(b[2] for b in valid_line_bboxes)
            max_y1 = max(b[3] for b in valid_line_bboxes)
            bbox = BoundingBox(x0=min_x0, y0=min_y0, x1=max_x1, y1=max_y1)
        else:
            # Fall back to the original block bbox if no valid lines were found
            bbox_data = block.get("bbox", [0, 0, 0, 0])
            bbox = BoundingBox(x0=bbox_data[0], y0=bbox_data[1], x1=bbox_data[2], y1=bbox_data[3])

        # Determine the element type based on content and style
        element_type = self._infer_element_type(full_text, styles)

        # Use the first span's style for the whole block
        if styles:
            block_style = styles[0]  # Could be improved with style merging
        else:
            block_style = None

        return DocumentElement(
            element_id=f"text_{page_num}_{counter}",
            type=element_type,
            content=full_text,
            bbox=bbox,
            style=block_style,
            confidence=1.0,  # Direct extraction has perfect confidence
            children=span_children  # Span children for inline styling
        )

    def _infer_element_type(self, text: str, styles: List[StyleInfo]) -> ElementType:
        """Infer the element type from text content and styling"""
        text_lower = text.lower().strip()

        # Short text with a large font is likely a title or header
        if len(text_lower) < 100 and styles:
            avg_size = sum(s.font_size or 12 for s in styles) / len(styles)
            if avg_size > 16:
                return ElementType.TITLE
            elif avg_size > 14:
                return ElementType.HEADER

        # Check for list patterns (digit or bullet character followed by whitespace)
        if re.match(r'^[\d•·▪▫◦‣⁃]\s', text_lower):
            return ElementType.LIST_ITEM

        # Check for page numbers
        if re.match(r'^page\s+\d+|^\d+\s*$|^-\s*\d+\s*-$', text_lower):
            return ElementType.PAGE_NUMBER

        # Check for footnote patterns such as "[1]" or "1)"
        if re.match(r'^\[\d+\]|^\d+\)', text_lower):
            return ElementType.FOOTNOTE

        # Default: paragraph for longer text, plain text for shorter
        return ElementType.PARAGRAPH if len(text) > 150 else ElementType.TEXT

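    # Examples of the heuristics above (illustrative; style list omitted):
    #   "• First item"  -> LIST_ITEM    (bullet followed by whitespace)
    #   "Page 3"        -> PAGE_NUMBER
    #   "[1] See ..."   -> FOOTNOTE
    #   200-char prose  -> PARAGRAPH    (len > 150)
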
    def _is_likely_chart(self, data: list, table) -> bool:
        """
        Detect whether a "table" returned by find_tables() is actually a chart/graph.

        Charts often get misclassified as tables because they have grid lines.
        Characteristics of a chart misclassified as a table:
        1. High percentage of empty cells (>60%)
        2. Content patterns that look like axis labels (numbers, units like °C, %, etc.)
        3. A single cell containing multi-line text with chart-like patterns
        4. Cell content matching typical chart axis patterns

        Args:
            data: Extracted table data (list of lists)
            table: PyMuPDF table object

        Returns:
            True if the table is likely a chart
        """
        if not data:
            return False

        # Count total cells, empty cells, and pattern matches
        total_cells = 0
        empty_cells = 0
        multi_line_cells = 0
        axis_pattern_cells = 0

        # Patterns that suggest chart axis labels
        axis_patterns = [
            r'^-?\d+$',          # Simple numbers (axis ticks)
            r'^-?\d+\.?\d*$',    # Decimal numbers
            r'°[CF]',            # Temperature units
            r'%$',               # Percentage
            r'\bppm\b',          # Parts per million
            r'\bmin\b',          # Minutes
            r'\bsec\b',          # Seconds
            r'\bTime\b',         # Time axis label
            r'\bTemperature\b',  # Temperature axis label
            r'[Aa]xis',          # Axis label
        ]

        for row in data:
            for cell in row:
                total_cells += 1
                cell_text = str(cell).strip() if cell else ""

                if not cell_text:
                    empty_cells += 1
                else:
                    # Check for multi-line content
                    if '\n' in cell_text:
                        multi_line_cells += 1

                    # Check for axis patterns
                    for pattern in axis_patterns:
                        if re.search(pattern, cell_text, re.IGNORECASE):
                            axis_pattern_cells += 1
                            break

        # Calculate metrics
        empty_ratio = empty_cells / total_cells if total_cells > 0 else 0

        # Decision criteria for chart detection:
        # 1. A very high empty-cell ratio (>70%) suggests a chart grid
        if empty_ratio > 0.7:
            logger.debug(f"Chart detection: high empty ratio {empty_ratio:.2f} (>70%)")
            return True

        # 2. A high empty ratio plus axis patterns suggests a chart
        if empty_ratio > 0.5 and axis_pattern_cells >= 3:
            logger.debug(f"Chart detection: empty ratio {empty_ratio:.2f} + {axis_pattern_cells} axis patterns")
            return True

        # 3. A multi-line first cell with axis patterns (often chart legend text)
        if multi_line_cells >= 1 and axis_pattern_cells >= 2:
            first_cell = str(data[0][0]).strip() if data and data[0] else ""
            if '\n' in first_cell and len(first_cell.split('\n')) >= 5:
                logger.debug(f"Chart detection: first cell has {len(first_cell.split(chr(10)))} lines with axis patterns")
                return True

        return False

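    # Example: a 4x5 grid where only 5 cells contain text gives
    # empty_ratio = 15/20 = 0.75 > 0.7, so the region is flagged as a
    # chart by criterion 1 before the axis-pattern checks even run.
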
    def _process_native_table(self, table, page_num: int, counter: int) -> Optional[DocumentElement]:
        """Process a natively detected table"""
        try:
            # Extract table data
            data = table.extract()
            if not data or len(data) < self.min_table_rows:
                return None

            # Check whether this "table" is actually a chart (misclassified by find_tables)
            if self._is_likely_chart(data, table):
                logger.info(f"Skipping table_{page_num}_{counter} - detected as chart (not table)")
                return None

            # Get the table bounding box
            bbox_data = table.bbox
            bbox = BoundingBox(
                x0=bbox_data[0],
                y0=bbox_data[1],
                x1=bbox_data[2],
                y1=bbox_data[3]
            )

            # Derive column widths by analyzing the cells' X boundaries
            column_widths = []
            if hasattr(table, 'cells') and table.cells:
                # Collect all unique X boundaries (both left and right edges)
                x_boundaries = set()
                for cell in table.cells:
                    x_boundaries.add(round(cell[0], 1))  # x0 (left edge)
                    x_boundaries.add(round(cell[2], 1))  # x1 (right edge)

                # Sort the boundaries to get column edges
                sorted_x = sorted(x_boundaries)

                # Column widths are the gaps between adjacent boundaries
                if len(sorted_x) >= 2:
                    column_widths = [sorted_x[i+1] - sorted_x[i] for i in range(len(sorted_x)-1)]
                    logger.debug(f"Calculated column widths from {len(sorted_x)} boundaries: {column_widths}")

            # Derive row heights by analyzing the cells' Y boundaries
            row_heights = []
            if hasattr(table, 'cells') and table.cells:
                # Collect all unique Y boundaries (both top and bottom edges)
                y_boundaries = set()
                for cell in table.cells:
                    y_boundaries.add(round(cell[1], 1))  # y0 (top edge)
                    y_boundaries.add(round(cell[3], 1))  # y1 (bottom edge)

                # Sort the boundaries to get row edges
                sorted_y = sorted(y_boundaries)

                # Row heights are the gaps between adjacent boundaries
                if len(sorted_y) >= 2:
                    row_heights = [sorted_y[i+1] - sorted_y[i] for i in range(len(sorted_y)-1)]
                    logger.debug(f"Calculated row heights from {len(sorted_y)} boundaries: {row_heights}")

            # Create table cells.
            # Note: ALL cells (even empty ones) are included to preserve the
            # table structure; this is critical for correct HTML generation
            # and PDF rendering.
            cells = []
            for row_idx, row in enumerate(data):
                for col_idx, cell_text in enumerate(row):
                    cells.append(TableCell(
                        row=row_idx,
                        col=col_idx,
                        content=str(cell_text) if cell_text else ""
                    ))

            # Create the table data
            table_data = TableData(
                rows=len(data),
                cols=max(len(row) for row in data) if data else 0,
                cells=cells,
                headers=data[0] if data else None  # Assume the first row is the header
            )

            # Store column widths and row heights in metadata
            metadata = {}
            if column_widths:
                metadata["column_widths"] = column_widths
            if row_heights:
                metadata["row_heights"] = row_heights
            metadata = metadata if metadata else None

            return DocumentElement(
                element_id=f"table_{page_num}_{counter}",
                type=ElementType.TABLE,
                content=table_data,
                bbox=bbox,
                confidence=1.0,
                metadata=metadata
            )

        except Exception as e:
            logger.error(f"Error processing native table: {e}")
            return None

    def _detect_tables_by_position(self, page: fitz.Page, page_num: int, counter: int) -> List[DocumentElement]:
        """Detect tables by analyzing text positioning"""
        tables = []

        # Get all words with positions
        words = page.get_text("words")  # (x0, y0, x1, y1, "word", block_no, line_no, word_no)

        if not words:
            return tables

        # Group words by approximate row (y-coordinate)
        rows = {}
        for word in words:
            y = round(word[1] / 5) * 5  # Round to the nearest 5 points
            if y not in rows:
                rows[y] = []
            rows[y].append({
                'x0': word[0],
                'y0': word[1],
                'x1': word[2],
                'y1': word[3],
                'text': word[4],
                'block': word[5] if len(word) > 5 else 0
            })

        # Sort rows by y-coordinate
        sorted_rows = sorted(rows.items(), key=lambda x: x[0])

        # Find potential tables (consecutive rows with multiple columns)
        current_table_rows = []
        tables_found = []

        for y, words_in_row in sorted_rows:
            words_in_row.sort(key=lambda w: w['x0'])

            if len(words_in_row) >= self.min_table_cols:
                # This row could be a table row
                x_positions = [w['x0'] for w in words_in_row]

                # Check for somewhat regular spacing
                if self._has_regular_spacing(x_positions):
                    current_table_rows.append((y, words_in_row))
                else:
                    # Close the current table, if any
                    if len(current_table_rows) >= self.min_table_rows:
                        tables_found.append(current_table_rows)
                    current_table_rows = []
            else:
                # Close the current table, if any
                if len(current_table_rows) >= self.min_table_rows:
                    tables_found.append(current_table_rows)
                current_table_rows = []

        # Don't forget the last table
        if len(current_table_rows) >= self.min_table_rows:
            tables_found.append(current_table_rows)

        # Convert detected tables to DocumentElements
        for table_idx, table_rows in enumerate(tables_found):
            if not table_rows:
                continue

            # Calculate the table bounding box
            all_words = []
            for _, words in table_rows:
                all_words.extend(words)

            min_x = min(w['x0'] for w in all_words)
            min_y = min(w['y0'] for w in all_words)
            max_x = max(w['x1'] for w in all_words)
            max_y = max(w['y1'] for w in all_words)

            bbox = BoundingBox(x0=min_x, y0=min_y, x1=max_x, y1=max_y)

            # Create table cells
            cells = []
            for row_idx, (y, words) in enumerate(table_rows):
                # Group the row's words into columns
                columns = self._group_into_columns(words, table_rows)
                for col_idx, col_text in enumerate(columns):
                    if col_text:
                        cells.append(TableCell(
                            row=row_idx,
                            col=col_idx,
                            content=col_text
                        ))

            # Create the table data
            table_data = TableData(
                rows=len(table_rows),
                cols=max(len(self._group_into_columns(words, table_rows))
                         for _, words in table_rows),
                cells=cells
            )

            element = DocumentElement(
                element_id=f"table_{page_num}_{counter + table_idx}",
                type=ElementType.TABLE,
                content=table_data,
                bbox=bbox,
                confidence=0.8,  # Lower confidence for positional detection
                metadata={"detection_method": "positional"}
            )
            tables.append(element)

        return tables

    def _has_regular_spacing(self, x_positions: List[float], tolerance: float = 0.3) -> bool:
        """Check whether x positions have somewhat regular spacing"""
        if len(x_positions) < 3:
            return False

        spacings = [x_positions[i+1] - x_positions[i] for i in range(len(x_positions)-1)]
        avg_spacing = sum(spacings) / len(spacings)

        # Every spacing must be within tolerance of the average
        for spacing in spacings:
            if abs(spacing - avg_spacing) > avg_spacing * tolerance:
                return False

        return True

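    # Example: x positions [50, 150, 250, 352] give spacings [100, 100, 102]
    # (average ~100.7); every spacing is within 30% of the average, so the
    # row is treated as table-like. [50, 150, 400] would be rejected.
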
    def _group_into_columns(self, words: List[Dict], all_rows: List) -> List[str]:
        """Group words into columns based on x-position"""
        if not words:
            return []

        # Find the common column positions across all rows
        all_x_positions = []
        for _, row_words in all_rows:
            all_x_positions.extend([w['x0'] for w in row_words])

        # Cluster x-positions to find columns
        column_positions = self._cluster_positions(all_x_positions)

        # Assign each word to its closest column
        columns = [""] * len(column_positions)
        for word in words:
            closest_col = 0
            min_dist = float('inf')
            for col_idx, col_x in enumerate(column_positions):
                dist = abs(word['x0'] - col_x)
                if dist < min_dist:
                    min_dist = dist
                    closest_col = col_idx

            if columns[closest_col]:
                columns[closest_col] += " " + word['text']
            else:
                columns[closest_col] = word['text']

        return columns

    def _cluster_positions(self, positions: List[float], threshold: float = 20) -> List[float]:
        """Cluster positions to find common columns"""
        if not positions:
            return []

        sorted_pos = sorted(positions)
        clusters = [[sorted_pos[0]]]

        for pos in sorted_pos[1:]:
            # Check whether the position belongs to the current cluster
            if pos - clusters[-1][-1] < threshold:
                clusters[-1].append(pos)
            else:
                clusters.append([pos])

        # Return the average position of each cluster
        return [sum(cluster) / len(cluster) for cluster in clusters]

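    # Example: positions [70, 72, 75, 200, 205] with the default 20 pt
    # threshold form clusters [[70, 72, 75], [200, 205]], yielding column
    # centers [72.33, 202.5].
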
    def _extract_images(self,
                        page: fitz.Page,
                        page_num: int,
                        document_id: str,
                        counter: int,
                        output_dir: Optional[Path]) -> List[DocumentElement]:
        """Extract images from a page"""
        elements = []
        image_list = page.get_images()

        for img_idx, img in enumerate(image_list):
            try:
                xref = img[0]

                # Get the image position(s)
                img_rects = page.get_image_rects(xref)
                if not img_rects:
                    continue

                rect = img_rects[0]  # Use the first occurrence
                bbox = BoundingBox(
                    x0=rect.x0,
                    y0=rect.y0,
                    x1=rect.x1,
                    y1=rect.y1
                )

                # Extract the image data
                pix = fitz.Pixmap(page.parent, xref)
                image_data = {
                    "width": pix.width,
                    "height": pix.height,
                    "colorspace": pix.colorspace.name if pix.colorspace else "unknown",
                    "xref": xref
                }

                # Save the image if an output directory was provided
                if output_dir:
                    output_dir.mkdir(parents=True, exist_ok=True)
                    image_filename = f"{document_id}_p{page_num}_img{img_idx}.png"
                    image_path = output_dir / image_filename
                    pix.save(str(image_path))
                    # Store the relative filename only (consistent with the OCR
                    # track); the PDF generator joins it with result_dir to get
                    # the full path
                    image_data["saved_path"] = image_filename
                    logger.debug(f"Saved image to {image_path}")

                element = DocumentElement(
                    element_id=f"image_{page_num}_{counter + img_idx}",
                    type=ElementType.IMAGE,
                    content=image_data,
                    bbox=bbox,
                    confidence=1.0,
                    metadata={
                        "image_index": img_idx,
                        "xref": xref
                    }
                )
                elements.append(element)

                pix = None  # Free memory

            except Exception as e:
                logger.error(f"Error extracting image {img_idx}: {e}")

        return elements

    def has_missing_images(self, page: fitz.Page) -> bool:
        """
        Detect whether a page likely has images that weren't extracted.

        This checks for inline image blocks (type=1 in the text dict), which
        indicate graphics composed of many small image blocks (like logos)
        that page.get_images() cannot detect.

        Args:
            page: PyMuPDF page object

        Returns:
            True if there are likely missing images that need OCR extraction
        """
        try:
            # If get_images found anything, no fallback is needed
            standard_images = page.get_images()
            if standard_images:
                return False

            # Check for inline image blocks (type=1)
            text_dict = page.get_text("dict", sort=True)
            blocks = text_dict.get("blocks", [])

            image_block_count = sum(1 for b in blocks if b.get("type") == 1)

            # Many inline image blocks suggest a logo or graphic
            if image_block_count >= 10:
                logger.info(f"Detected {image_block_count} inline image blocks - may need OCR for image extraction")
                return True

            return False

        except Exception as e:
            logger.warning(f"Error checking for missing images: {e}")
            return False

    def check_document_for_missing_images(self, pdf_path: Path) -> List[int]:
        """
        Check a PDF document for pages that likely have missing images.

        This opens the PDF and checks each page for inline image blocks
        that weren't extracted by get_images().

        Args:
            pdf_path: Path to the PDF file

        Returns:
            List of page numbers (1-indexed) that have missing images
        """
        pages_with_missing_images = []

        try:
            doc = fitz.open(str(pdf_path))
            for page_num in range(len(doc)):
                page = doc[page_num]
                if self.has_missing_images(page):
                    pages_with_missing_images.append(page_num + 1)  # 1-indexed
            doc.close()

            if pages_with_missing_images:
                logger.info(f"Document has missing images on pages: {pages_with_missing_images}")

        except Exception as e:
            logger.error(f"Error checking document for missing images: {e}")

        return pages_with_missing_images

    def render_inline_image_regions(
        self,
        pdf_path: Path,
        unified_doc: UnifiedDocument,
        pages: List[int],
        output_dir: Optional[Path] = None
    ) -> int:
        """
        Render inline image regions and add them to the unified document.

        This is a fallback for when OCR doesn't detect images. It clusters
        inline image blocks (type=1) and renders them as images.

        Args:
            pdf_path: Path to the PDF file
            unified_doc: UnifiedDocument to add images to
            pages: List of page numbers (1-indexed) to process
            output_dir: Directory to save rendered images

        Returns:
            Number of images added
        """
        images_added = 0

        try:
            doc = fitz.open(str(pdf_path))

            for page_num in pages:
                if page_num < 1 or page_num > len(doc):
                    continue

                page = doc[page_num - 1]  # 0-indexed
                page_rect = page.rect

                # Get the inline image blocks
                text_dict = page.get_text("dict", sort=True)
                blocks = text_dict.get("blocks", [])

                image_blocks = []
                for block in blocks:
                    if block.get("type") == 1:  # Image block
                        bbox = block.get("bbox")
                        if bbox:
                            image_blocks.append(fitz.Rect(bbox))

                if len(image_blocks) < 5:  # Threshold reduced from 10
                    logger.debug(f"Page {page_num}: Only {len(image_blocks)} inline image blocks, skipping")
                    continue

                logger.info(f"Page {page_num}: Found {len(image_blocks)} inline image blocks")

                # Cluster nearby image blocks
                regions = self._cluster_nearby_rects(image_blocks, tolerance=5.0)
                logger.info(f"Page {page_num}: Clustered into {len(regions)} regions")

                # Find the corresponding page in unified_doc
                target_page = None
                for p in unified_doc.pages:
                    if p.page_number == page_num:
                        target_page = p
                        break

                if not target_page:
                    continue

                for region_idx, region_rect in enumerate(regions):
                    logger.info(f"Page {page_num} region {region_idx}: {region_rect} (w={region_rect.width:.1f}, h={region_rect.height:.1f})")

                    # Skip very small regions
                    if region_rect.width < 30 or region_rect.height < 30:
                        logger.info("  -> Skipped: too small (min 30x30)")
                        continue

                    # Skip regions that start below the top 40% of the page
                    # (those are usually in the table area); regions that
                    # START in the top portion are allowed
                    page_40_pct = page_rect.height * 0.4
                    if region_rect.y0 > page_40_pct:
                        logger.info(f"  -> Skipped: y0={region_rect.y0:.1f} > 40% of page ({page_40_pct:.1f})")
                        continue

                    logger.info(f"Rendering inline image region {region_idx} on page {page_num}: {region_rect}")

                    try:
                        # Add a small padding
                        clip_rect = region_rect + (-2, -2, 2, 2)
                        clip_rect.intersect(page_rect)

                        # Render at 2x resolution
                        mat = fitz.Matrix(2, 2)
                        pix = page.get_pixmap(clip=clip_rect, matrix=mat, alpha=False)

                        # Create the bounding box
                        bbox = BoundingBox(
                            x0=clip_rect.x0,
                            y0=clip_rect.y0,
                            x1=clip_rect.x1,
                            y1=clip_rect.y1
                        )

                        image_data = {
                            "width": pix.width,
                            "height": pix.height,
                            "colorspace": "rgb",
                            "type": "inline_region"
                        }

                        # Save the image if an output directory was provided
                        if output_dir:
                            output_dir.mkdir(parents=True, exist_ok=True)
                            doc_id = unified_doc.document_id or "unknown"
                            image_filename = f"{doc_id}_p{page_num}_logo{region_idx}.png"
                            image_path = output_dir / image_filename
                            pix.save(str(image_path))
                            image_data["saved_path"] = image_filename
                            logger.info(f"Saved inline image region to {image_path}")

                        element = DocumentElement(
                            element_id=f"logo_{page_num}_{region_idx}",
                            type=ElementType.LOGO,
                            content=image_data,
                            bbox=bbox,
                            confidence=0.9,
                            metadata={
                                "region_type": "inline_image_blocks",
                                "block_count": len(image_blocks)
                            }
                        )
                        target_page.elements.append(element)
                        images_added += 1

                        pix = None  # Free memory

                    except Exception as e:
                        logger.error(f"Error rendering inline image region {region_idx}: {e}")

            doc.close()

            if images_added > 0:
                logger.info(f"Added {images_added} inline image regions to document")

        except Exception as e:
            logger.error(f"Error rendering inline image regions: {e}")

        return images_added

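    # Usage sketch (paths are assumptions): run the inline-region fallback
    # only on pages where the heuristic fires:
    #
    #   pages = engine.check_document_for_missing_images(Path("in.pdf"))
    #   if pages:
    #       engine.render_inline_image_regions(
    #           Path("in.pdf"), unified, pages, Path("storage/results"))
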
    def _cluster_nearby_rects(self, rects: List[fitz.Rect], tolerance: float = 5.0) -> List[fitz.Rect]:
        """Cluster nearby rectangles into regions."""
        if not rects:
            return []

        sorted_rects = sorted(rects, key=lambda r: (r.y0, r.x0))

        # First pass: merge each rect into the first region it touches
        merged = []
        for rect in sorted_rects:
            merged_with_existing = False
            for i, region in enumerate(merged):
                expanded = region + (-tolerance, -tolerance, tolerance, tolerance)
                if expanded.intersects(rect):
                    merged[i] = region | rect
                    merged_with_existing = True
                    break
            if not merged_with_existing:
                merged.append(rect)

        # Second pass: keep merging regions that now overlap, until stable
        changed = True
        while changed:
            changed = False
            new_merged = []
            skip = set()

            for i, r1 in enumerate(merged):
                if i in skip:
                    continue
                current = r1
                for j, r2 in enumerate(merged[i+1:], start=i+1):
                    if j in skip:
                        continue
                    expanded = current + (-tolerance, -tolerance, tolerance, tolerance)
                    if expanded.intersects(r2):
                        current = current | r2
                        skip.add(j)
                        changed = True
                new_merged.append(current)
            merged = new_merged

        return merged

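    # Example (illustrative): two tiles 2 pt apart merge, a distant one stays:
    #
    #   rects = [fitz.Rect(0, 0, 10, 10), fitz.Rect(12, 0, 22, 10),
    #            fitz.Rect(100, 100, 110, 110)]
    #   _cluster_nearby_rects(rects, tolerance=5.0)
    #   # -> [Rect(0.0, 0.0, 22.0, 10.0), Rect(100.0, 100.0, 110.0, 110.0)]
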
    def _extract_vector_graphics(self,
                                 page: fitz.Page,
                                 page_num: int,
                                 document_id: str,
                                 counter: int,
                                 output_dir: Optional[Path],
                                 table_bboxes: Optional[List[BoundingBox]] = None) -> List[DocumentElement]:
        """
        Extract vector graphics (charts, diagrams) from a page.

        This identifies regions composed of vector drawing commands (paths,
        lines, rectangles) rather than embedded raster images. These are
        typically charts created in Excel, vector diagrams, or other graphics.

        Args:
            page: PyMuPDF page object
            page_num: Page number (1-indexed)
            document_id: Unique document identifier
            counter: Starting counter for element IDs
            output_dir: Directory to save rendered graphics
            table_bboxes: Table bounding boxes used to exclude table border drawings

        Returns:
            List of DocumentElement objects representing vector graphics
        """
        elements = []

        try:
            # Get all drawing commands
            drawings = page.get_drawings()
            if not drawings:
                return elements

            logger.debug(f"Page {page_num} contains {len(drawings)} vector drawing commands")

            # Filter out drawings that are likely table borders
            # (typically thin rectangular lines within table regions)
            non_table_drawings = self._filter_table_border_drawings(drawings, table_bboxes)
            logger.debug(f"After filtering table borders: {len(non_table_drawings)} drawings remain")

            if not non_table_drawings:
                logger.debug("All drawings appear to be table borders, no vector graphics to extract")
                return elements

            # Cluster the drawings into groups (charts, diagrams, etc.)
            try:
                # Use custom clustering that only considers non-table drawings
                drawing_clusters = self._cluster_non_table_drawings(page, non_table_drawings)
                logger.debug(f"Clustered into {len(drawing_clusters)} groups")
            except (AttributeError, TypeError) as e:
                # Fall back to identifying charts by drawing density
                logger.warning(f"Custom clustering failed ({e}), using fallback method")
                drawing_clusters = self._cluster_drawings_fallback(page, non_table_drawings)

            for cluster_idx, bbox in enumerate(drawing_clusters):
                # Ignore small regions (likely noise or separator lines)
                if bbox.width < 50 or bbox.height < 50:
                    logger.debug(f"Skipping small cluster {cluster_idx}: {bbox.width:.1f}x{bbox.height:.1f}")
                    continue

                # Render the region to a raster image;
                # matrix=fitz.Matrix(2, 2) increases the resolution to ~200 DPI
                try:
                    pix = page.get_pixmap(clip=bbox, matrix=fitz.Matrix(2, 2))

                    # Save the image if an output directory was provided
                    if output_dir:
                        output_dir.mkdir(parents=True, exist_ok=True)
                        filename = f"{document_id}_p{page_num}_chart{cluster_idx}.png"
                        filepath = output_dir / filename
                        pix.save(str(filepath))

                        # Create the DocumentElement
                        image_data = {
                            "saved_path": str(filepath),
                            "width": pix.width,
                            "height": pix.height,
                            "colorspace": pix.colorspace.name if pix.colorspace else "unknown",
                            "source": "vector_graphics"
                        }

                        element = DocumentElement(
                            element_id=f"chart_{page_num}_{counter + cluster_idx}",
                            type=ElementType.CHART,  # CHART type for vector graphics
                            content=image_data,
                            bbox=BoundingBox(
                                x0=bbox.x0,
                                y0=bbox.y0,
                                x1=bbox.x1,
                                y1=bbox.y1
                            ),
                            confidence=0.85,  # Slightly lower confidence than raster images
                            metadata={
                                "cluster_index": cluster_idx,
                                "drawing_count": len(drawings)
                            }
                        )
                        elements.append(element)
                        logger.debug(f"Extracted chart {cluster_idx}: {bbox.width:.1f}x{bbox.height:.1f} -> {filepath}")

                    pix = None  # Free memory

                except Exception as e:
                    logger.error(f"Error rendering vector graphic cluster {cluster_idx}: {e}")
                    continue

        except Exception as e:
            logger.error(f"Error extracting vector graphics: {e}")

        return elements

    def _cluster_drawings_fallback(self, page: fitz.Page, drawings: list) -> list:
        """
        Fallback clustering for when cluster_drawings() is not available.

        Uses simple spatial clustering based on bounding boxes.
        """
        if not drawings:
            return []

        # Collect all drawing bounding boxes
        bboxes = []
        for drawing in drawings:
            rect = drawing.get('rect')
            if rect:
                bboxes.append(fitz.Rect(rect))

        if not bboxes:
            return []

        # Simple clustering: merge overlapping or nearby rectangles
        clusters = []
        tolerance = 20

        for bbox in bboxes:
            # Try to merge with an existing cluster
            merged = False
            for i, cluster in enumerate(clusters):
                # Check whether bbox is close to this cluster
                expanded_cluster = cluster + (-tolerance, -tolerance, tolerance, tolerance)
                if expanded_cluster.intersects(bbox):
                    clusters[i] = cluster | bbox  # Union of rectangles
                    merged = True
                    break

            if not merged:
                # Create a new cluster
                clusters.append(bbox)

        # Filter out very small clusters
        filtered_clusters = [c for c in clusters if c.width >= 50 and c.height >= 50]

        logger.debug(f"Fallback clustering: {len(bboxes)} drawings -> {len(clusters)} clusters -> {len(filtered_clusters)} filtered")

        return filtered_clusters

    def _filter_table_border_drawings(self, drawings: list, table_bboxes: Optional[List[BoundingBox]]) -> list:
        """
        Filter out drawings that are likely table borders.

        Table borders are typically:
        - Thin rectangular lines (height or width < 5pt)
        - Located within or on the edge of table bounding boxes

        Args:
            drawings: List of PyMuPDF drawing objects
            table_bboxes: List of table bounding boxes

        Returns:
            List of drawings that are NOT table borders (likely logos, charts, etc.)
        """
        if not table_bboxes:
            return drawings

        non_table_drawings = []
        table_border_count = 0

        for drawing in drawings:
            rect = drawing.get('rect')
            if not rect:
                continue

            draw_rect = fitz.Rect(rect)

            # Is this drawing a thin line (potential table border)?
            is_thin_line = draw_rect.width < 5 or draw_rect.height < 5

            # Does the drawing overlap significantly with any table?
            overlaps_table = False
            for table_bbox in table_bboxes:
                table_rect = fitz.Rect(table_bbox.x0, table_bbox.y0, table_bbox.x1, table_bbox.y1)

                # Expand the table rect slightly to include border lines on the edges
                expanded_table = table_rect + (-5, -5, 5, 5)

                if expanded_table.contains(draw_rect) or expanded_table.intersects(draw_rect):
                    # Calculate the overlap ratio
                    intersection = draw_rect & expanded_table
                    if not intersection.is_empty:
                        overlap_ratio = intersection.get_area() / draw_rect.get_area() if draw_rect.get_area() > 0 else 0

                        # A drawing mostly inside a table region is likely a border
                        if overlap_ratio > 0.8:
                            overlaps_table = True
                            break

            # Keep the drawing unless it is BOTH a thin line AND overlapping a
            # table. This keeps logos (complex shapes), charts outside tables, etc.
            if is_thin_line and overlaps_table:
                table_border_count += 1
            else:
                non_table_drawings.append(drawing)

        if table_border_count > 0:
            logger.debug(f"Filtered out {table_border_count} table border drawings")

        return non_table_drawings

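    # Example: a 0.8 pt-high horizontal rule lying entirely inside a table
    # bbox (overlap ratio 1.0 > 0.8) is dropped as a border; a 60x40 pt
    # logo outside every table is kept.
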
    def _cluster_non_table_drawings(self, page: fitz.Page, drawings: list) -> list:
        """
        Cluster non-table drawings into groups.

        This clusters drawings that have been pre-filtered to exclude table
        borders, using a more conservative approach suited to logos and charts.

        Args:
            page: PyMuPDF page object
            drawings: Pre-filtered list of drawings (excluding table borders)

        Returns:
            List of fitz.Rect representing clustered drawing regions
        """
        if not drawings:
            return []

        # Collect all drawing bounding boxes
        bboxes = []
        for drawing in drawings:
            rect = drawing.get('rect')
            if rect:
                bboxes.append(fitz.Rect(rect))

        if not bboxes:
            return []

        # More conservative clustering with a smaller tolerance,
        # which prevents grouping distant graphics together
        clusters = []
        tolerance = 10  # Smaller tolerance than the fallback (20)

        for bbox in bboxes:
            # Try to merge with an existing cluster
            merged = False
            for i, cluster in enumerate(clusters):
                # Check whether bbox is close to this cluster
                expanded_cluster = cluster + (-tolerance, -tolerance, tolerance, tolerance)
                if expanded_cluster.intersects(bbox):
                    clusters[i] = cluster | bbox  # Union of rectangles
                    merged = True
                    break

            if not merged:
                # Create a new cluster
                clusters.append(bbox)

        # Filter out very small clusters (noise); keep a 30x30 minimum
        # for logos (smaller than the default 50x50)
        filtered_clusters = [c for c in clusters if c.width >= 30 and c.height >= 30]

        logger.debug(f"Non-table clustering: {len(bboxes)} drawings -> {len(clusters)} clusters -> {len(filtered_clusters)} filtered")

        return filtered_clusters

    def _deduplicate_table_chart_overlap(self, elements: List[DocumentElement]) -> List[DocumentElement]:
        """
        Resolve TABLE-CHART overlaps based on table structure completeness.

        When a region is detected as both TABLE and CHART:
        - Calculate cell completeness = actual_cells / (rows × cols)
        - If completeness ≥ 50%: a real table with complete structure, so keep the TABLE
        - If completeness < 50%: a false positive (chart detected as table), so keep the CHART

        Args:
            elements: List of extracted elements

        Returns:
            Filtered list with low-quality overlaps removed
        """
        # Collect all tables and charts
        tables = [elem for elem in elements if elem.type == ElementType.TABLE]
        charts = [elem for elem in elements if elem.type == ElementType.CHART]

        if not tables or not charts:
            return elements  # No potential conflicts

        # Measure each TABLE's structure completeness
        table_completeness = {}
        for table in tables:
            if hasattr(table.content, 'rows') and hasattr(table.content, 'cols') and hasattr(table.content, 'cells'):
                expected_cells = table.content.rows * table.content.cols
                actual_cells = len(table.content.cells)

                if expected_cells > 0:
                    table_completeness[table.element_id] = actual_cells / expected_cells
                else:
                    table_completeness[table.element_id] = 0.0
            else:
                table_completeness[table.element_id] = 0.0

        # Check overlaps and decide what to keep
        filtered_elements = []
        removed_charts = 0
        removed_tables = 0

        # Process TABLEs
        for table in tables:
            if not table.bbox:
                filtered_elements.append(table)
                continue

            # Check whether this TABLE overlaps with any CHART
            overlaps_chart = False
            for chart in charts:
                if not chart.bbox:
                    continue

                # Calculate the overlap
                overlap_x0 = max(table.bbox.x0, chart.bbox.x0)
                overlap_y0 = max(table.bbox.y0, chart.bbox.y0)
                overlap_x1 = min(table.bbox.x1, chart.bbox.x1)
                overlap_y1 = min(table.bbox.y1, chart.bbox.y1)

                if overlap_x0 < overlap_x1 and overlap_y0 < overlap_y1:
                    overlap_area = (overlap_x1 - overlap_x0) * (overlap_y1 - overlap_y0)
                    table_area = (table.bbox.x1 - table.bbox.x0) * (table.bbox.y1 - table.bbox.y0)

                    if table_area > 0:
                        overlap_ratio = overlap_area / table_area

                        if overlap_ratio >= 0.8:
                            overlaps_chart = True
                            completeness = table_completeness.get(table.element_id, 0.0)

                            logger.debug(
                                f"TABLE-CHART overlap: {table.element_id} vs {chart.element_id}: "
                                f"{overlap_ratio*100:.1f}% overlap, TABLE cell completeness: {completeness*100:.1f}%"
                            )

                            # Keep the TABLE only if its structure is complete
                            if completeness < 0.5:  # <50% cell completeness
                                logger.info(
                                    f"Removing incomplete TABLE {table.element_id} "
                                    f"({completeness*100:.1f}% completeness, overlaps with CHART {chart.element_id})"
                                )
                                removed_tables += 1
                                break
                            else:
                                logger.info(
                                    f"Keeping TABLE {table.element_id} with {completeness*100:.1f}% completeness "
                                    f"(will remove overlapping CHART {chart.element_id})"
                                )

            if not overlaps_chart or table_completeness.get(table.element_id, 0.0) >= 0.5:
                filtered_elements.append(table)

        # Process CHARTs
        for chart in charts:
            if not chart.bbox:
                filtered_elements.append(chart)
                continue

            # Remove this CHART if it overlaps a high-quality TABLE
            should_remove = False
            for table in tables:
                if not table.bbox:
                    continue

                # Calculate the overlap
                overlap_x0 = max(chart.bbox.x0, table.bbox.x0)
                overlap_y0 = max(chart.bbox.y0, table.bbox.y0)
                overlap_x1 = min(chart.bbox.x1, table.bbox.x1)
                overlap_y1 = min(chart.bbox.y1, table.bbox.y1)

                if overlap_x0 < overlap_x1 and overlap_y0 < overlap_y1:
                    overlap_area = (overlap_x1 - overlap_x0) * (overlap_y1 - overlap_y0)
                    chart_area = (chart.bbox.x1 - chart.bbox.x0) * (chart.bbox.y1 - chart.bbox.y0)

                    if chart_area > 0:
                        overlap_ratio = overlap_area / chart_area

                        if overlap_ratio >= 0.8:
                            completeness = table_completeness.get(table.element_id, 0.0)

                            # Remove the CHART only if the TABLE structure is complete
                            if completeness >= 0.5:
                                should_remove = True
                                logger.info(
                                    f"Removing CHART {chart.element_id} "
                                    f"({overlap_ratio*100:.1f}% overlap with TABLE {table.element_id} having {completeness*100:.1f}% completeness)"
                                )
                                removed_charts += 1
                                break

            if not should_remove:
                filtered_elements.append(chart)

        # Pass through all other elements
        for elem in elements:
            if elem.type not in [ElementType.TABLE, ElementType.CHART]:
                filtered_elements.append(elem)

        if removed_charts > 0 or removed_tables > 0:
            logger.info(
                f"Deduplication complete: removed {removed_tables} incomplete TABLE(s), "
                f"{removed_charts} overlapping CHART(s)"
            )

        return filtered_elements
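

# Smoke-test sketch (run from within the package; the module name and sample
# path below are assumptions):
#
#   from .direct_extraction import DirectExtractionEngine  # hypothetical name
#   engine = DirectExtractionEngine()
#   result = engine.extract(Path("sample.pdf"))
#   for page in result.pages:
#       print(f"page {page.page_number}: {len(page.elements)} elements")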