Files
OCR/backend/app/services/direct_extraction_engine.py
egg 24253ac15e feat: unify Direct Track PDF rendering and simplify export options
Backend changes:
- Apply background image + invisible text layer to all Direct Track PDFs
- Add CHART to regions_to_avoid for text extraction
- Improve visual fidelity for native PDFs and Office documents

Frontend changes:
- Remove JSON, UnifiedDocument, Markdown download buttons
- Simplify to 2-column layout with only Layout PDF and Reflow PDF
- Remove translation JSON download and Layout PDF option
- Keep only Reflow PDF for translated document downloads
- Clean up unused imports (FileJson, Database, FileOutput)

Archives two OpenSpec proposals:
- unify-direct-track-pdf-rendering
- simplify-frontend-export-options

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 07:50:43 +08:00

3599 lines
146 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Direct Extraction Engine using PyMuPDF
Handles direct text and structure extraction from editable PDFs without OCR.
This provides much faster processing and perfect accuracy for documents with
extractable text.
"""
import os
import logging
import fitz # PyMuPDF
import uuid
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any, Union
from datetime import datetime
import re
from ..models.unified_document import (
UnifiedDocument, DocumentElement, Page, DocumentMetadata,
BoundingBox, StyleInfo, TableData, TableCell, Dimensions,
ElementType, ProcessingTrack
)
logger = logging.getLogger(__name__)
class DirectExtractionEngine:
"""
Engine for direct text extraction from editable PDFs using PyMuPDF.
This engine provides:
- Fast text extraction with exact positioning
- Font and style information preservation
- Table structure detection
- Image extraction with coordinates
- Hyperlink and annotation extraction
"""
def __init__(self,
enable_table_detection: bool = True,
enable_image_extraction: bool = True,
min_table_rows: int = 2,
min_table_cols: int = 2,
min_image_area: float = 200.0,
# Preprocessing pipeline options
enable_content_sanitization: bool = True,
enable_hidden_layer_removal: bool = True,
enable_whiteout_detection: bool = True,
whiteout_iou_threshold: float = 0.8,
enable_page_number_filter: bool = True,
enable_garble_detection: bool = True,
garble_ocr_fallback_threshold: float = 0.1):
"""
Initialize the extraction engine.
Args:
enable_table_detection: Whether to detect and extract tables
enable_image_extraction: Whether to extract images
min_table_rows: Minimum rows for table detection
min_table_cols: Minimum columns for table detection
min_image_area: Minimum image area in pixels squared (default 200)
Images smaller than this are filtered as decorations
Preprocessing pipeline options:
enable_content_sanitization: Run clean_contents() to fix malformed PDF streams
enable_hidden_layer_removal: Remove content from hidden OCG layers
enable_whiteout_detection: Detect and filter text covered by white rectangles
whiteout_iou_threshold: IoU threshold for white-out detection (default 0.8)
enable_page_number_filter: Filter out detected page numbers
enable_garble_detection: Detect garbled text (cid:xxxx patterns)
garble_ocr_fallback_threshold: Garble rate threshold to recommend OCR fallback
"""
self.enable_table_detection = enable_table_detection
self.enable_image_extraction = enable_image_extraction
self.min_table_rows = min_table_rows
self.min_table_cols = min_table_cols
self.min_image_area = min_image_area
# Preprocessing pipeline options
self.enable_content_sanitization = enable_content_sanitization
self.enable_hidden_layer_removal = enable_hidden_layer_removal
self.enable_whiteout_detection = enable_whiteout_detection
self.whiteout_iou_threshold = whiteout_iou_threshold
self.enable_page_number_filter = enable_page_number_filter
self.enable_garble_detection = enable_garble_detection
self.garble_ocr_fallback_threshold = garble_ocr_fallback_threshold
def extract(self,
file_path: Union[str, Path],
output_dir: Optional[Path] = None) -> UnifiedDocument:
"""
Extract content from PDF file to UnifiedDocument format.
Args:
file_path: Path to PDF file (string or Path object)
output_dir: Optional directory to save extracted images.
If not provided, creates a temporary directory in storage/results/{document_id}/
Returns:
UnifiedDocument with extracted content
"""
# Ensure file_path is a Path object
if isinstance(file_path, str):
file_path = Path(file_path)
start_time = datetime.now()
document_id = str(uuid.uuid4())[:8] # Short ID for cleaner paths
try:
doc = fitz.open(str(file_path))
# If no output_dir provided, create default directory for image extraction
if output_dir is None and self.enable_image_extraction:
# Create temporary directory in storage/results
default_output_dir = Path("storage/results") / document_id
default_output_dir.mkdir(parents=True, exist_ok=True)
output_dir = default_output_dir
logger.debug(f"Created default output directory: {output_dir}")
# Extract document metadata
metadata = self._extract_metadata(file_path, doc, start_time)
# Extract pages
pages = []
for page_num in range(len(doc)):
logger.info(f"Extracting page {page_num + 1}/{len(doc)}")
page = self._extract_page(
doc[page_num],
page_num + 1,
document_id,
output_dir,
doc # Pass doc for covering image detection
)
pages.append(page)
doc.close()
# Calculate processing time
processing_time = (datetime.now() - start_time).total_seconds()
metadata.processing_time = processing_time
logger.info(f"Direct extraction completed in {processing_time:.2f}s")
return UnifiedDocument(
document_id=document_id,
metadata=metadata,
pages=pages
)
except Exception as e:
logger.error(f"Error during direct extraction: {e}")
# Return partial result with error information
processing_time = (datetime.now() - start_time).total_seconds()
if 'metadata' not in locals():
metadata = DocumentMetadata(
filename=file_path.name,
file_type="pdf",
file_size=file_path.stat().st_size if file_path.exists() else 0,
created_at=datetime.now(),
processing_track=ProcessingTrack.DIRECT,
processing_time=processing_time
)
return UnifiedDocument(
document_id=document_id,
metadata=metadata,
pages=pages if 'pages' in locals() else [],
processing_errors=[{
"error": str(e),
"type": type(e).__name__
}]
)
def _extract_metadata(self,
file_path: Path,
doc: fitz.Document,
start_time: datetime) -> DocumentMetadata:
"""Extract document metadata"""
pdf_metadata = doc.metadata
return DocumentMetadata(
filename=file_path.name,
file_type="pdf",
file_size=file_path.stat().st_size,
created_at=start_time,
processing_track=ProcessingTrack.DIRECT,
processing_time=0.0, # Will be updated later
title=pdf_metadata.get("title"),
author=pdf_metadata.get("author"),
subject=pdf_metadata.get("subject"),
keywords=pdf_metadata.get("keywords", "").split(",") if pdf_metadata.get("keywords") else None,
producer=pdf_metadata.get("producer"),
creator=pdf_metadata.get("creator"),
creation_date=self._parse_pdf_date(pdf_metadata.get("creationDate")),
modification_date=self._parse_pdf_date(pdf_metadata.get("modDate"))
)
def _parse_pdf_date(self, date_str: str) -> Optional[datetime]:
"""Parse PDF date string to datetime"""
if not date_str:
return None
try:
# PDF date format: D:YYYYMMDDHHmmSSOHH'mm
# Example: D:20240101120000+09'00
if date_str.startswith("D:"):
date_str = date_str[2:]
# Extract just the date/time part (first 14 characters)
if len(date_str) >= 14:
date_part = date_str[:14]
return datetime.strptime(date_part, "%Y%m%d%H%M%S")
except:
pass
return None
def _extract_page(self,
page: fitz.Page,
page_num: int,
document_id: str,
output_dir: Optional[Path],
doc: fitz.Document = None) -> Page:
"""Extract content from a single page with preprocessing pipeline."""
elements = []
element_counter = 0
# =====================================================================
# PREPROCESSING PIPELINE
# =====================================================================
# Step 1: Run preprocessing (sanitization, white-out detection, covering images)
preprocess_result = self._preprocess_page(page, page_num, doc)
covered_bboxes = preprocess_result.get('covered_word_bboxes', [])
# Get page-level metadata (for final Page metadata)
drawings = page.get_drawings()
links = page.get_links()
# Get page dimensions
rect = page.rect
dimensions = Dimensions(
width=rect.width,
height=rect.height,
dpi=72 # PDF standard DPI
)
# Extract tables first (if enabled) to get table regions
table_bboxes = []
if self.enable_table_detection:
try:
# Try native table detection (PyMuPDF 1.23.0+)
tables = page.find_tables()
for table_idx, table in enumerate(tables):
element = self._process_native_table(
table, page, page_num, element_counter
)
if element and element.bbox:
elements.append(element)
table_bboxes.append(element.bbox)
element_counter += 1
except AttributeError:
# Fallback to positional table detection
logger.debug("Native table detection not available, using positional detection")
table_elements = self._detect_tables_by_position(page, page_num, element_counter)
for elem in table_elements:
if elem.bbox:
table_bboxes.append(elem.bbox)
elements.extend(table_elements)
element_counter += len(table_elements)
# Extract text blocks with formatting (sort=True for reading order)
# Filter out lines that overlap with table regions OR covered by white-out
text_dict = page.get_text("dict", sort=True)
for block_idx, block in enumerate(text_dict.get("blocks", [])):
if block.get("type") == 0: # Text block
element = self._process_text_block(
block, page_num, element_counter, table_bboxes
)
if element:
# Step 1.3: Skip text covered by white-out rectangles
if covered_bboxes and element.bbox:
if self._is_text_in_covered_regions(element.bbox, covered_bboxes):
logger.debug(f"Skipping white-out covered text: {element.element_id}")
continue
elements.append(element)
element_counter += 1
# Extract images (if enabled)
# Pass covering_images and covering_rect_bboxes to filter out redaction/covering rectangles
if self.enable_image_extraction:
covering_images = preprocess_result.get('covering_images', [])
covering_rect_bboxes = preprocess_result.get('covering_rect_bboxes', [])
image_elements = self._extract_images(
page, page_num, document_id, element_counter, output_dir,
covering_images=covering_images,
covered_bboxes=covering_rect_bboxes # Pass actual covering vector rectangles
)
elements.extend(image_elements)
element_counter += len(image_elements)
# Extract vector graphics (charts, diagrams) from drawing commands
# Pass table_bboxes to filter out table border drawings before clustering
if self.enable_image_extraction:
vector_elements = self._extract_vector_graphics(
page, page_num, document_id, element_counter, output_dir,
table_bboxes=table_bboxes
)
elements.extend(vector_elements)
element_counter += len(vector_elements)
# Extract hyperlinks
links = page.get_links()
for link_idx, link in enumerate(links):
# Create link annotation element if it has URI
if link.get("uri"):
from_rect = link.get("from")
if from_rect:
element = DocumentElement(
element_id=f"link_{page_num}_{element_counter}",
type=ElementType.REFERENCE,
content={"uri": link["uri"], "type": "hyperlink"},
bbox=BoundingBox(
x0=from_rect.x0,
y0=from_rect.y0,
x1=from_rect.x1,
y1=from_rect.y1
),
metadata={"link_type": "external" if link["uri"].startswith("http") else "internal"}
)
elements.append(element)
element_counter += 1
# PyMuPDF's sort=True already provides good reading order for multi-column layouts
# (top-to-bottom, left-to-right within each row). We don't need to re-sort.
# NOTE: If sort=True is not used in get_text(), uncomment the line below:
# elements = self._sort_elements_for_reading_order(elements, dimensions)
# Deduplicate: Remove CHART elements that overlap with TABLE elements
# (Tables have structured data, so they take priority over vector graphics)
elements = self._deduplicate_table_chart_overlap(elements)
# Post-process elements for header/footer detection and structure
elements = self._detect_headers_footers(elements, dimensions)
elements = self._build_section_hierarchy(elements)
elements = self._build_nested_lists(elements)
# =====================================================================
# POST-PROCESSING PIPELINE
# =====================================================================
# Step 2.3: Filter page numbers
elements = self._filter_page_numbers(elements, dimensions.height)
# Step 3.2-3.3: Garble detection and OCR fallback recommendation
covering_images = preprocess_result.get('covering_images', [])
page_metadata = {
"has_drawings": len(drawings) > 0,
"drawing_count": len(drawings),
"link_count": len(links),
"preprocessing": {
"sanitized": preprocess_result.get('sanitized', False),
"whiteout_regions_found": len(covered_bboxes) - len(covering_images), # Vector rects only
"covering_images_found": len(covering_images),
"covering_images": covering_images # Full details for debugging
}
}
# Calculate garble rate for the page
if self.enable_garble_detection:
full_text = ' '.join(
elem.get_text() if hasattr(elem, 'get_text') else str(elem.content)
for elem in elements
if elem.type in [ElementType.TEXT, ElementType.PARAGRAPH, ElementType.TITLE]
)
garble_rate = self._calculate_garble_rate(full_text)
page_metadata['garble_rate'] = garble_rate
page_metadata['needs_ocr_fallback'] = self._should_fallback_to_ocr(full_text, page_num)
return Page(
page_number=page_num,
elements=elements,
dimensions=dimensions,
metadata=page_metadata
)
def _sort_elements_for_reading_order(self, elements: List[DocumentElement], dimensions: Dimensions) -> List[DocumentElement]:
"""
Sort elements by reading order, handling multi-column layouts.
For multi-column layouts (e.g., two-column documents), this ensures
elements are ordered correctly: top-to-bottom, then left-to-right
within each row.
Args:
elements: List of document elements
dimensions: Page dimensions
Returns:
Sorted list of elements in reading order
"""
if not elements:
return elements
# Detect if page has multi-column layout
text_elements = [e for e in elements if e.bbox and e.is_text]
if len(text_elements) < 3:
# Too few elements to determine layout, just sort by Y position
return sorted(elements, key=lambda e: (e.bbox.y0 if e.bbox else 0, e.bbox.x0 if e.bbox else 0))
# Cluster x-positions to detect columns
x_positions = [e.bbox.x0 for e in text_elements]
columns = self._detect_columns(x_positions, dimensions.width)
if len(columns) <= 1:
# Single column layout - simple top-to-bottom sort
logger.debug(f"Detected single-column layout")
return sorted(elements, key=lambda e: (e.bbox.y0 if e.bbox else 0, e.bbox.x0 if e.bbox else 0))
logger.debug(f"Detected {len(columns)}-column layout at x positions: {[f'{x:.1f}' for x in columns]}")
# Multi-column layout - use newspaper-style reading order
# (complete left column, then right column, etc.)
# This is more appropriate for technical documents and data sheets
element_data = []
for elem in elements:
if not elem.bbox:
element_data.append((elem, 0, 0))
continue
# Find which column this element belongs to
col_idx = 0
min_dist = float('inf')
for i, col_x in enumerate(columns):
dist = abs(elem.bbox.x0 - col_x)
if dist < min_dist:
min_dist = dist
col_idx = i
element_data.append((elem, col_idx, elem.bbox.y0))
# Sort by: column first, then Y position within column
# This gives newspaper-style reading: complete column 1, then column 2, etc.
element_data.sort(key=lambda x: (x[1], x[2]))
logger.debug(f"Using newspaper-style column reading order (column by column, top to bottom)")
return [e[0] for e in element_data]
def _detect_columns(self, x_positions: List[float], page_width: float) -> List[float]:
"""
Detect column positions from x-coordinates of text elements.
Args:
x_positions: List of x-coordinates (left edges of text)
page_width: Page width in points
Returns:
List of column x-positions (sorted left to right)
"""
if not x_positions:
return []
# Cluster x-positions to find column starts
# Use k-means-like approach: find groups of x-positions
threshold = page_width * 0.15 # 15% of page width as clustering threshold
sorted_x = sorted(set(x_positions))
if not sorted_x:
return []
clusters = [[sorted_x[0]]]
for x in sorted_x[1:]:
# Check if x belongs to current cluster
cluster_center = sum(clusters[-1]) / len(clusters[-1])
if abs(x - cluster_center) < threshold:
clusters[-1].append(x)
else:
# Start new cluster
clusters.append([x])
# Return average x position of each cluster (column start)
column_positions = [sum(cluster) / len(cluster) for cluster in clusters]
# Filter out columns that are too close to each other
min_column_width = page_width * 0.2 # Columns must be at least 20% of page width apart
filtered_columns = [column_positions[0]]
for col_x in column_positions[1:]:
if col_x - filtered_columns[-1] >= min_column_width:
filtered_columns.append(col_x)
return filtered_columns
def _detect_headers_footers(self, elements: List[DocumentElement], dimensions: Dimensions) -> List[DocumentElement]:
"""Detect and mark header/footer elements based on page position"""
page_height = dimensions.height
header_threshold = page_height * 0.1 # Top 10% of page
footer_threshold = page_height * 0.9 # Bottom 10% of page
for elem in elements:
# Skip non-text elements
if not elem.is_text:
continue
# Check if element is in header region
if elem.bbox.y1 <= header_threshold:
# Only mark as header if it's short text
if isinstance(elem.content, str) and len(elem.content) < 200:
elem.type = ElementType.HEADER
elem.metadata['is_page_header'] = True
# Check if element is in footer region
elif elem.bbox.y0 >= footer_threshold:
# Short text in footer region
if isinstance(elem.content, str) and len(elem.content) < 200:
elem.type = ElementType.FOOTER
elem.metadata['is_page_footer'] = True
return elements
def _build_section_hierarchy(self, elements: List[DocumentElement]) -> List[DocumentElement]:
"""Build hierarchical section structure based on font sizes"""
# Collect all headers with their font sizes
headers = []
for elem in elements:
if elem.type in [ElementType.TITLE, ElementType.HEADER]:
# Get average font size from style
font_size = 12.0 # Default
if elem.style and elem.style.font_size:
font_size = elem.style.font_size
headers.append((elem, font_size))
if not headers:
return elements
# Sort headers by font size to determine hierarchy levels
font_sizes = sorted(set(size for _, size in headers), reverse=True)
size_to_level = {size: level for level, size in enumerate(font_sizes, 1)}
# Assign section levels to headers
for elem, font_size in headers:
level = size_to_level.get(font_size, 1)
elem.metadata['section_level'] = level
elem.metadata['font_size'] = font_size
# Build parent-child relationships between headers
header_stack = [] # Stack of (element, level)
for elem, font_size in headers:
level = elem.metadata['section_level']
# Pop headers that are at same or lower level (larger font)
while header_stack and header_stack[-1][1] >= level:
header_stack.pop()
# Set parent header
if header_stack:
parent = header_stack[-1][0]
elem.metadata['parent_section'] = parent.element_id
if 'child_sections' not in parent.metadata:
parent.metadata['child_sections'] = []
parent.metadata['child_sections'].append(elem.element_id)
header_stack.append((elem, level))
# Link content to nearest preceding header at same or higher level
current_header = None
for elem in elements:
if elem.type in [ElementType.TITLE, ElementType.HEADER]:
current_header = elem
elif current_header and elem.type not in [ElementType.HEADER, ElementType.FOOTER]:
elem.metadata['section_id'] = current_header.element_id
return elements
def _build_nested_lists(self, elements: List[DocumentElement]) -> List[DocumentElement]:
"""Build nested list structure from flat list items"""
# Group list items
list_items = [e for e in elements if e.type == ElementType.LIST_ITEM]
if not list_items:
return elements
# Sort by position (top to bottom)
list_items.sort(key=lambda e: (e.bbox.y0, e.bbox.x0))
# Detect indentation levels based on x position
x_positions = [item.bbox.x0 for item in list_items]
if not x_positions:
return elements
min_x = min(x_positions)
indent_unit = 20 # Typical indent size in points
# Assign nesting levels
for item in list_items:
indent = item.bbox.x0 - min_x
level = int(indent / indent_unit)
item.metadata['list_level'] = level
# Build parent-child relationships
item_stack = [] # Stack of (element, level)
for item in list_items:
level = item.metadata.get('list_level', 0)
# Pop items at same or deeper level
while item_stack and item_stack[-1][1] >= level:
item_stack.pop()
# Set parent
if item_stack:
parent = item_stack[-1][0]
item.metadata['parent_item'] = parent.element_id
if 'children' not in parent.metadata:
parent.metadata['children'] = []
parent.metadata['children'].append(item.element_id)
# Also add to actual children list
parent.children.append(item)
item_stack.append((item, level))
return elements
def _process_text_block(self, block: Dict, page_num: int, counter: int,
table_bboxes: List[BoundingBox] = None) -> Optional[DocumentElement]:
"""
Process a text block into a DocumentElement.
Args:
block: Text block from PyMuPDF
page_num: Page number
counter: Element counter
table_bboxes: List of table bounding boxes to filter overlapping lines
Returns:
DocumentElement or None if all lines overlap with tables
"""
if table_bboxes is None:
table_bboxes = []
# Extract text content and span information
# Filter out lines that significantly overlap with table regions
text_parts = []
styles = []
span_children = [] # Store span-level children for inline styling
span_counter = 0
valid_line_bboxes = [] # Track bboxes of valid lines for overall bbox calculation
for line in block.get("lines", []):
line_bbox_data = line.get("bbox", [0, 0, 0, 0])
# Check if this line overlaps with any table region
line_overlaps_table = False
for table_bbox in table_bboxes:
overlap_x0 = max(line_bbox_data[0], table_bbox.x0)
overlap_y0 = max(line_bbox_data[1], table_bbox.y0)
overlap_x1 = min(line_bbox_data[2], table_bbox.x1)
overlap_y1 = min(line_bbox_data[3], table_bbox.y1)
if overlap_x0 < overlap_x1 and overlap_y0 < overlap_y1:
# Calculate overlap ratio
line_height = line_bbox_data[3] - line_bbox_data[1]
overlap_height = overlap_y1 - overlap_y0
if line_height > 0:
overlap_ratio = overlap_height / line_height
if overlap_ratio >= 0.5: # Line significantly overlaps with table
line_overlaps_table = True
break
if line_overlaps_table:
continue # Skip this line
# Process valid line
valid_line_bboxes.append(line_bbox_data)
for span in line.get("spans", []):
text = span.get("text", "")
if text:
text_parts.append(text)
# Extract style information
style = StyleInfo(
font_name=span.get("font"),
font_size=span.get("size"),
font_weight="bold" if span.get("flags", 0) & 2**4 else "normal",
font_style="italic" if span.get("flags", 0) & 2**1 else "normal",
text_color=span.get("color")
)
styles.append(style)
# Create span child element for inline styling
span_bbox_data = span.get("bbox", [0, 0, 0, 0])
span_bbox = BoundingBox(
x0=span_bbox_data[0],
y0=span_bbox_data[1],
x1=span_bbox_data[2],
y1=span_bbox_data[3]
)
span_element = DocumentElement(
element_id=f"span_{page_num}_{counter}_{span_counter}",
type=ElementType.TEXT, # Spans are always text
content=text,
bbox=span_bbox,
style=style,
confidence=1.0,
metadata={"span_index": span_counter}
)
span_children.append(span_element)
span_counter += 1
if not text_parts:
return None # All lines overlapped with tables
full_text = "".join(text_parts)
# Calculate bbox from valid lines only
if valid_line_bboxes:
min_x0 = min(b[0] for b in valid_line_bboxes)
min_y0 = min(b[1] for b in valid_line_bboxes)
max_x1 = max(b[2] for b in valid_line_bboxes)
max_y1 = max(b[3] for b in valid_line_bboxes)
bbox = BoundingBox(x0=min_x0, y0=min_y0, x1=max_x1, y1=max_y1)
else:
# Fallback to original bbox if no valid lines found
bbox_data = block.get("bbox", [0, 0, 0, 0])
bbox = BoundingBox(x0=bbox_data[0], y0=bbox_data[1], x1=bbox_data[2], y1=bbox_data[3])
# Determine element type based on content and style
element_type = self._infer_element_type(full_text, styles)
# Use the most common style for the block
if styles:
block_style = styles[0] # Could be improved with style merging
else:
block_style = None
return DocumentElement(
element_id=f"text_{page_num}_{counter}",
type=element_type,
content=full_text,
bbox=bbox,
style=block_style,
confidence=1.0, # Direct extraction has perfect confidence
children=span_children # Store span children for inline styling
)
def _infer_element_type(self, text: str, styles: List[StyleInfo]) -> ElementType:
"""Infer element type based on text content and styling"""
text_lower = text.lower().strip()
# Check for common patterns
if len(text_lower) < 100 and styles:
# Short text with large font might be title/header
avg_size = sum(s.font_size or 12 for s in styles) / len(styles)
if avg_size > 16:
return ElementType.TITLE
elif avg_size > 14:
return ElementType.HEADER
# Check for list patterns
if re.match(r'^[\d•·▪▫◦‣]\s', text_lower):
return ElementType.LIST_ITEM
# Check for page numbers
if re.match(r'^page\s+\d+|^\d+\s*$|^-\s*\d+\s*-$', text_lower):
return ElementType.PAGE_NUMBER
# Check for footnote patterns
if re.match(r'^[\[\d+\]]|^\d+\)', text_lower):
return ElementType.FOOTNOTE
# Default to paragraph for longer text, text for shorter
return ElementType.PARAGRAPH if len(text) > 150 else ElementType.TEXT
def _is_likely_chart(self, data: list, table) -> bool:
"""
Detect if a "table" detected by find_tables() is actually a chart/graph.
Charts often get misclassified as tables because they have grid lines.
Characteristics of a chart misclassified as table:
1. High percentage of empty cells (>60%)
2. Content patterns that look like axis labels (numbers, units like °C, %, etc.)
3. Single cell contains multi-line text with chart-like patterns
4. Cell content contains typical chart axis patterns
Args:
data: Extracted table data (list of lists)
table: PyMuPDF table object
Returns:
True if the table is likely a chart
"""
if not data:
return False
# Count total cells and empty cells
total_cells = 0
empty_cells = 0
multi_line_cells = 0
axis_pattern_cells = 0
# Patterns that suggest chart axis labels
import re
axis_patterns = [
r'^-?\d+$', # Simple numbers (axis ticks)
r'^-?\d+\.?\d*$', # Decimal numbers
r'°[CF]', # Temperature units
r'%$', # Percentage
r'\bppm\b', # Parts per million
r'\bmin\b', # Minutes
r'\bsec\b', # Seconds
r'\bTime\b', # Time axis label
r'\bTemperature\b', # Temperature axis label
r'[Aa]xis', # Axis label
]
for row in data:
for cell in row:
total_cells += 1
cell_text = str(cell).strip() if cell else ""
if not cell_text:
empty_cells += 1
else:
# Check for multi-line content
if '\n' in cell_text:
multi_line_cells += 1
# Check for axis patterns
for pattern in axis_patterns:
if re.search(pattern, cell_text, re.IGNORECASE):
axis_pattern_cells += 1
break
# Calculate metrics
empty_ratio = empty_cells / total_cells if total_cells > 0 else 0
non_empty_cells = total_cells - empty_cells
# Count cells with meaningful table content (units, CJK, technical terms)
table_content_cells = 0
table_content_patterns = [
r'[一-龥ぁ-んァ-ン]', # CJK characters (Chinese, Japanese)
r'\b(Wt%|MPa|GPa|W/mK|ppm|cps|rpm)\b', # Technical units
r'\b(RT|TMA|DMA)\b', # Technical abbreviations
r'±', # Plus-minus symbol (common in specs)
r'\d+\s*[x×]\s*\d+', # Dimensions like "10x10"
]
for row in data:
for cell in row:
cell_text = str(cell).strip() if cell else ""
if cell_text:
for pattern in table_content_patterns:
if re.search(pattern, cell_text):
table_content_cells += 1
break
# Decision criteria for chart detection:
# Tables with technical/CJK content are likely real tables, not charts
if table_content_cells >= 5:
logger.debug(f"Table detection: {table_content_cells} cells with table-like content")
return False
# If table has many rows with data, it's likely a real table
rows_with_content = sum(1 for row in data if any(str(cell).strip() for cell in row if cell))
if rows_with_content >= 5 and non_empty_cells >= 10:
logger.debug(f"Table detection: {rows_with_content} rows with content, {non_empty_cells} non-empty cells")
return False
# 1. Extremely high empty cell ratio (>90%) suggests it's a chart grid
if empty_ratio > 0.9:
logger.debug(f"Chart detection: very high empty ratio {empty_ratio:.2f} (>90%)")
return True
# 2. High empty ratio + many axis patterns suggests chart
if empty_ratio > 0.7 and axis_pattern_cells >= 5:
logger.debug(f"Chart detection: empty ratio {empty_ratio:.2f} + {axis_pattern_cells} axis patterns")
return True
# 3. Multi-line cell with axis patterns in first cell (often chart legend text)
if multi_line_cells >= 1 and axis_pattern_cells >= 3:
first_cell = str(data[0][0]).strip() if data and data[0] else ""
if '\n' in first_cell and len(first_cell.split('\n')) >= 5:
logger.debug(f"Chart detection: first cell has {len(first_cell.split(chr(10)))} lines with axis patterns")
return True
return False
def _process_native_table(self, table, fitz_page, page_num: int, counter: int) -> Optional[DocumentElement]:
"""
Process a natively detected table with proper merged cell handling.
Uses PyMuPDF's table.rows to detect cell spans:
- table.rows provides per-row cell info where None indicates merged positions
- We calculate row_span/col_span by counting consecutive None values
"""
try:
# Extract table data (text content)
data = table.extract()
if not data or len(data) < self.min_table_rows:
return None
# Check if this "table" is actually a chart (misclassified by find_tables)
if self._is_likely_chart(data, table):
logger.info(f"Skipping table_{page_num}_{counter} - detected as chart (not table)")
return None
# Get table bounding box
bbox_data = table.bbox
bbox = BoundingBox(
x0=bbox_data[0],
y0=bbox_data[1],
x1=bbox_data[2],
y1=bbox_data[3]
)
# Get table dimensions
num_rows = table.row_count
num_cols = table.col_count
if num_cols < self.min_table_cols:
return None
# Build cell grid from table.rows
# Each row has .cells which is a list of bbox tuples or None for merged cells
table_rows = getattr(table, 'rows', None)
# Create a 2D grid to store cell bboxes (None = merged/covered)
cell_grid = [[None for _ in range(num_cols)] for _ in range(num_rows)]
if table_rows:
for row_idx, row in enumerate(table_rows):
row_cells = row.cells if hasattr(row, 'cells') else []
for col_idx, cell_bbox in enumerate(row_cells):
if col_idx < num_cols:
cell_grid[row_idx][col_idx] = cell_bbox
# Create a 2D grid to track which cells are covered by merges
# covered[row][col] = (owner_row, owner_col) if covered, None if actual cell
covered = [[None for _ in range(num_cols)] for _ in range(num_rows)]
# Calculate spans for each cell by analyzing None patterns
cell_spans = {} # (row, col) -> (row_span, col_span, cell_bbox)
for row_idx in range(num_rows):
for col_idx in range(num_cols):
cell_bbox = cell_grid[row_idx][col_idx]
if cell_bbox is None:
# This position is covered by a merged cell - skip
continue
if covered[row_idx][col_idx] is not None:
# Already marked as covered
continue
# This is an actual cell - calculate its span
# Find col_span: count consecutive None values to the right
col_span = 1
for c in range(col_idx + 1, num_cols):
if cell_grid[row_idx][c] is None and covered[row_idx][c] is None:
col_span += 1
else:
break
# Find row_span: count consecutive None values below
# (checking the same column range as col_span)
row_span = 1
for r in range(row_idx + 1, num_rows):
# Check if all cells in this row's span range are None
all_none = True
for c in range(col_idx, col_idx + col_span):
if c < num_cols:
if cell_grid[r][c] is not None or covered[r][c] is not None:
all_none = False
break
if all_none:
row_span += 1
else:
break
# Store the span info
cell_spans[(row_idx, col_idx)] = (
row_span,
col_span,
BoundingBox(x0=cell_bbox[0], y0=cell_bbox[1],
x1=cell_bbox[2], y1=cell_bbox[3])
)
# Mark covered positions
for dr in range(row_span):
for dc in range(col_span):
if dr == 0 and dc == 0:
continue
cr, cc = row_idx + dr, col_idx + dc
if cr < num_rows and cc < num_cols:
covered[cr][cc] = (row_idx, col_idx)
# Extract column widths and row heights from actual cell rectangles
column_widths = []
row_heights = []
# Collect unique X and Y boundaries from non-None cells
x_boundaries = set()
y_boundaries = set()
for row_idx in range(num_rows):
for col_idx in range(num_cols):
cell = cell_grid[row_idx][col_idx]
if cell is not None:
x_boundaries.add(round(cell[0], 1)) # x0
x_boundaries.add(round(cell[2], 1)) # x1
y_boundaries.add(round(cell[1], 1)) # y0
y_boundaries.add(round(cell[3], 1)) # y1
sorted_x = sorted(x_boundaries)
sorted_y = sorted(y_boundaries)
if len(sorted_x) >= 2:
column_widths = [sorted_x[i+1] - sorted_x[i] for i in range(len(sorted_x)-1)]
if len(sorted_y) >= 2:
row_heights = [sorted_y[i+1] - sorted_y[i] for i in range(len(sorted_y)-1)]
# Create table cells with proper span information
cells = []
for row_idx in range(num_rows):
row_data = data[row_idx] if row_idx < len(data) else []
for col_idx in range(num_cols):
# Skip cells that are covered by a merged cell
if covered[row_idx][col_idx] is not None:
continue
# Skip if not an actual cell
if cell_grid[row_idx][col_idx] is None:
continue
# Get cell content
cell_text = row_data[col_idx] if col_idx < len(row_data) else ""
# Get span info
row_span, col_span, cell_bbox = cell_spans.get(
(row_idx, col_idx),
(1, 1, None)
)
cells.append(TableCell(
row=row_idx,
col=col_idx,
row_span=row_span,
col_span=col_span,
content=str(cell_text) if cell_text else "",
bbox=cell_bbox
))
# Try to detect visual column and row boundaries from page drawings
# This is more accurate than PyMuPDF's column detection for complex tables
visual_boundaries = self._detect_visual_column_boundaries(
fitz_page, bbox_data, column_widths
)
# Use table.cells (flat list of bboxes) for more accurate row detection
raw_table_cells = getattr(table, 'cells', None)
row_boundaries = self._detect_visual_row_boundaries(
fitz_page, bbox_data, raw_table_cells
)
if visual_boundaries:
# Remap cells to visual columns and rows
cells, column_widths, num_cols, num_rows = self._remap_cells_to_visual_columns(
cells, column_widths, num_rows, num_cols, visual_boundaries, row_boundaries
)
else:
# Fallback to narrow column merging (doesn't modify rows)
cells, column_widths, num_cols = self._merge_narrow_columns(
cells, column_widths, num_rows, num_cols,
min_column_width=10.0
)
# Create table data
table_data = TableData(
rows=num_rows,
cols=num_cols,
cells=cells,
headers=data[0] if data else None
)
# Store metadata
metadata = {}
if column_widths:
metadata["column_widths"] = column_widths
if row_heights:
metadata["row_heights"] = row_heights
# Add merge statistics for debugging
merged_cells_count = sum(1 for c in cells if c.row_span > 1 or c.col_span > 1)
if merged_cells_count > 0:
metadata["merged_cell_count"] = merged_cells_count
logger.info(f"Table {page_num}_{counter}: {len(cells)} cells (grid: {num_rows}x{num_cols}), {merged_cells_count} merged")
metadata = metadata if metadata else None
return DocumentElement(
element_id=f"table_{page_num}_{counter}",
type=ElementType.TABLE,
content=table_data,
bbox=bbox,
confidence=1.0,
metadata=metadata
)
except Exception as e:
logger.error(f"Error processing native table: {e}")
import traceback
logger.error(traceback.format_exc())
return None
def _merge_narrow_columns(
self,
cells: List[TableCell],
column_widths: List[float],
num_rows: int,
num_cols: int,
min_column_width: float = 10.0
) -> Tuple[List[TableCell], List[float], int]:
"""
Merge narrow empty columns (border artifacts) with adjacent content columns.
PyMuPDF sometimes detects table border lines as separate columns,
resulting in many ~5pt wide columns. This method:
1. Identifies which columns have actual content
2. Uses EMPTY narrow columns as separators between logical column groups
3. Merges each group (narrow cols with content + wide col) into one logical column
Args:
cells: List of TableCell objects
column_widths: List of column widths
num_rows: Number of rows
num_cols: Number of columns
min_column_width: Minimum width to consider as real column (default 10pt)
Returns:
Tuple of (merged_cells, merged_widths, new_num_cols)
"""
if not column_widths or len(column_widths) != num_cols:
return cells, column_widths, num_cols
# Count narrow columns
narrow_count = sum(1 for w in column_widths if w < min_column_width)
if narrow_count == 0:
return cells, column_widths, num_cols
# Determine which columns have content
cols_with_content = set()
for cell in cells:
if cell.content and cell.content.strip():
# Mark all columns this cell spans
for c in range(cell.col, cell.col + cell.col_span):
if c < num_cols:
cols_with_content.add(c)
logger.info(f"Columns with content: {sorted(cols_with_content)}")
# Identify column groups separated by EMPTY narrow columns
# Strategy: empty narrow columns act as separators
col_groups = [] # List of lists, each inner list is columns in a group
current_group = []
for col_idx in range(num_cols):
width = column_widths[col_idx]
is_narrow = width < min_column_width
has_content = col_idx in cols_with_content
if is_narrow and not has_content:
# Empty narrow column = separator
if current_group:
col_groups.append(current_group)
current_group = []
else:
# Content column or narrow column with content
current_group.append(col_idx)
# Don't forget the last group
if current_group:
col_groups.append(current_group)
logger.info(f"Column groups: {col_groups}")
if len(col_groups) == num_cols:
# No grouping possible
return cells, column_widths, num_cols
# Build column mapping: old_col -> new_col
col_mapping = {}
new_widths = []
for new_col_idx, group in enumerate(col_groups):
group_width = sum(column_widths[c] for c in group)
# Add width of separators between previous group and this one
if new_col_idx > 0 and group:
prev_group = col_groups[new_col_idx - 1]
if prev_group:
# Add separator widths
for c in range(prev_group[-1] + 1, group[0]):
group_width += column_widths[c]
new_widths.append(group_width)
for old_col in group:
col_mapping[old_col] = new_col_idx
new_num_cols = len(col_groups)
logger.info(f"Column reduction: {num_cols} -> {new_num_cols}")
logger.debug(f"Column mapping: {col_mapping}")
# Remap cells to new column indices
# Group cells by (row, new_col) to handle merging
cell_map = {} # (row, new_col) -> list of cells
for cell in cells:
new_col = col_mapping.get(cell.col)
if new_col is None:
# Column was a separator - skip this cell
continue
key = (cell.row, new_col)
if key not in cell_map:
cell_map[key] = []
cell_map[key].append(cell)
# Create merged cells
merged_cells = []
processed = set()
for (row, new_col), cell_list in sorted(cell_map.items()):
if (row, new_col) in processed:
continue
# Sort cells by original column to maintain left-to-right order
cell_list.sort(key=lambda c: c.col)
# Collect all non-empty content from cells in this position
contents = []
for c in cell_list:
if c.content and c.content.strip():
contents.append(c.content.strip())
# Join contents with newline (for multi-column merged data)
merged_content = '\n'.join(contents) if contents else ''
# Use the first cell with content for span calculation
content_cell = None
for c in cell_list:
if c.content and c.content.strip():
content_cell = c
break
if content_cell is None:
content_cell = cell_list[0]
# Calculate new col_span by mapping old span to new columns
old_col_start = content_cell.col
old_col_end = old_col_start + content_cell.col_span - 1
new_col_start = col_mapping.get(old_col_start, new_col)
new_col_end = col_mapping.get(old_col_end, new_col_start)
new_col_span = max(1, new_col_end - new_col_start + 1)
# Merge bbox from all cells in this position
bbox = content_cell.bbox
for c in cell_list:
if c.bbox and bbox:
bbox = BoundingBox(
x0=min(bbox.x0, c.bbox.x0),
y0=min(bbox.y0, c.bbox.y0),
x1=max(bbox.x1, c.bbox.x1),
y1=max(bbox.y1, c.bbox.y1)
)
elif c.bbox:
bbox = c.bbox
merged_cells.append(TableCell(
row=row,
col=new_col,
row_span=content_cell.row_span,
col_span=new_col_span,
content=merged_content,
bbox=bbox
))
processed.add((row, new_col))
logger.info(f"Cell count: {len(cells)} -> {len(merged_cells)}")
return merged_cells, new_widths, new_num_cols
def _detect_visual_column_boundaries(
self,
page: fitz.Page,
table_bbox: Tuple[float, float, float, float],
pymupdf_widths: List[float]
) -> Optional[List[float]]:
"""
Detect actual column boundaries from page drawings (rectangles).
For tables with complex merged cells, PyMuPDF's column detection often
creates too many columns. This method analyzes the visual rectangles
(cell backgrounds) to find the MAIN column boundaries by frequency analysis.
Strategy:
1. Collect all cell rectangles from drawings
2. Count how frequently each x boundary appears (rounded to 5pt)
3. Keep only boundaries that appear frequently (>= threshold)
4. These are the main column boundaries that span most rows
Args:
page: PyMuPDF page object
table_bbox: Table bounding box (x0, y0, x1, y1)
pymupdf_widths: Column widths from PyMuPDF detection
Returns:
List of column boundary x-coordinates, or None if detection fails
"""
try:
from collections import Counter
# Collect cell rectangles from page drawings
cell_rects = []
drawings = page.get_drawings()
for d in drawings:
if d.get('items'):
for item in d['items']:
if item[0] == 're': # Rectangle
rect = item[1]
# Filter: within table bounds, large enough to be a cell
if (rect.x0 >= table_bbox[0] - 5 and
rect.x1 <= table_bbox[2] + 5 and
rect.y0 >= table_bbox[1] - 5 and
rect.y1 <= table_bbox[3] + 5):
width = rect.x1 - rect.x0
height = rect.y1 - rect.y0
if width > 30 and height > 15:
cell_rects.append(rect)
if len(cell_rects) < 4:
# Not enough cell rectangles detected
logger.debug(f"Only {len(cell_rects)} cell rectangles found, skipping visual detection")
return None
logger.debug(f"Found {len(cell_rects)} cell rectangles for visual column detection")
# Count frequency of each boundary (rounded to 5pt)
boundary_counts = Counter()
for r in cell_rects:
boundary_counts[round(r.x0 / 5) * 5] += 1
boundary_counts[round(r.x1 / 5) * 5] += 1
# Keep only boundaries that appear frequently
# Use 8% threshold to catch internal column boundaries (like nested sub-columns)
min_frequency = max(3, len(cell_rects) * 0.08)
frequent_boundaries = sorted([
x for x, count in boundary_counts.items()
if count >= min_frequency
])
# Always include table edges
table_left = round(table_bbox[0] / 5) * 5
table_right = round(table_bbox[2] / 5) * 5
if not frequent_boundaries or frequent_boundaries[0] > table_left + 10:
frequent_boundaries.insert(0, table_left)
if not frequent_boundaries or frequent_boundaries[-1] < table_right - 10:
frequent_boundaries.append(table_right)
logger.debug(f"Frequent boundaries (min_freq={min_frequency:.0f}): {frequent_boundaries}")
if len(frequent_boundaries) < 3:
# Need at least 3 boundaries for 2 columns
return None
# Merge close boundaries (within 10pt) - take the one with higher frequency
def merge_close_by_frequency(boundaries, counts, threshold=10):
if not boundaries:
return []
result = [boundaries[0]]
for b in boundaries[1:]:
if b - result[-1] <= threshold:
# Keep the one with higher frequency
if counts[b] > counts[result[-1]]:
result[-1] = b
else:
result.append(b)
return result
merged_boundaries = merge_close_by_frequency(
frequent_boundaries, boundary_counts, threshold=10
)
if len(merged_boundaries) < 3:
return None
# Calculate column widths
widths = [merged_boundaries[i+1] - merged_boundaries[i]
for i in range(len(merged_boundaries)-1)]
logger.info(f"Visual column detection: {len(widths)} columns")
logger.info(f" Boundaries: {merged_boundaries}")
logger.info(f" Widths: {[round(w) for w in widths]}")
return merged_boundaries
except Exception as e:
logger.warning(f"Visual column detection failed: {e}")
import traceback
logger.debug(traceback.format_exc())
return None
def _detect_visual_row_boundaries(
self,
page: fitz.Page,
table_bbox: Tuple[float, float, float, float],
table_cells: Optional[List] = None
) -> Optional[List[float]]:
"""
Detect actual row boundaries from table cell bboxes.
Uses cell bboxes from PyMuPDF table detection for more accurate
row boundary detection than page drawings.
Args:
page: PyMuPDF page object
table_bbox: Table bounding box (x0, y0, x1, y1)
table_cells: List of cell bboxes from table.cells (preferred)
Returns:
List of row boundary y-coordinates, or None if detection fails
"""
try:
from collections import Counter
boundary_counts = Counter()
cell_count = 0
if table_cells:
# Use table cells directly (more accurate for row detection)
for cell_bbox in table_cells:
if cell_bbox:
y0 = round(cell_bbox[1] / 5) * 5
y1 = round(cell_bbox[3] / 5) * 5
boundary_counts[y0] += 1
boundary_counts[y1] += 1
cell_count += 1
else:
# Fallback to page drawings
drawings = page.get_drawings()
for d in drawings:
if d.get('items'):
for item in d['items']:
if item[0] == 're':
rect = item[1]
if (rect.x0 >= table_bbox[0] - 5 and
rect.x1 <= table_bbox[2] + 5 and
rect.y0 >= table_bbox[1] - 5 and
rect.y1 <= table_bbox[3] + 5):
width = rect.x1 - rect.x0
height = rect.y1 - rect.y0
if width > 30 and height > 15:
y0 = round(rect.y0 / 5) * 5
y1 = round(rect.y1 / 5) * 5
boundary_counts[y0] += 1
boundary_counts[y1] += 1
cell_count += 1
if cell_count < 4:
logger.debug(f"Only {cell_count} cells found, skipping visual row detection")
return None
# Keep only boundaries that appear frequently
# Use 8% threshold similar to column detection
min_frequency = max(3, cell_count * 0.08)
frequent_boundaries = sorted([
y for y, count in boundary_counts.items()
if count >= min_frequency
])
# Always include table edges
table_top = round(table_bbox[1] / 5) * 5
table_bottom = round(table_bbox[3] / 5) * 5
if not frequent_boundaries or frequent_boundaries[0] > table_top + 10:
frequent_boundaries.insert(0, table_top)
if not frequent_boundaries or frequent_boundaries[-1] < table_bottom - 10:
frequent_boundaries.append(table_bottom)
logger.debug(f"Frequent Y boundaries (min_freq={min_frequency:.0f}): {frequent_boundaries}")
if len(frequent_boundaries) < 3:
# Need at least 3 boundaries for 2 rows
return None
# Merge close boundaries (within 10pt) - take the one with higher frequency
def merge_close_by_frequency(boundaries, counts, threshold=10):
if not boundaries:
return []
result = [boundaries[0]]
for b in boundaries[1:]:
if b - result[-1] <= threshold:
# Keep the one with higher frequency
if counts[b] > counts[result[-1]]:
result[-1] = b
else:
result.append(b)
return result
merged_boundaries = merge_close_by_frequency(
frequent_boundaries, boundary_counts, threshold=10
)
if len(merged_boundaries) < 3:
return None
# Calculate row heights
heights = [merged_boundaries[i+1] - merged_boundaries[i]
for i in range(len(merged_boundaries)-1)]
logger.info(f"Visual row detection: {len(heights)} rows")
logger.info(f" Y Boundaries: {merged_boundaries}")
logger.info(f" Heights: {[round(h) for h in heights]}")
return merged_boundaries
except Exception as e:
logger.warning(f"Visual row detection failed: {e}")
import traceback
logger.debug(traceback.format_exc())
return None
def _remap_cells_to_visual_columns(
self,
cells: List[TableCell],
column_widths: List[float],
num_rows: int,
num_cols: int,
visual_boundaries: List[float],
row_boundaries: Optional[List[float]] = None
) -> Tuple[List[TableCell], List[float], int, int]:
"""
Remap cells from PyMuPDF columns to visual columns based on cell bbox.
Args:
cells: List of TableCell objects from PyMuPDF
column_widths: Original column widths from PyMuPDF
num_rows: Number of rows
num_cols: Original number of columns
visual_boundaries: Column boundaries from visual detection
row_boundaries: Row boundaries from visual detection (optional)
Returns:
Tuple of (remapped_cells, new_widths, new_num_cols, new_num_rows)
"""
try:
new_num_cols = len(visual_boundaries) - 1
new_widths = [visual_boundaries[i+1] - visual_boundaries[i]
for i in range(new_num_cols)]
new_num_rows = len(row_boundaries) - 1 if row_boundaries else num_rows
logger.info(f"Remapping {len(cells)} cells from {num_cols} to {new_num_cols} visual columns")
if row_boundaries:
logger.info(f"Using {new_num_rows} visual rows for row_span calculation")
# Map each cell to visual column and row based on its bbox
# This ensures spanning cells are placed at their correct position
cell_map = {} # (visual_row, start_col) -> list of cells
for cell in cells:
if not cell.bbox:
continue
# Find start column based on left edge of cell
cell_x0 = cell.bbox.x0
start_col = 0
# First check if cell_x0 is very close to any boundary (within 5pt)
# If so, it belongs to the column that starts at that boundary
snapped = False
for i in range(1, len(visual_boundaries)): # Skip first (left edge)
if abs(cell_x0 - visual_boundaries[i]) <= 5:
start_col = min(i, new_num_cols - 1)
snapped = True
break
# If not snapped to boundary, use standard containment check
if not snapped:
for i in range(new_num_cols):
if visual_boundaries[i] <= cell_x0 < visual_boundaries[i+1]:
start_col = i
break
elif cell_x0 >= visual_boundaries[-1]:
start_col = new_num_cols - 1
# Find visual row based on top edge of cell
visual_row = cell.row # Default to original row
if row_boundaries:
cell_y0 = cell.bbox.y0
for i in range(new_num_rows):
if row_boundaries[i] <= cell_y0 + 5 < row_boundaries[i+1]:
visual_row = i
break
elif cell_y0 >= row_boundaries[-1] - 5:
visual_row = new_num_rows - 1
key = (visual_row, start_col)
if key not in cell_map:
cell_map[key] = []
cell_map[key].append(cell)
# Create remapped cells
remapped_cells = []
processed = set()
for (visual_row, start_col), cell_list in sorted(cell_map.items()):
if (visual_row, start_col) in processed:
continue
# Sort by original column
cell_list.sort(key=lambda c: c.col)
# Merge content from all cells at this position
contents = []
for c in cell_list:
if c.content and c.content.strip():
contents.append(c.content.strip())
merged_content = '\n'.join(contents) if contents else ''
# Use the cell with tallest bbox for row span calculation
# (handles case where multiple cells merge into one)
tallest_cell = max(cell_list, key=lambda c: (c.bbox.y1 - c.bbox.y0) if c.bbox else 0)
widest_cell = max(cell_list, key=lambda c: (c.bbox.x1 - c.bbox.x0) if c.bbox else 0)
# Calculate col_span based on right edge of widest cell
col_span = 1
if widest_cell.bbox:
cell_x1 = widest_cell.bbox.x1
end_col = start_col
for i in range(start_col, new_num_cols):
if cell_x1 > visual_boundaries[i] + 5: # 5pt tolerance
end_col = i
col_span = max(1, end_col - start_col + 1)
# Calculate row_span based on visual row boundaries
row_span = 1
if row_boundaries and tallest_cell.bbox:
cell_y1 = tallest_cell.bbox.y1
# Find end row based on bottom edge of tallest cell
end_row = visual_row
for i in range(visual_row, new_num_rows):
if cell_y1 > row_boundaries[i] + 5: # 5pt tolerance
end_row = i
row_span = max(1, end_row - visual_row + 1)
# Merge bbox from all cells
merged_bbox = tallest_cell.bbox
for c in cell_list:
if c.bbox and merged_bbox:
merged_bbox = BoundingBox(
x0=min(merged_bbox.x0, c.bbox.x0),
y0=min(merged_bbox.y0, c.bbox.y0),
x1=max(merged_bbox.x1, c.bbox.x1),
y1=max(merged_bbox.y1, c.bbox.y1)
)
elif c.bbox:
merged_bbox = c.bbox
remapped_cells.append(TableCell(
row=visual_row,
col=start_col,
row_span=row_span,
col_span=col_span,
content=merged_content,
bbox=merged_bbox
))
processed.add((visual_row, start_col))
# Filter out cells that are covered by spans from other cells
# Build a set of positions covered by spans
covered_positions = set()
for cell in remapped_cells:
if cell.col_span > 1 or cell.row_span > 1:
for r in range(cell.row, cell.row + cell.row_span):
for c in range(cell.col, cell.col + cell.col_span):
if (r, c) != (cell.row, cell.col): # Don't cover the origin
covered_positions.add((r, c))
# Remove covered cells
final_cells = [
cell for cell in remapped_cells
if (cell.row, cell.col) not in covered_positions
]
logger.info(f"Remapped to {len(final_cells)} cells in {new_num_cols} columns x {new_num_rows} rows (filtered {len(remapped_cells) - len(final_cells)} covered cells)")
return final_cells, new_widths, new_num_cols, new_num_rows
except Exception as e:
logger.error(f"Cell remapping failed: {e}")
# Fallback to original
return cells, column_widths, num_cols, num_rows
def _detect_tables_by_position(self, page: fitz.Page, page_num: int, counter: int) -> List[DocumentElement]:
"""Detect tables by analyzing text positioning"""
tables = []
# Get all words with positions
words = page.get_text("words") # Returns (x0, y0, x1, y1, "word", block_no, line_no, word_no)
if not words:
return tables
# Group words by approximate row (y-coordinate)
rows = {}
for word in words:
y = round(word[1] / 5) * 5 # Round to nearest 5 points
if y not in rows:
rows[y] = []
rows[y].append({
'x0': word[0],
'y0': word[1],
'x1': word[2],
'y1': word[3],
'text': word[4],
'block': word[5] if len(word) > 5 else 0
})
# Sort rows by y-coordinate
sorted_rows = sorted(rows.items(), key=lambda x: x[0])
# Find potential tables (consecutive rows with multiple columns)
current_table_rows = []
tables_found = []
for y, words_in_row in sorted_rows:
words_in_row.sort(key=lambda w: w['x0'])
if len(words_in_row) >= self.min_table_cols:
# Check if this could be a table row
x_positions = [w['x0'] for w in words_in_row]
# Check for somewhat regular spacing
if self._has_regular_spacing(x_positions):
current_table_rows.append((y, words_in_row))
else:
# End current table if exists
if len(current_table_rows) >= self.min_table_rows:
tables_found.append(current_table_rows)
current_table_rows = []
else:
# End current table if exists
if len(current_table_rows) >= self.min_table_rows:
tables_found.append(current_table_rows)
current_table_rows = []
# Don't forget the last table
if len(current_table_rows) >= self.min_table_rows:
tables_found.append(current_table_rows)
# Convert detected tables to DocumentElements
for table_idx, table_rows in enumerate(tables_found):
if not table_rows:
continue
# Calculate table bounding box
all_words = []
for _, words in table_rows:
all_words.extend(words)
min_x = min(w['x0'] for w in all_words)
min_y = min(w['y0'] for w in all_words)
max_x = max(w['x1'] for w in all_words)
max_y = max(w['y1'] for w in all_words)
bbox = BoundingBox(x0=min_x, y0=min_y, x1=max_x, y1=max_y)
# Create table cells
cells = []
for row_idx, (y, words) in enumerate(table_rows):
# Group words into columns
columns = self._group_into_columns(words, table_rows)
for col_idx, col_text in enumerate(columns):
if col_text:
cells.append(TableCell(
row=row_idx,
col=col_idx,
content=col_text
))
# Create table data
table_data = TableData(
rows=len(table_rows),
cols=max(len(self._group_into_columns(words, table_rows))
for _, words in table_rows),
cells=cells
)
element = DocumentElement(
element_id=f"table_{page_num}_{counter + table_idx}",
type=ElementType.TABLE,
content=table_data,
bbox=bbox,
confidence=0.8, # Lower confidence for positional detection
metadata={"detection_method": "positional"}
)
tables.append(element)
return tables
def _has_regular_spacing(self, x_positions: List[float], tolerance: float = 0.3) -> bool:
"""Check if x positions have somewhat regular spacing"""
if len(x_positions) < 3:
return False
spacings = [x_positions[i+1] - x_positions[i] for i in range(len(x_positions)-1)]
avg_spacing = sum(spacings) / len(spacings)
# Check if spacings are within tolerance of average
for spacing in spacings:
if abs(spacing - avg_spacing) > avg_spacing * tolerance:
return False
return True
def _group_into_columns(self, words: List[Dict], all_rows: List) -> List[str]:
"""Group words into columns based on x-position"""
if not words:
return []
# Find common column positions across all rows
all_x_positions = []
for _, row_words in all_rows:
all_x_positions.extend([w['x0'] for w in row_words])
# Cluster x-positions to find columns
column_positions = self._cluster_positions(all_x_positions)
# Assign words to columns
columns = [""] * len(column_positions)
for word in words:
# Find closest column
closest_col = 0
min_dist = float('inf')
for col_idx, col_x in enumerate(column_positions):
dist = abs(word['x0'] - col_x)
if dist < min_dist:
min_dist = dist
closest_col = col_idx
if columns[closest_col]:
columns[closest_col] += " " + word['text']
else:
columns[closest_col] = word['text']
return columns
def _cluster_positions(self, positions: List[float], threshold: float = 20) -> List[float]:
"""Cluster positions to find common columns"""
if not positions:
return []
sorted_pos = sorted(positions)
clusters = [[sorted_pos[0]]]
for pos in sorted_pos[1:]:
# Check if position belongs to current cluster
if pos - clusters[-1][-1] < threshold:
clusters[-1].append(pos)
else:
clusters.append([pos])
# Return average position of each cluster
return [sum(cluster) / len(cluster) for cluster in clusters]
def _extract_images(self,
page: fitz.Page,
page_num: int,
document_id: str,
counter: int,
output_dir: Optional[Path],
covering_images: Optional[List[Dict]] = None,
covered_bboxes: Optional[List[fitz.Rect]] = None) -> List[DocumentElement]:
"""
Extract images from page, filtering out tiny decoration images and covering images.
Filtering applied:
1. Images smaller than min_image_area (default 200 px²) are filtered as decorations
2. Images that match detected covering images (redaction rectangles) are filtered
3. Dark images that overlap significantly with covering vector rectangles are filtered
"""
elements = []
image_list = page.get_images()
filtered_tiny = 0
filtered_covering = 0
covering_images = covering_images or []
covered_bboxes = covered_bboxes or []
# Build covering image xrefs for quick lookup
covering_xrefs = set()
for cov in covering_images:
if 'xref' in cov:
covering_xrefs.add(cov['xref'])
for img_idx, img in enumerate(image_list):
try:
xref = img[0]
# Filter out covering images (redaction rectangles)
if xref in covering_xrefs:
filtered_covering += 1
logger.debug(f"Filtering covering image {img_idx} (xref={xref})")
continue
# Get image position(s)
img_rects = page.get_image_rects(xref)
if not img_rects:
continue
rect = img_rects[0] # Use first occurrence
# Calculate image area and filter tiny decoration images
image_area = (rect.x1 - rect.x0) * (rect.y1 - rect.y0)
if self.min_image_area > 0 and image_area < self.min_image_area:
filtered_tiny += 1
logger.debug(
f"Filtering tiny image {img_idx}: area={image_area:.1f} px² "
f"< threshold={self.min_image_area} px²"
)
continue
# Check for IoU overlap with covering images (for cases without matching xref)
is_covering = False
if covering_images:
for cov in covering_images:
cov_bbox = cov.get('bbox', [])
if len(cov_bbox) >= 4:
iou = self._calculate_iou(
[rect.x0, rect.y0, rect.x1, rect.y1],
cov_bbox
)
if iou > 0.8: # High overlap indicates same image
is_covering = True
filtered_covering += 1
logger.debug(
f"Filtering covering image {img_idx} by IoU={iou:.2f}"
)
break
if is_covering:
continue
# Check if dark image overlaps with covering vector rectangles
# This catches cases where dark images are placed on top of covering rectangles
if covered_bboxes:
img_rect = fitz.Rect(rect)
for cov_rect in covered_bboxes:
intersection = img_rect & cov_rect
if not intersection.is_empty:
img_area = img_rect.width * img_rect.height
if img_area > 0:
overlap_ratio = (intersection.width * intersection.height) / img_area
# If significant overlap (>50%), check if image is dark
if overlap_ratio > 0.5:
# Analyze image darkness
try:
from PIL import Image
import io
base_image = page.parent.extract_image(xref)
img_bytes = base_image.get('image')
if img_bytes:
pil_img = Image.open(io.BytesIO(img_bytes))
if pil_img.mode != 'RGB':
pil_img = pil_img.convert('RGB')
img_small = pil_img.resize((min(30, pil_img.width), min(30, pil_img.height)))
pixels = list(img_small.getdata())
if pixels:
avg_r = sum(p[0] for p in pixels) / len(pixels)
avg_g = sum(p[1] for p in pixels) / len(pixels)
avg_b = sum(p[2] for p in pixels) / len(pixels)
max_channel = max(avg_r, avg_g, avg_b)
# Filter dark images (max channel <= 60)
if max_channel <= 60:
filtered_covering += 1
logger.debug(
f"Filtering dark image {img_idx} overlapping with covering rect "
f"(overlap={overlap_ratio:.1%}, max_channel={max_channel:.1f})"
)
is_covering = True
break
except Exception as e:
logger.debug(f"Failed to analyze image darkness: {e}")
if is_covering:
continue
bbox = BoundingBox(
x0=rect.x0,
y0=rect.y0,
x1=rect.x1,
y1=rect.y1
)
# Extract image data
pix = fitz.Pixmap(page.parent, xref)
image_data = {
"width": pix.width,
"height": pix.height,
"colorspace": pix.colorspace.name if pix.colorspace else "unknown",
"xref": xref,
"area": image_area
}
# Save image if output directory provided
if output_dir:
output_dir.mkdir(parents=True, exist_ok=True)
image_filename = f"{document_id}_p{page_num}_img{img_idx}.png"
image_path = output_dir / image_filename
pix.save(str(image_path))
# Store relative filename only (consistent with OCR track)
# PDF generator will join with result_dir to get full path
image_data["saved_path"] = image_filename
logger.debug(f"Saved image to {image_path}")
element = DocumentElement(
element_id=f"image_{page_num}_{counter + img_idx}",
type=ElementType.IMAGE,
content=image_data,
bbox=bbox,
confidence=1.0,
metadata={
"image_index": img_idx,
"xref": xref
}
)
elements.append(element)
pix = None # Free memory
except Exception as e:
logger.error(f"Error extracting image {img_idx}: {e}")
if filtered_tiny > 0 or filtered_covering > 0:
logger.info(
f"Page {page_num}: Filtered images - "
f"{filtered_tiny} tiny (< {self.min_image_area} px²), "
f"{filtered_covering} covering/redaction"
)
return elements
def has_missing_images(self, page: fitz.Page) -> bool:
"""
Detect if a page likely has images that weren't extracted.
This checks for inline image blocks (type=1 in text dict) which indicate
graphics composed of many small image blocks (like logos) that
page.get_images() cannot detect.
Args:
page: PyMuPDF page object
Returns:
True if there are likely missing images that need OCR extraction
"""
try:
# Check if get_images found anything
standard_images = page.get_images()
if standard_images:
return False # Standard images were found, no need for fallback
# Check for inline image blocks (type=1)
text_dict = page.get_text("dict", sort=True)
blocks = text_dict.get("blocks", [])
image_block_count = sum(1 for b in blocks if b.get("type") == 1)
# If there are many inline image blocks, likely there's a logo or graphic
if image_block_count >= 10:
logger.info(f"Detected {image_block_count} inline image blocks - may need OCR for image extraction")
return True
return False
except Exception as e:
logger.warning(f"Error checking for missing images: {e}")
return False
def check_document_for_missing_images(self, pdf_path: Path) -> List[int]:
"""
Check a PDF document for pages that likely have missing images.
This opens the PDF and checks each page for inline image blocks
that weren't extracted by get_images().
Args:
pdf_path: Path to the PDF file
Returns:
List of page numbers (1-indexed) that have missing images
"""
pages_with_missing_images = []
try:
doc = fitz.open(str(pdf_path))
for page_num in range(len(doc)):
page = doc[page_num]
if self.has_missing_images(page):
pages_with_missing_images.append(page_num + 1) # 1-indexed
doc.close()
if pages_with_missing_images:
logger.info(f"Document has missing images on pages: {pages_with_missing_images}")
except Exception as e:
logger.error(f"Error checking document for missing images: {e}")
return pages_with_missing_images
def render_inline_image_regions(
self,
pdf_path: Path,
unified_doc: 'UnifiedDocument',
pages: List[int],
output_dir: Optional[Path] = None
) -> int:
"""
Render inline image regions and add them to the unified document.
This is a fallback when OCR doesn't detect images. It clusters inline
image blocks (type=1) and renders them as images.
Args:
pdf_path: Path to the PDF file
unified_doc: UnifiedDocument to add images to
pages: List of page numbers (1-indexed) to process
output_dir: Directory to save rendered images
Returns:
Number of images added
"""
images_added = 0
try:
doc = fitz.open(str(pdf_path))
for page_num in pages:
if page_num < 1 or page_num > len(doc):
continue
page = doc[page_num - 1] # 0-indexed
page_rect = page.rect
# Get inline image blocks
text_dict = page.get_text("dict", sort=True)
blocks = text_dict.get("blocks", [])
image_blocks = []
for block in blocks:
if block.get("type") == 1: # Image block
bbox = block.get("bbox")
if bbox:
image_blocks.append(fitz.Rect(bbox))
if len(image_blocks) < 5: # Reduced from 10
logger.debug(f"Page {page_num}: Only {len(image_blocks)} inline image blocks, skipping")
continue
logger.info(f"Page {page_num}: Found {len(image_blocks)} inline image blocks")
# Cluster nearby image blocks
regions = self._cluster_nearby_rects(image_blocks, tolerance=5.0)
logger.info(f"Page {page_num}: Clustered into {len(regions)} regions")
# Find the corresponding page in unified_doc
target_page = None
for p in unified_doc.pages:
if p.page_number == page_num:
target_page = p
break
if not target_page:
continue
for region_idx, region_rect in enumerate(regions):
logger.info(f"Page {page_num} region {region_idx}: {region_rect} (w={region_rect.width:.1f}, h={region_rect.height:.1f})")
# Skip very small regions
if region_rect.width < 30 or region_rect.height < 30:
logger.info(f" -> Skipped: too small (min 30x30)")
continue
# Skip regions that are primarily in the table area (below top 40%)
# But allow regions that START in the top portion
page_30_pct = page_rect.height * 0.3
page_40_pct = page_rect.height * 0.4
if region_rect.y0 > page_40_pct:
logger.info(f" -> Skipped: y0={region_rect.y0:.1f} > 40% of page ({page_40_pct:.1f})")
continue
logger.info(f"Rendering inline image region {region_idx} on page {page_num}: {region_rect}")
try:
# Add small padding
clip_rect = region_rect + (-2, -2, 2, 2)
clip_rect.intersect(page_rect)
# Render at 2x resolution
mat = fitz.Matrix(2, 2)
pix = page.get_pixmap(clip=clip_rect, matrix=mat, alpha=False)
# Create bounding box
bbox = BoundingBox(
x0=clip_rect.x0,
y0=clip_rect.y0,
x1=clip_rect.x1,
y1=clip_rect.y1
)
image_data = {
"width": pix.width,
"height": pix.height,
"colorspace": "rgb",
"type": "inline_region"
}
# Save image if output directory provided
if output_dir:
output_dir.mkdir(parents=True, exist_ok=True)
doc_id = unified_doc.document_id or "unknown"
image_filename = f"{doc_id}_p{page_num}_logo{region_idx}.png"
image_path = output_dir / image_filename
pix.save(str(image_path))
image_data["saved_path"] = image_filename
logger.info(f"Saved inline image region to {image_path}")
element = DocumentElement(
element_id=f"logo_{page_num}_{region_idx}",
type=ElementType.LOGO,
content=image_data,
bbox=bbox,
confidence=0.9,
metadata={
"region_type": "inline_image_blocks",
"block_count": len(image_blocks)
}
)
target_page.elements.append(element)
images_added += 1
pix = None # Free memory
except Exception as e:
logger.error(f"Error rendering inline image region {region_idx}: {e}")
doc.close()
if images_added > 0:
logger.info(f"Added {images_added} inline image regions to document")
except Exception as e:
logger.error(f"Error rendering inline image regions: {e}")
return images_added
def _cluster_nearby_rects(self, rects: List[fitz.Rect], tolerance: float = 5.0) -> List[fitz.Rect]:
"""Cluster nearby rectangles into regions."""
if not rects:
return []
sorted_rects = sorted(rects, key=lambda r: (r.y0, r.x0))
merged = []
for rect in sorted_rects:
merged_with_existing = False
for i, region in enumerate(merged):
expanded = region + (-tolerance, -tolerance, tolerance, tolerance)
if expanded.intersects(rect):
merged[i] = region | rect
merged_with_existing = True
break
if not merged_with_existing:
merged.append(rect)
# Second pass: merge any regions that now overlap
changed = True
while changed:
changed = False
new_merged = []
skip = set()
for i, r1 in enumerate(merged):
if i in skip:
continue
current = r1
for j, r2 in enumerate(merged[i+1:], start=i+1):
if j in skip:
continue
expanded = current + (-tolerance, -tolerance, tolerance, tolerance)
if expanded.intersects(r2):
current = current | r2
skip.add(j)
changed = True
new_merged.append(current)
merged = new_merged
return merged
def _extract_vector_graphics(self,
page: fitz.Page,
page_num: int,
document_id: str,
counter: int,
output_dir: Optional[Path],
table_bboxes: Optional[List[BoundingBox]] = None) -> List[DocumentElement]:
"""
Extract vector graphics (charts, diagrams) from page.
This method identifies regions that are composed of vector drawing commands
(paths, lines, rectangles) rather than embedded raster images. These are
typically charts created in Excel, vector diagrams, or other graphics.
Args:
page: PyMuPDF page object
page_num: Page number (1-indexed)
document_id: Unique document identifier
counter: Starting counter for element IDs
output_dir: Directory to save rendered graphics
table_bboxes: List of table bounding boxes to exclude table border drawings
Returns:
List of DocumentElement objects representing vector graphics
"""
elements = []
try:
# Get all drawing commands
drawings = page.get_drawings()
if not drawings:
return elements
logger.debug(f"Page {page_num} contains {len(drawings)} vector drawing commands")
# Filter out drawings that are likely table borders
# Table borders are typically thin rectangular lines within table regions
non_table_drawings = self._filter_table_border_drawings(drawings, table_bboxes)
logger.debug(f"After filtering table borders: {len(non_table_drawings)} drawings remain")
if not non_table_drawings:
logger.debug("All drawings appear to be table borders, no vector graphics to extract")
return elements
# Cluster drawings into groups (charts, diagrams, etc.)
try:
# Use custom clustering that only considers non-table drawings
drawing_clusters = self._cluster_non_table_drawings(page, non_table_drawings)
logger.debug(f"Clustered into {len(drawing_clusters)} groups")
except (AttributeError, TypeError) as e:
# cluster_drawings not available or has different signature
# Fallback: try to identify charts by analyzing drawing density
logger.warning(f"Custom clustering failed ({e}), using fallback method")
drawing_clusters = self._cluster_drawings_fallback(page, non_table_drawings)
# Get page dimensions for filtering
page_rect = page.rect
page_area = page_rect.width * page_rect.height
for cluster_idx, bbox in enumerate(drawing_clusters):
# Ignore small regions (likely noise or separator lines)
if bbox.width < 50 or bbox.height < 50:
logger.debug(f"Skipping small cluster {cluster_idx}: {bbox.width:.1f}x{bbox.height:.1f}")
continue
# Ignore very large regions that cover most of the page
# These are usually background elements, page borders, or misdetected regions
cluster_area = bbox.width * bbox.height
if cluster_area > page_area * 0.7: # More than 70% of page
logger.debug(f"Skipping large cluster {cluster_idx}: covers {cluster_area/page_area*100:.0f}% of page")
continue
# Render the region to a raster image
# matrix=fitz.Matrix(2, 2) increases resolution to ~200 DPI
try:
pix = page.get_pixmap(clip=bbox, matrix=fitz.Matrix(2, 2))
# Save image if output directory provided
if output_dir:
output_dir.mkdir(parents=True, exist_ok=True)
filename = f"{document_id}_p{page_num}_chart{cluster_idx}.png"
filepath = output_dir / filename
pix.save(str(filepath))
# Create DocumentElement
image_data = {
"saved_path": str(filepath),
"width": pix.width,
"height": pix.height,
"colorspace": pix.colorspace.name if pix.colorspace else "unknown",
"source": "vector_graphics"
}
element = DocumentElement(
element_id=f"chart_{page_num}_{counter + cluster_idx}",
type=ElementType.CHART, # Use CHART type for vector graphics
content=image_data,
bbox=BoundingBox(
x0=bbox.x0,
y0=bbox.y0,
x1=bbox.x1,
y1=bbox.y1
),
confidence=0.85, # Slightly lower confidence than raster images
metadata={
"cluster_index": cluster_idx,
"drawing_count": len(drawings)
}
)
elements.append(element)
logger.debug(f"Extracted chart {cluster_idx}: {bbox.width:.1f}x{bbox.height:.1f} -> {filepath}")
pix = None # Free memory
except Exception as e:
logger.error(f"Error rendering vector graphic cluster {cluster_idx}: {e}")
continue
except Exception as e:
logger.error(f"Error extracting vector graphics: {e}")
return elements
def _cluster_drawings_fallback(self, page: fitz.Page, drawings: list) -> list:
"""
Fallback method to cluster drawings when cluster_drawings() is not available.
This uses a simple spatial clustering approach based on bounding boxes.
"""
if not drawings:
return []
# Collect all drawing bounding boxes
bboxes = []
for drawing in drawings:
rect = drawing.get('rect')
if rect:
bboxes.append(fitz.Rect(rect))
if not bboxes:
return []
# Simple clustering: merge overlapping or nearby rectangles
clusters = []
tolerance = 20
for bbox in bboxes:
# Try to merge with existing cluster
merged = False
for i, cluster in enumerate(clusters):
# Check if bbox is close to this cluster
expanded_cluster = cluster + (-tolerance, -tolerance, tolerance, tolerance)
if expanded_cluster.intersects(bbox):
# Merge bbox into cluster
clusters[i] = cluster | bbox # Union of rectangles
merged = True
break
if not merged:
# Create new cluster
clusters.append(bbox)
# Filter out very small clusters
filtered_clusters = [c for c in clusters if c.width >= 50 and c.height >= 50]
logger.debug(f"Fallback clustering: {len(bboxes)} drawings -> {len(clusters)} clusters -> {len(filtered_clusters)} filtered")
return filtered_clusters
def _filter_table_border_drawings(self, drawings: list, table_bboxes: Optional[List[BoundingBox]]) -> list:
"""
Filter out drawings that are likely table borders.
Table borders are typically:
- Thin rectangular lines (height or width < 5pt)
- Located within or on the edge of table bounding boxes
Args:
drawings: List of PyMuPDF drawing objects
table_bboxes: List of table bounding boxes
Returns:
List of drawings that are NOT table borders (likely logos, charts, etc.)
"""
if not table_bboxes:
return drawings
non_table_drawings = []
table_border_count = 0
for drawing in drawings:
rect = drawing.get('rect')
if not rect:
continue
draw_rect = fitz.Rect(rect)
# Check if this drawing is a thin line (potential table border)
is_thin_line = draw_rect.width < 5 or draw_rect.height < 5
# Check if drawing overlaps significantly with any table
overlaps_table = False
for table_bbox in table_bboxes:
table_rect = fitz.Rect(table_bbox.x0, table_bbox.y0, table_bbox.x1, table_bbox.y1)
# Expand table rect slightly to include border lines on edges
expanded_table = table_rect + (-5, -5, 5, 5)
if expanded_table.contains(draw_rect) or expanded_table.intersects(draw_rect):
# Calculate overlap ratio
intersection = draw_rect & expanded_table
if not intersection.is_empty:
overlap_ratio = intersection.get_area() / draw_rect.get_area() if draw_rect.get_area() > 0 else 0
# If drawing is mostly inside table region, it's likely a border
if overlap_ratio > 0.8:
overlaps_table = True
break
# Keep drawing if it's NOT (thin line AND overlapping table)
# This keeps: logos (complex shapes), charts outside tables, etc.
if is_thin_line and overlaps_table:
table_border_count += 1
else:
non_table_drawings.append(drawing)
if table_border_count > 0:
logger.debug(f"Filtered out {table_border_count} table border drawings")
return non_table_drawings
def _cluster_non_table_drawings(self, page: fitz.Page, drawings: list) -> list:
"""
Cluster non-table drawings into groups.
This method clusters drawings that have been pre-filtered to exclude table borders.
It uses a more conservative clustering approach suitable for logos and charts.
Args:
page: PyMuPDF page object
drawings: Pre-filtered list of drawings (excluding table borders)
Returns:
List of fitz.Rect representing clustered drawing regions
"""
if not drawings:
return []
# Collect all drawing bounding boxes
bboxes = []
for drawing in drawings:
rect = drawing.get('rect')
if rect:
bboxes.append(fitz.Rect(rect))
if not bboxes:
return []
# More conservative clustering with smaller tolerance
# This prevents grouping distant graphics together
clusters = []
tolerance = 10 # Smaller tolerance than fallback (was 20)
for bbox in bboxes:
# Try to merge with existing cluster
merged = False
for i, cluster in enumerate(clusters):
# Check if bbox is close to this cluster
expanded_cluster = cluster + (-tolerance, -tolerance, tolerance, tolerance)
if expanded_cluster.intersects(bbox):
# Merge bbox into cluster
clusters[i] = cluster | bbox # Union of rectangles
merged = True
break
if not merged:
# Create new cluster
clusters.append(bbox)
# Filter out very small clusters (noise)
# Keep minimum 30x30 for logos (smaller than default 50x50)
filtered_clusters = [c for c in clusters if c.width >= 30 and c.height >= 30]
logger.debug(f"Non-table clustering: {len(bboxes)} drawings -> {len(clusters)} clusters -> {len(filtered_clusters)} filtered")
return filtered_clusters
def _deduplicate_table_chart_overlap(self, elements: List[DocumentElement]) -> List[DocumentElement]:
"""
Intelligently resolve TABLE-CHART overlaps based on table structure completeness.
When a region is detected as both TABLE and CHART:
- Calculate cell completeness = actual_cells / (rows × cols)
- If completeness ≥50% → Real table with complete structure → Keep TABLE
- If completeness <50% → False positive (chart detected as table) → Keep CHART
Args:
elements: List of extracted elements
Returns:
Filtered list with low-quality overlaps removed
"""
# Collect all tables and charts
tables = [elem for elem in elements if elem.type == ElementType.TABLE]
charts = [elem for elem in elements if elem.type == ElementType.CHART]
if not tables or not charts:
return elements # No potential conflicts
# Analyze TABLE structure completeness
# For tables with merged cells, completeness = positions covered / total positions
table_completeness = {}
for table in tables:
if hasattr(table.content, 'rows') and hasattr(table.content, 'cols') and hasattr(table.content, 'cells'):
expected_positions = table.content.rows * table.content.cols
# Calculate actual coverage accounting for merged cells
# Each cell covers row_span × col_span positions
covered_positions = 0
for cell in table.content.cells:
row_span = getattr(cell, 'row_span', 1) or 1
col_span = getattr(cell, 'col_span', 1) or 1
covered_positions += row_span * col_span
if expected_positions > 0:
completeness = covered_positions / expected_positions
table_completeness[table.element_id] = completeness
else:
table_completeness[table.element_id] = 0.0
else:
table_completeness[table.element_id] = 0.0
# Check overlaps and decide what to keep
filtered_elements = []
removed_charts = 0
removed_tables = 0
# Process TABLEs
for table in tables:
if not table.bbox:
filtered_elements.append(table)
continue
# Check if this TABLE overlaps with any CHART
overlaps_chart = False
for chart in charts:
if not chart.bbox:
continue
# Calculate overlap
overlap_x0 = max(table.bbox.x0, chart.bbox.x0)
overlap_y0 = max(table.bbox.y0, chart.bbox.y0)
overlap_x1 = min(table.bbox.x1, chart.bbox.x1)
overlap_y1 = min(table.bbox.y1, chart.bbox.y1)
if overlap_x0 < overlap_x1 and overlap_y0 < overlap_y1:
overlap_area = (overlap_x1 - overlap_x0) * (overlap_y1 - overlap_y0)
table_area = (table.bbox.x1 - table.bbox.x0) * (table.bbox.y1 - table.bbox.y0)
if table_area > 0:
overlap_ratio = overlap_area / table_area
if overlap_ratio >= 0.8:
overlaps_chart = True
completeness = table_completeness.get(table.element_id, 0.0)
logger.debug(
f"TABLE-CHART overlap: {table.element_id} vs {chart.element_id}: "
f"{overlap_ratio*100:.1f}% overlap, TABLE cell completeness: {completeness*100:.1f}%"
)
# Decision: Keep TABLE only if structure is complete
if completeness < 0.5: # <50% cell completeness
logger.info(
f"Removing incomplete TABLE {table.element_id} "
f"({completeness*100:.1f}% completeness, overlaps with CHART {chart.element_id})"
)
removed_tables += 1
break
else:
logger.info(
f"Keeping TABLE {table.element_id} with {completeness*100:.1f}% completeness "
f"(will remove overlapping CHART {chart.element_id})"
)
if not overlaps_chart or table_completeness.get(table.element_id, 0.0) >= 0.5:
filtered_elements.append(table)
# Process CHARTs
for chart in charts:
if not chart.bbox:
filtered_elements.append(chart)
continue
# Check if this CHART should be removed due to overlap with high-quality TABLE
should_remove = False
for table in tables:
if not table.bbox:
continue
# Calculate overlap
overlap_x0 = max(chart.bbox.x0, table.bbox.x0)
overlap_y0 = max(chart.bbox.y0, table.bbox.y0)
overlap_x1 = min(chart.bbox.x1, table.bbox.x1)
overlap_y1 = min(chart.bbox.y1, table.bbox.y1)
if overlap_x0 < overlap_x1 and overlap_y0 < overlap_y1:
overlap_area = (overlap_x1 - overlap_x0) * (overlap_y1 - overlap_y0)
chart_area = (chart.bbox.x1 - chart.bbox.x0) * (chart.bbox.y1 - chart.bbox.y0)
if chart_area > 0:
overlap_ratio = overlap_area / chart_area
if overlap_ratio >= 0.8:
completeness = table_completeness.get(table.element_id, 0.0)
# Remove CHART only if TABLE structure is complete
if completeness >= 0.5:
should_remove = True
logger.info(
f"Removing CHART {chart.element_id} "
f"({overlap_ratio*100:.1f}% overlap with TABLE {table.element_id} having {completeness*100:.1f}% completeness)"
)
removed_charts += 1
break
if not should_remove:
filtered_elements.append(chart)
# Process all other elements
for elem in elements:
if elem.type not in [ElementType.TABLE, ElementType.CHART]:
filtered_elements.append(elem)
if removed_charts > 0 or removed_tables > 0:
logger.info(
f"Deduplication complete: removed {removed_tables} incomplete TABLE(s), "
f"{removed_charts} overlapping CHART(s)"
)
return filtered_elements
# =========================================================================
# PDF Preprocessing Pipeline Methods
# =========================================================================
def _preprocess_page(self, page: fitz.Page, page_num: int, doc: fitz.Document = None) -> Dict[str, Any]:
"""
Run preprocessing pipeline on a page before extraction.
Pipeline steps:
1. Content sanitization (clean_contents)
2. Hidden layer detection (OCG)
3. White-out/black-out detection (vector rectangles)
4. Covering image detection (embedded black/white images)
Args:
page: PyMuPDF page object
page_num: Page number (1-indexed)
doc: PyMuPDF document object (needed for image analysis)
Returns:
Dict with preprocessing results:
- covered_word_bboxes: List of bboxes for text covered by rectangles/images
- covering_images: List of covering image info
- hidden_layers: List of hidden OCG layer names
- sanitized: Whether content was sanitized
"""
result = {
'covered_word_bboxes': [],
'covering_images': [],
'hidden_layers': [],
'sanitized': False
}
# Step 1.1: Content sanitization
if self.enable_content_sanitization:
try:
page.clean_contents(sanitize=True)
result['sanitized'] = True
logger.debug(f"Page {page_num}: Content stream sanitized")
except Exception as e:
logger.warning(f"Page {page_num}: Content sanitization failed: {e}")
# Step 1.3: White-out/black-out detection (vector rectangles)
if self.enable_whiteout_detection:
covered, covering_rect_bboxes = self._detect_whiteout_covered_text(page, page_num)
result['covered_word_bboxes'] = [fitz.Rect(w['bbox']) for w in covered]
result['covered_words_detail'] = covered # Include color_type info
result['covering_rect_bboxes'] = covering_rect_bboxes # Actual covering rectangles
if covered:
# Count by color type
white_covered = sum(1 for c in covered if c.get('color_type') == 'white')
black_covered = sum(1 for c in covered if c.get('color_type') == 'black')
other_covered = len(covered) - white_covered - black_covered
logger.info(f"Page {page_num}: Detected {len(covered)} covered text regions "
f"(white: {white_covered}, black/redaction: {black_covered}, other: {other_covered})")
# Step 1.4: Covering image detection (embedded black/white images)
if self.enable_whiteout_detection and doc is not None:
covering_images = self._detect_covering_images(page, doc, page_num)
result['covering_images'] = covering_images
# Add covering image bboxes to the covered_word_bboxes list
for img in covering_images:
result['covered_word_bboxes'].append(fitz.Rect(img['bbox']))
if covering_images:
black_imgs = sum(1 for c in covering_images if c['color_type'] == 'image_black')
white_imgs = sum(1 for c in covering_images if c['color_type'] == 'image_white')
logger.info(f"Page {page_num}: Detected {len(covering_images)} covering images "
f"(black: {black_imgs}, white: {white_imgs})")
return result
def _detect_whiteout_covered_text(self, page: fitz.Page, page_num: int) -> Tuple[List[Dict], List[fitz.Rect]]:
"""
Detect text covered by solid color rectangles (white-out, black redaction, or any solid fill).
Uses IoU (Intersection over Union) to determine if text is covered.
Args:
page: PyMuPDF page object
page_num: Page number for logging
Returns:
Tuple of:
- List of dicts with covered text info: {'text', 'bbox', 'coverage', 'color_type'}
- List of covering rectangle bboxes (fitz.Rect)
"""
covered_words = []
covering_rect_bboxes = [] # Return the actual rectangles
page_rect = page.rect # Page boundaries
# Get all drawings and find solid-filled rectangles
drawings = page.get_drawings()
covering_rects = [] # List of (rect, color_type)
for d in drawings:
fill_color = d.get('fill')
if fill_color and isinstance(fill_color, (tuple, list)) and len(fill_color) >= 3:
r, g, b = fill_color[:3]
rect = d.get('rect')
if not rect:
continue
fitz_rect = fitz.Rect(rect)
# Skip very small rectangles (likely not covering blocks)
if fitz_rect.width < 5 or fitz_rect.height < 5:
continue
# Skip rectangles completely outside page boundaries
if not fitz_rect.intersects(page_rect):
continue
# Clip rectangle to page boundaries
fitz_rect = fitz_rect & page_rect
# Detect white rectangles (white-out / correction tape)
# Must be pure white (>= 0.98) to avoid false positives from light backgrounds
if r >= 0.98 and g >= 0.98 and b >= 0.98:
covering_rects.append((fitz_rect, 'white'))
# Detect dark rectangles (redaction / censoring)
# Includes pure black AND dark gray (threshold 0.3)
# Dark gray is commonly used for redaction boxes
elif max(r, g, b) <= 0.3:
covering_rects.append((fitz_rect, 'black'))
if not covering_rects:
return covered_words, covering_rect_bboxes
# Extract covering rectangle bboxes for image filtering
covering_rect_bboxes = [rect for rect, _ in covering_rects]
# Log detected covering rectangles by type
white_count = sum(1 for _, t in covering_rects if t == 'white')
black_count = sum(1 for _, t in covering_rects if t == 'black')
logger.debug(f"Page {page_num}: Found {len(covering_rects)} potential covering rectangles "
f"(white: {white_count}, black/redaction: {black_count})")
# Get all text words with bounding boxes
# words format: (x0, y0, x1, y1, word, block_no, line_no, word_no)
words = page.get_text("words")
for word_info in words:
word_rect = fitz.Rect(word_info[:4])
word_text = word_info[4]
word_area = word_rect.width * word_rect.height
if word_area <= 0:
continue
for cover_rect, color_type in covering_rects:
# Calculate intersection
intersection = word_rect & cover_rect
if intersection.is_empty:
continue
intersection_area = intersection.width * intersection.height
coverage_ratio = intersection_area / word_area
# Check if coverage exceeds IoU threshold
if coverage_ratio >= self.whiteout_iou_threshold:
covered_words.append({
'text': word_text,
'bbox': tuple(word_rect),
'coverage': coverage_ratio,
'color_type': color_type
})
break # Word is covered, no need to check other rects
return covered_words, covering_rect_bboxes
def _detect_covering_images(self, page: fitz.Page, doc: fitz.Document, page_num: int) -> List[Dict]:
"""
Detect embedded images that are mostly black/white AND actually cover text.
Only reports images that:
1. Are mostly solid black or white
2. Are within page boundaries
3. Actually overlap with text content (IoU check)
4. Are rendered AFTER the text they overlap (z-order check)
Args:
page: PyMuPDF page object
doc: PyMuPDF document object (needed for image extraction)
page_num: Page number for logging
Returns:
List of dicts with covering image info: {'bbox', 'color_type', 'avg_color', 'covered_text_count'}
"""
covering_images = []
page_rect = page.rect # Page boundaries
try:
# Get all images on the page with their positions
image_list = page.get_images(full=True)
if not image_list:
return covering_images
# Get rendering order (z-order) using get_bboxlog()
# Items rendered later (higher index) appear on top
bboxlog = page.get_bboxlog()
# Build a map of bbox -> sequence number for images and text
# This helps determine if an image is rendered before or after text
image_seqnos = {} # bbox tuple -> seqno
text_seqnos = {} # bbox tuple -> seqno
for seqno, (action_type, bbox) in enumerate(bboxlog):
bbox_tuple = tuple(fitz.Rect(bbox))
if "image" in action_type:
image_seqnos[bbox_tuple] = seqno
elif "text" in action_type:
text_seqnos[bbox_tuple] = seqno
# Get all text words for coverage check
words = page.get_text("words") # (x0, y0, x1, y1, word, block_no, line_no, word_no)
for img_info in image_list:
xref = img_info[0]
width = img_info[2]
height = img_info[3]
# Skip very small images (icons, bullets)
if width < 20 or height < 10:
continue
try:
# Extract image data
base_image = doc.extract_image(xref)
img_bytes = base_image.get('image')
if not img_bytes:
continue
# Analyze image color using PIL
from PIL import Image
import io
img = Image.open(io.BytesIO(img_bytes))
if img.mode != 'RGB':
img = img.convert('RGB')
# Sample pixels for efficiency (don't analyze every pixel)
img_small = img.resize((min(50, img.width), min(50, img.height)))
pixels = list(img_small.getdata())
if not pixels:
continue
avg_r = sum(p[0] for p in pixels) / len(pixels)
avg_g = sum(p[1] for p in pixels) / len(pixels)
avg_b = sum(p[2] for p in pixels) / len(pixels)
# Determine if image is mostly black or white
# Use max channel value to detect dark images (allows slight color tint)
max_channel = max(avg_r, avg_g, avg_b)
min_channel = min(avg_r, avg_g, avg_b)
color_type = None
is_pure_solid = False # Pure black/white should always be filtered
if max_channel <= 40: # Dark image (any channel <= 40)
color_type = 'image_black'
# Check if it's pure solid black (should always filter)
if max_channel <= 5:
is_pure_solid = True
elif min_channel >= 245: # Bright image (any channel >= 245)
color_type = 'image_white'
if min_channel >= 250:
is_pure_solid = True
if color_type:
# Get image position on page
for img_rect in page.get_image_rects(xref):
# Skip images completely outside page boundaries
if not img_rect.intersects(page_rect):
continue
# Clip image rect to page boundaries
clipped_rect = img_rect & page_rect
# Get image's rendering sequence number
img_bbox_tuple = tuple(clipped_rect)
img_seqno = image_seqnos.get(img_bbox_tuple, -1)
# If we can't find exact match, try to find closest match
if img_seqno == -1:
for bbox_tuple, seqno in image_seqnos.items():
if fitz.Rect(bbox_tuple).intersects(clipped_rect):
# Use the matching seqno
img_seqno = seqno
break
# Check if image actually covers any text (IoU check)
# AND is rendered AFTER the text (z-order check)
covered_text_count = 0
is_background_image = False
for word_info in words:
word_rect = fitz.Rect(word_info[:4])
word_area = word_rect.width * word_rect.height
if word_area <= 0:
continue
intersection = word_rect & clipped_rect
if not intersection.is_empty:
intersection_area = intersection.width * intersection.height
coverage_ratio = intersection_area / word_area
# Count as covered if >= 50% of word is under the image
if coverage_ratio >= 0.5:
# Z-order check: Find the text's rendering sequence
text_seqno = -1
for bbox_tuple, seqno in text_seqnos.items():
text_bbox = fitz.Rect(bbox_tuple)
if text_bbox.intersects(word_rect):
text_seqno = seqno
break
# Only count as covered if image is rendered AFTER text
# If image is rendered BEFORE text, it's a background
if img_seqno > text_seqno and text_seqno >= 0:
covered_text_count += 1
elif img_seqno < text_seqno and img_seqno >= 0:
# Image is rendered before text = background
is_background_image = True
# Skip this image if it's detected as a background image
if is_background_image and covered_text_count == 0:
logger.debug(f"Page {page_num}: Skipping background image xref={xref} "
f"(rendered before text, seqno={img_seqno})")
continue
# Report if image covers text OR is pure solid black/white
# Pure solid fills are likely redaction/placeholder boxes
# But skip if it's a background image (rendered before text)
if covered_text_count > 0 or (is_pure_solid and not is_background_image):
covering_images.append({
'xref': xref, # Include xref for filtering
'bbox': tuple(clipped_rect),
'color_type': color_type,
'avg_color': (avg_r, avg_g, avg_b),
'size': (width, height),
'covered_text_count': covered_text_count,
'is_pure_solid': is_pure_solid,
'is_background': is_background_image,
'render_seqno': img_seqno
})
except Exception as e:
logger.debug(f"Page {page_num}: Failed to analyze image xref={xref}: {e}")
continue
if covering_images:
black_count = sum(1 for c in covering_images if c['color_type'] == 'image_black')
white_count = sum(1 for c in covering_images if c['color_type'] == 'image_white')
total_covered = sum(c.get('covered_text_count', 0) for c in covering_images)
logger.debug(f"Page {page_num}: Found {len(covering_images)} covering images "
f"(black: {black_count}, white: {white_count}, covering {total_covered} text regions)")
except Exception as e:
logger.warning(f"Page {page_num}: Failed to detect covering images: {e}")
return covering_images
def _get_hidden_ocg_layers(self, doc: fitz.Document) -> List[str]:
"""
Get list of hidden Optional Content Group (OCG) layer names.
Args:
doc: PyMuPDF document object
Returns:
List of hidden layer names
"""
hidden_layers = []
try:
ocgs = doc.get_ocgs()
if not ocgs:
return hidden_layers
for ocg_xref, ocg_info in ocgs.items():
# Check if layer is hidden by default
if ocg_info.get('on') == False:
layer_name = ocg_info.get('name', f'OCG_{ocg_xref}')
hidden_layers.append(layer_name)
logger.debug(f"Found hidden OCG layer: {layer_name}")
except Exception as e:
logger.warning(f"Failed to get OCG layers: {e}")
return hidden_layers
def _calculate_garble_rate(self, text: str) -> float:
"""
Calculate the rate of garbled characters in text.
Detects:
- (cid:xxxx) patterns (missing ToUnicode map)
- Replacement character U+FFFD
- Private Use Area (PUA) characters
Args:
text: Text to analyze
Returns:
Garble rate as float between 0.0 and 1.0
"""
if not text:
return 0.0
# Count (cid:xxxx) patterns
cid_pattern = r'\(cid:\d+\)'
cid_matches = re.findall(cid_pattern, text)
cid_char_count = sum(len(m) for m in cid_matches)
# Count replacement characters (U+FFFD)
replacement_count = text.count('\ufffd')
# Count Private Use Area characters (U+E000 to U+F8FF)
pua_count = sum(1 for c in text if 0xE000 <= ord(c) <= 0xF8FF)
total_garble = cid_char_count + replacement_count + pua_count
total_chars = len(text)
return total_garble / total_chars if total_chars > 0 else 0.0
def _should_fallback_to_ocr(self, page_text: str, page_num: int) -> bool:
"""
Determine if page should use OCR fallback based on garble rate.
Args:
page_text: Extracted text from page
page_num: Page number for logging
Returns:
True if OCR fallback is recommended
"""
if not self.enable_garble_detection:
return False
garble_rate = self._calculate_garble_rate(page_text)
if garble_rate > self.garble_ocr_fallback_threshold:
logger.warning(
f"Page {page_num}: High garble rate detected ({garble_rate:.1%}). "
f"OCR fallback recommended."
)
return True
return False
def _is_page_number(self, text: str) -> bool:
"""
Check if text is likely a page number.
Args:
text: Text to check
Returns:
True if text matches page number patterns
"""
text = text.strip()
# Pure number
if text.isdigit() and len(text) <= 4:
return True
# Common patterns
patterns = [
r'^page\s*\d+$', # "Page 1"
r'^-?\s*\d+\s*-?$', # "- 1 -" or "-1-"
r'^\d+\s*/\s*\d+$', # "1/10"
r'^第\s*\d+\s*[頁页]$', # "第1頁" or "第1页"
r'^p\.?\s*\d+$', # "P.1" or "p1"
]
for pattern in patterns:
if re.match(pattern, text, re.IGNORECASE):
return True
return False
def _filter_page_numbers(self, elements: List[DocumentElement], page_height: float) -> List[DocumentElement]:
"""
Filter out page number elements.
Page numbers are typically:
- In the bottom 10% of the page
- Match numeric/page number patterns
Args:
elements: List of document elements
page_height: Page height for position calculation
Returns:
Filtered list without page numbers
"""
if not self.enable_page_number_filter:
return elements
filtered = []
removed_count = 0
for elem in elements:
# Only filter text elements
if elem.type not in [ElementType.TEXT, ElementType.PARAGRAPH]:
filtered.append(elem)
continue
# Check position - must be in bottom 10% of page
if elem.bbox:
y_rel = elem.bbox.y0 / page_height
if y_rel > 0.90:
# Get text content
text = elem.get_text() if hasattr(elem, 'get_text') else str(elem.content)
if self._is_page_number(text):
removed_count += 1
logger.debug(f"Filtered page number: '{text}'")
continue
filtered.append(elem)
if removed_count > 0:
logger.info(f"Filtered {removed_count} page number element(s)")
return filtered
def _calculate_iou(self, bbox1: List[float], bbox2: List[float]) -> float:
"""
Calculate Intersection over Union (IoU) for two bounding boxes.
Args:
bbox1: First bounding box [x0, y0, x1, y1]
bbox2: Second bounding box [x0, y0, x1, y1]
Returns:
IoU value between 0.0 and 1.0
"""
# Calculate intersection
x0 = max(bbox1[0], bbox2[0])
y0 = max(bbox1[1], bbox2[1])
x1 = min(bbox1[2], bbox2[2])
y1 = min(bbox1[3], bbox2[3])
# No intersection
if x0 >= x1 or y0 >= y1:
return 0.0
intersection = (x1 - x0) * (y1 - y0)
# Calculate areas
area1 = (bbox1[2] - bbox1[0]) * (bbox1[3] - bbox1[1])
area2 = (bbox2[2] - bbox2[0]) * (bbox2[3] - bbox2[1])
# Calculate union
union = area1 + area2 - intersection
if union <= 0:
return 0.0
return intersection / union
def _is_text_in_covered_regions(self, bbox: BoundingBox, covered_bboxes: List[fitz.Rect]) -> bool:
"""
Check if a text bbox overlaps with any covered (white-out) regions.
Args:
bbox: Text bounding box
covered_bboxes: List of covered region rectangles
Returns:
True if text overlaps with covered regions
"""
if not covered_bboxes or not bbox:
return False
text_rect = fitz.Rect(bbox.x0, bbox.y0, bbox.x1, bbox.y1)
for covered_rect in covered_bboxes:
if text_rect.intersects(covered_rect):
# Calculate overlap ratio
intersection = text_rect & covered_rect
if not intersection.is_empty:
text_area = text_rect.width * text_rect.height
if text_area > 0:
overlap_ratio = (intersection.width * intersection.height) / text_area
if overlap_ratio >= self.whiteout_iou_threshold:
return True
return False
# =========================================================================
# Phase 4: GS Distillation - Exception Handler
# =========================================================================
@staticmethod
def is_ghostscript_available() -> bool:
"""Check if Ghostscript is available on the system."""
import shutil
return shutil.which('gs') is not None
def _should_trigger_gs_repair(self, file_path: Path) -> Tuple[bool, str]:
"""
Determine if Ghostscript repair should be triggered.
Triggers on:
1. High garble rate (>10% cid:xxxx patterns) in extracted text
2. Severe mupdf structural errors during opening
Args:
file_path: Path to PDF file
Returns:
Tuple of (should_repair, reason)
"""
import io
import sys
reason = ""
try:
# Capture mupdf warnings
old_stderr = sys.stderr
sys.stderr = captured_stderr = io.StringIO()
doc = fitz.open(str(file_path))
# Restore stderr and get warnings
sys.stderr = old_stderr
warnings = captured_stderr.getvalue()
# Check for severe structural errors
severe_keywords = ['error', 'invalid xref', 'corrupt', 'damaged', 'repair']
for keyword in severe_keywords:
if keyword.lower() in warnings.lower():
reason = f"Structural error detected: {keyword}"
doc.close()
return True, reason
# Check garble rate on first page
if len(doc) > 0:
page = doc[0]
text = page.get_text("text")
garble_rate = self._calculate_garble_rate(text)
if garble_rate > self.garble_ocr_fallback_threshold:
reason = f"High garble rate: {garble_rate:.1%}"
doc.close()
return True, reason
doc.close()
return False, ""
except Exception as e:
reason = f"Error opening PDF: {str(e)}"
return True, reason
def _repair_pdf_with_gs(self, input_path: Path, output_path: Path) -> bool:
"""
Repair a PDF using Ghostscript distillation.
This re-renders the PDF through Ghostscript's PDF interpreter,
which can fix many structural issues.
Args:
input_path: Path to input PDF
output_path: Path to save repaired PDF
Returns:
True if repair succeeded, False otherwise
"""
import subprocess
import shutil
if not self.is_ghostscript_available():
logger.warning("Ghostscript not available, cannot repair PDF")
return False
try:
# GS command for PDF repair/distillation
cmd = [
'gs',
'-dNOPAUSE',
'-dBATCH',
'-dSAFER',
'-sDEVICE=pdfwrite',
'-dPDFSETTINGS=/prepress',
'-dDetectDuplicateImages=true',
'-dCompressFonts=true',
'-dSubsetFonts=true',
f'-sOutputFile={output_path}',
str(input_path)
]
logger.info(f"Running Ghostscript repair: {' '.join(cmd)}")
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=60 # 60 second timeout
)
if result.returncode == 0 and output_path.exists():
logger.info(f"Ghostscript repair successful: {output_path}")
return True
else:
logger.error(f"Ghostscript repair failed: {result.stderr}")
return False
except subprocess.TimeoutExpired:
logger.error("Ghostscript repair timed out")
return False
except Exception as e:
logger.error(f"Ghostscript repair error: {e}")
return False
def extract_with_repair(self,
file_path: Path,
output_dir: Optional[Path] = None,
enable_gs_repair: bool = False) -> UnifiedDocument:
"""
Extract content with optional Ghostscript repair for damaged PDFs.
This method first checks if the PDF needs repair, and if so,
attempts to repair it using Ghostscript before extraction.
Args:
file_path: Path to PDF file
output_dir: Optional directory to save extracted images
enable_gs_repair: Whether to attempt GS repair on problematic PDFs
Returns:
UnifiedDocument with extracted content
"""
import tempfile
# Check if repair is needed and enabled
if enable_gs_repair:
should_repair, reason = self._should_trigger_gs_repair(file_path)
if should_repair:
logger.warning(f"PDF repair triggered: {reason}")
if self.is_ghostscript_available():
# Create temporary file for repaired PDF
with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as tmp:
tmp_path = Path(tmp.name)
try:
if self._repair_pdf_with_gs(file_path, tmp_path):
logger.info("Using repaired PDF for extraction")
result = self.extract(tmp_path, output_dir)
# Add repair metadata
if result.metadata:
result.metadata.gs_repaired = True
return result
else:
logger.warning("GS repair failed, trying original file")
finally:
# Cleanup temp file
if tmp_path.exists():
tmp_path.unlink()
else:
logger.warning("Ghostscript not available, skipping repair")
# Normal extraction
return self.extract(file_path, output_dir)
def get_pages_needing_ocr(self, doc: UnifiedDocument) -> List[int]:
"""
Get list of page numbers that need OCR fallback.
This method checks each page's metadata for the 'needs_ocr_fallback' flag
set during extraction when high garble rates are detected.
Args:
doc: UnifiedDocument from extraction
Returns:
List of page numbers (1-indexed) that need OCR processing
"""
pages_needing_ocr = []
for page in doc.pages:
if page.metadata and page.metadata.get('needs_ocr_fallback', False):
pages_needing_ocr.append(page.page_number)
if pages_needing_ocr:
logger.info(f"Pages needing OCR fallback: {pages_needing_ocr}")
return pages_needing_ocr
def get_extraction_quality_report(self, doc: UnifiedDocument) -> Dict[str, Any]:
"""
Generate a quality report for the extraction.
This report helps determine if additional processing (OCR, manual review)
is needed.
Args:
doc: UnifiedDocument from extraction
Returns:
Dict with quality metrics:
- total_pages: int
- pages_with_issues: list of page numbers with problems
- average_garble_rate: float
- needs_ocr_fallback: bool (any page needs OCR)
- preprocessing_stats: dict with sanitization/whiteout counts
"""
report = {
'total_pages': len(doc.pages),
'pages_with_issues': [],
'garble_rates': {},
'average_garble_rate': 0.0,
'needs_ocr_fallback': False,
'preprocessing_stats': {
'pages_sanitized': 0,
'total_whiteout_regions': 0,
'total_covering_images': 0
}
}
total_garble = 0.0
pages_with_garble = 0
for page in doc.pages:
metadata = page.metadata or {}
# Check garble rate
garble_rate = metadata.get('garble_rate', 0.0)
if garble_rate > 0:
report['garble_rates'][page.page_number] = garble_rate
total_garble += garble_rate
pages_with_garble += 1
# Check OCR fallback flag
if metadata.get('needs_ocr_fallback', False):
report['pages_with_issues'].append(page.page_number)
report['needs_ocr_fallback'] = True
# Preprocessing stats
preprocessing = metadata.get('preprocessing', {})
if preprocessing.get('sanitized', False):
report['preprocessing_stats']['pages_sanitized'] += 1
report['preprocessing_stats']['total_whiteout_regions'] += preprocessing.get('whiteout_regions_found', 0)
report['preprocessing_stats']['total_covering_images'] += preprocessing.get('covering_images_found', 0)
# Calculate average garble rate
if pages_with_garble > 0:
report['average_garble_rate'] = total_garble / pages_with_garble
return report