Vector rectangles: - Add page boundary check (skip out-of-bounds rectangles) - Clip rectangles to page boundaries Covering images: - Add page boundary check (skip out-of-bounds images) - Add IoU-based text coverage verification - Only report images that actually cover text (>= 50% word coverage) - Add covered_text_count to detection results This reduces false positives from black logos or decorative images that don't actually cover any text content. Test results (edit3.pdf): - Before: 10 covering images detected - After: 6 covering images detected (4 filtered - no text coverage) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
2601 lines
102 KiB
Python
2601 lines
102 KiB
Python
"""
|
||
Direct Extraction Engine using PyMuPDF
|
||
|
||
Handles direct text and structure extraction from editable PDFs without OCR.
|
||
This provides much faster processing and perfect accuracy for documents with
|
||
extractable text.
|
||
"""
|
||
|
||
import os
|
||
import logging
|
||
import fitz # PyMuPDF
|
||
import uuid
|
||
from pathlib import Path
|
||
from typing import Dict, List, Optional, Tuple, Any, Union
|
||
from datetime import datetime
|
||
import re
|
||
|
||
from ..models.unified_document import (
|
||
UnifiedDocument, DocumentElement, Page, DocumentMetadata,
|
||
BoundingBox, StyleInfo, TableData, TableCell, Dimensions,
|
||
ElementType, ProcessingTrack
|
||
)
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class DirectExtractionEngine:
|
||
"""
|
||
Engine for direct text extraction from editable PDFs using PyMuPDF.
|
||
|
||
This engine provides:
|
||
- Fast text extraction with exact positioning
|
||
- Font and style information preservation
|
||
- Table structure detection
|
||
- Image extraction with coordinates
|
||
- Hyperlink and annotation extraction
|
||
"""
|
||
|
||
def __init__(self,
|
||
enable_table_detection: bool = True,
|
||
enable_image_extraction: bool = True,
|
||
min_table_rows: int = 2,
|
||
min_table_cols: int = 2,
|
||
# Preprocessing pipeline options
|
||
enable_content_sanitization: bool = True,
|
||
enable_hidden_layer_removal: bool = True,
|
||
enable_whiteout_detection: bool = True,
|
||
whiteout_iou_threshold: float = 0.8,
|
||
enable_page_number_filter: bool = True,
|
||
enable_garble_detection: bool = True,
|
||
garble_ocr_fallback_threshold: float = 0.1):
|
||
"""
|
||
Initialize the extraction engine.
|
||
|
||
Args:
|
||
enable_table_detection: Whether to detect and extract tables
|
||
enable_image_extraction: Whether to extract images
|
||
min_table_rows: Minimum rows for table detection
|
||
min_table_cols: Minimum columns for table detection
|
||
|
||
Preprocessing pipeline options:
|
||
enable_content_sanitization: Run clean_contents() to fix malformed PDF streams
|
||
enable_hidden_layer_removal: Remove content from hidden OCG layers
|
||
enable_whiteout_detection: Detect and filter text covered by white rectangles
|
||
whiteout_iou_threshold: IoU threshold for white-out detection (default 0.8)
|
||
enable_page_number_filter: Filter out detected page numbers
|
||
enable_garble_detection: Detect garbled text (cid:xxxx patterns)
|
||
garble_ocr_fallback_threshold: Garble rate threshold to recommend OCR fallback
|
||
"""
|
||
self.enable_table_detection = enable_table_detection
|
||
self.enable_image_extraction = enable_image_extraction
|
||
self.min_table_rows = min_table_rows
|
||
self.min_table_cols = min_table_cols
|
||
|
||
# Preprocessing pipeline options
|
||
self.enable_content_sanitization = enable_content_sanitization
|
||
self.enable_hidden_layer_removal = enable_hidden_layer_removal
|
||
self.enable_whiteout_detection = enable_whiteout_detection
|
||
self.whiteout_iou_threshold = whiteout_iou_threshold
|
||
self.enable_page_number_filter = enable_page_number_filter
|
||
self.enable_garble_detection = enable_garble_detection
|
||
self.garble_ocr_fallback_threshold = garble_ocr_fallback_threshold
|
||
|
||
def extract(self,
|
||
file_path: Path,
|
||
output_dir: Optional[Path] = None) -> UnifiedDocument:
|
||
"""
|
||
Extract content from PDF file to UnifiedDocument format.
|
||
|
||
Args:
|
||
file_path: Path to PDF file
|
||
output_dir: Optional directory to save extracted images.
|
||
If not provided, creates a temporary directory in storage/results/{document_id}/
|
||
|
||
Returns:
|
||
UnifiedDocument with extracted content
|
||
"""
|
||
start_time = datetime.now()
|
||
document_id = str(uuid.uuid4())[:8] # Short ID for cleaner paths
|
||
|
||
try:
|
||
doc = fitz.open(str(file_path))
|
||
|
||
# If no output_dir provided, create default directory for image extraction
|
||
if output_dir is None and self.enable_image_extraction:
|
||
# Create temporary directory in storage/results
|
||
default_output_dir = Path("storage/results") / document_id
|
||
default_output_dir.mkdir(parents=True, exist_ok=True)
|
||
output_dir = default_output_dir
|
||
logger.debug(f"Created default output directory: {output_dir}")
|
||
|
||
# Extract document metadata
|
||
metadata = self._extract_metadata(file_path, doc, start_time)
|
||
|
||
# Extract pages
|
||
pages = []
|
||
for page_num in range(len(doc)):
|
||
logger.info(f"Extracting page {page_num + 1}/{len(doc)}")
|
||
page = self._extract_page(
|
||
doc[page_num],
|
||
page_num + 1,
|
||
document_id,
|
||
output_dir,
|
||
doc # Pass doc for covering image detection
|
||
)
|
||
pages.append(page)
|
||
|
||
doc.close()
|
||
|
||
# Calculate processing time
|
||
processing_time = (datetime.now() - start_time).total_seconds()
|
||
metadata.processing_time = processing_time
|
||
|
||
logger.info(f"Direct extraction completed in {processing_time:.2f}s")
|
||
|
||
return UnifiedDocument(
|
||
document_id=document_id,
|
||
metadata=metadata,
|
||
pages=pages
|
||
)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error during direct extraction: {e}")
|
||
# Return partial result with error information
|
||
processing_time = (datetime.now() - start_time).total_seconds()
|
||
|
||
if 'metadata' not in locals():
|
||
metadata = DocumentMetadata(
|
||
filename=file_path.name,
|
||
file_type="pdf",
|
||
file_size=file_path.stat().st_size if file_path.exists() else 0,
|
||
created_at=datetime.now(),
|
||
processing_track=ProcessingTrack.DIRECT,
|
||
processing_time=processing_time
|
||
)
|
||
|
||
return UnifiedDocument(
|
||
document_id=document_id,
|
||
metadata=metadata,
|
||
pages=pages if 'pages' in locals() else [],
|
||
processing_errors=[{
|
||
"error": str(e),
|
||
"type": type(e).__name__
|
||
}]
|
||
)
|
||
|
||
def _extract_metadata(self,
|
||
file_path: Path,
|
||
doc: fitz.Document,
|
||
start_time: datetime) -> DocumentMetadata:
|
||
"""Extract document metadata"""
|
||
pdf_metadata = doc.metadata
|
||
|
||
return DocumentMetadata(
|
||
filename=file_path.name,
|
||
file_type="pdf",
|
||
file_size=file_path.stat().st_size,
|
||
created_at=start_time,
|
||
processing_track=ProcessingTrack.DIRECT,
|
||
processing_time=0.0, # Will be updated later
|
||
title=pdf_metadata.get("title"),
|
||
author=pdf_metadata.get("author"),
|
||
subject=pdf_metadata.get("subject"),
|
||
keywords=pdf_metadata.get("keywords", "").split(",") if pdf_metadata.get("keywords") else None,
|
||
producer=pdf_metadata.get("producer"),
|
||
creator=pdf_metadata.get("creator"),
|
||
creation_date=self._parse_pdf_date(pdf_metadata.get("creationDate")),
|
||
modification_date=self._parse_pdf_date(pdf_metadata.get("modDate"))
|
||
)
|
||
|
||
def _parse_pdf_date(self, date_str: str) -> Optional[datetime]:
|
||
"""Parse PDF date string to datetime"""
|
||
if not date_str:
|
||
return None
|
||
|
||
try:
|
||
# PDF date format: D:YYYYMMDDHHmmSSOHH'mm
|
||
# Example: D:20240101120000+09'00
|
||
if date_str.startswith("D:"):
|
||
date_str = date_str[2:]
|
||
|
||
# Extract just the date/time part (first 14 characters)
|
||
if len(date_str) >= 14:
|
||
date_part = date_str[:14]
|
||
return datetime.strptime(date_part, "%Y%m%d%H%M%S")
|
||
except:
|
||
pass
|
||
|
||
return None
|
||
|
||
def _extract_page(self,
|
||
page: fitz.Page,
|
||
page_num: int,
|
||
document_id: str,
|
||
output_dir: Optional[Path],
|
||
doc: fitz.Document = None) -> Page:
|
||
"""Extract content from a single page with preprocessing pipeline."""
|
||
elements = []
|
||
element_counter = 0
|
||
|
||
# =====================================================================
|
||
# PREPROCESSING PIPELINE
|
||
# =====================================================================
|
||
# Step 1: Run preprocessing (sanitization, white-out detection, covering images)
|
||
preprocess_result = self._preprocess_page(page, page_num, doc)
|
||
covered_bboxes = preprocess_result.get('covered_word_bboxes', [])
|
||
|
||
# Get page-level metadata (for final Page metadata)
|
||
drawings = page.get_drawings()
|
||
links = page.get_links()
|
||
|
||
# Get page dimensions
|
||
rect = page.rect
|
||
dimensions = Dimensions(
|
||
width=rect.width,
|
||
height=rect.height,
|
||
dpi=72 # PDF standard DPI
|
||
)
|
||
|
||
# Extract tables first (if enabled) to get table regions
|
||
table_bboxes = []
|
||
if self.enable_table_detection:
|
||
try:
|
||
# Try native table detection (PyMuPDF 1.23.0+)
|
||
tables = page.find_tables()
|
||
for table_idx, table in enumerate(tables):
|
||
element = self._process_native_table(
|
||
table, page_num, element_counter
|
||
)
|
||
if element and element.bbox:
|
||
elements.append(element)
|
||
table_bboxes.append(element.bbox)
|
||
element_counter += 1
|
||
except AttributeError:
|
||
# Fallback to positional table detection
|
||
logger.debug("Native table detection not available, using positional detection")
|
||
table_elements = self._detect_tables_by_position(page, page_num, element_counter)
|
||
for elem in table_elements:
|
||
if elem.bbox:
|
||
table_bboxes.append(elem.bbox)
|
||
elements.extend(table_elements)
|
||
element_counter += len(table_elements)
|
||
|
||
# Extract text blocks with formatting (sort=True for reading order)
|
||
# Filter out lines that overlap with table regions OR covered by white-out
|
||
text_dict = page.get_text("dict", sort=True)
|
||
for block_idx, block in enumerate(text_dict.get("blocks", [])):
|
||
if block.get("type") == 0: # Text block
|
||
element = self._process_text_block(
|
||
block, page_num, element_counter, table_bboxes
|
||
)
|
||
if element:
|
||
# Step 1.3: Skip text covered by white-out rectangles
|
||
if covered_bboxes and element.bbox:
|
||
if self._is_text_in_covered_regions(element.bbox, covered_bboxes):
|
||
logger.debug(f"Skipping white-out covered text: {element.element_id}")
|
||
continue
|
||
elements.append(element)
|
||
element_counter += 1
|
||
|
||
# Extract images (if enabled)
|
||
if self.enable_image_extraction:
|
||
image_elements = self._extract_images(
|
||
page, page_num, document_id, element_counter, output_dir
|
||
)
|
||
elements.extend(image_elements)
|
||
element_counter += len(image_elements)
|
||
|
||
# Extract vector graphics (charts, diagrams) from drawing commands
|
||
# Pass table_bboxes to filter out table border drawings before clustering
|
||
if self.enable_image_extraction:
|
||
vector_elements = self._extract_vector_graphics(
|
||
page, page_num, document_id, element_counter, output_dir,
|
||
table_bboxes=table_bboxes
|
||
)
|
||
elements.extend(vector_elements)
|
||
element_counter += len(vector_elements)
|
||
|
||
# Extract hyperlinks
|
||
links = page.get_links()
|
||
for link_idx, link in enumerate(links):
|
||
# Create link annotation element if it has URI
|
||
if link.get("uri"):
|
||
from_rect = link.get("from")
|
||
if from_rect:
|
||
element = DocumentElement(
|
||
element_id=f"link_{page_num}_{element_counter}",
|
||
type=ElementType.REFERENCE,
|
||
content={"uri": link["uri"], "type": "hyperlink"},
|
||
bbox=BoundingBox(
|
||
x0=from_rect.x0,
|
||
y0=from_rect.y0,
|
||
x1=from_rect.x1,
|
||
y1=from_rect.y1
|
||
),
|
||
metadata={"link_type": "external" if link["uri"].startswith("http") else "internal"}
|
||
)
|
||
elements.append(element)
|
||
element_counter += 1
|
||
|
||
# PyMuPDF's sort=True already provides good reading order for multi-column layouts
|
||
# (top-to-bottom, left-to-right within each row). We don't need to re-sort.
|
||
# NOTE: If sort=True is not used in get_text(), uncomment the line below:
|
||
# elements = self._sort_elements_for_reading_order(elements, dimensions)
|
||
|
||
# Deduplicate: Remove CHART elements that overlap with TABLE elements
|
||
# (Tables have structured data, so they take priority over vector graphics)
|
||
elements = self._deduplicate_table_chart_overlap(elements)
|
||
|
||
# Post-process elements for header/footer detection and structure
|
||
elements = self._detect_headers_footers(elements, dimensions)
|
||
elements = self._build_section_hierarchy(elements)
|
||
elements = self._build_nested_lists(elements)
|
||
|
||
# =====================================================================
|
||
# POST-PROCESSING PIPELINE
|
||
# =====================================================================
|
||
# Step 2.3: Filter page numbers
|
||
elements = self._filter_page_numbers(elements, dimensions.height)
|
||
|
||
# Step 3.2-3.3: Garble detection and OCR fallback recommendation
|
||
covering_images = preprocess_result.get('covering_images', [])
|
||
page_metadata = {
|
||
"has_drawings": len(drawings) > 0,
|
||
"drawing_count": len(drawings),
|
||
"link_count": len(links),
|
||
"preprocessing": {
|
||
"sanitized": preprocess_result.get('sanitized', False),
|
||
"whiteout_regions_found": len(covered_bboxes) - len(covering_images), # Vector rects only
|
||
"covering_images_found": len(covering_images),
|
||
"covering_images": covering_images # Full details for debugging
|
||
}
|
||
}
|
||
|
||
# Calculate garble rate for the page
|
||
if self.enable_garble_detection:
|
||
full_text = ' '.join(
|
||
elem.get_text() if hasattr(elem, 'get_text') else str(elem.content)
|
||
for elem in elements
|
||
if elem.type in [ElementType.TEXT, ElementType.PARAGRAPH, ElementType.TITLE]
|
||
)
|
||
garble_rate = self._calculate_garble_rate(full_text)
|
||
page_metadata['garble_rate'] = garble_rate
|
||
page_metadata['needs_ocr_fallback'] = self._should_fallback_to_ocr(full_text, page_num)
|
||
|
||
return Page(
|
||
page_number=page_num,
|
||
elements=elements,
|
||
dimensions=dimensions,
|
||
metadata=page_metadata
|
||
)
|
||
|
||
def _sort_elements_for_reading_order(self, elements: List[DocumentElement], dimensions: Dimensions) -> List[DocumentElement]:
|
||
"""
|
||
Sort elements by reading order, handling multi-column layouts.
|
||
|
||
For multi-column layouts (e.g., two-column documents), this ensures
|
||
elements are ordered correctly: top-to-bottom, then left-to-right
|
||
within each row.
|
||
|
||
Args:
|
||
elements: List of document elements
|
||
dimensions: Page dimensions
|
||
|
||
Returns:
|
||
Sorted list of elements in reading order
|
||
"""
|
||
if not elements:
|
||
return elements
|
||
|
||
# Detect if page has multi-column layout
|
||
text_elements = [e for e in elements if e.bbox and e.is_text]
|
||
if len(text_elements) < 3:
|
||
# Too few elements to determine layout, just sort by Y position
|
||
return sorted(elements, key=lambda e: (e.bbox.y0 if e.bbox else 0, e.bbox.x0 if e.bbox else 0))
|
||
|
||
# Cluster x-positions to detect columns
|
||
x_positions = [e.bbox.x0 for e in text_elements]
|
||
columns = self._detect_columns(x_positions, dimensions.width)
|
||
|
||
if len(columns) <= 1:
|
||
# Single column layout - simple top-to-bottom sort
|
||
logger.debug(f"Detected single-column layout")
|
||
return sorted(elements, key=lambda e: (e.bbox.y0 if e.bbox else 0, e.bbox.x0 if e.bbox else 0))
|
||
|
||
logger.debug(f"Detected {len(columns)}-column layout at x positions: {[f'{x:.1f}' for x in columns]}")
|
||
|
||
# Multi-column layout - use newspaper-style reading order
|
||
# (complete left column, then right column, etc.)
|
||
# This is more appropriate for technical documents and data sheets
|
||
element_data = []
|
||
for elem in elements:
|
||
if not elem.bbox:
|
||
element_data.append((elem, 0, 0))
|
||
continue
|
||
|
||
# Find which column this element belongs to
|
||
col_idx = 0
|
||
min_dist = float('inf')
|
||
for i, col_x in enumerate(columns):
|
||
dist = abs(elem.bbox.x0 - col_x)
|
||
if dist < min_dist:
|
||
min_dist = dist
|
||
col_idx = i
|
||
|
||
element_data.append((elem, col_idx, elem.bbox.y0))
|
||
|
||
# Sort by: column first, then Y position within column
|
||
# This gives newspaper-style reading: complete column 1, then column 2, etc.
|
||
element_data.sort(key=lambda x: (x[1], x[2]))
|
||
|
||
logger.debug(f"Using newspaper-style column reading order (column by column, top to bottom)")
|
||
return [e[0] for e in element_data]
|
||
|
||
def _detect_columns(self, x_positions: List[float], page_width: float) -> List[float]:
|
||
"""
|
||
Detect column positions from x-coordinates of text elements.
|
||
|
||
Args:
|
||
x_positions: List of x-coordinates (left edges of text)
|
||
page_width: Page width in points
|
||
|
||
Returns:
|
||
List of column x-positions (sorted left to right)
|
||
"""
|
||
if not x_positions:
|
||
return []
|
||
|
||
# Cluster x-positions to find column starts
|
||
# Use k-means-like approach: find groups of x-positions
|
||
threshold = page_width * 0.15 # 15% of page width as clustering threshold
|
||
|
||
sorted_x = sorted(set(x_positions))
|
||
if not sorted_x:
|
||
return []
|
||
|
||
clusters = [[sorted_x[0]]]
|
||
|
||
for x in sorted_x[1:]:
|
||
# Check if x belongs to current cluster
|
||
cluster_center = sum(clusters[-1]) / len(clusters[-1])
|
||
if abs(x - cluster_center) < threshold:
|
||
clusters[-1].append(x)
|
||
else:
|
||
# Start new cluster
|
||
clusters.append([x])
|
||
|
||
# Return average x position of each cluster (column start)
|
||
column_positions = [sum(cluster) / len(cluster) for cluster in clusters]
|
||
|
||
# Filter out columns that are too close to each other
|
||
min_column_width = page_width * 0.2 # Columns must be at least 20% of page width apart
|
||
filtered_columns = [column_positions[0]]
|
||
for col_x in column_positions[1:]:
|
||
if col_x - filtered_columns[-1] >= min_column_width:
|
||
filtered_columns.append(col_x)
|
||
|
||
return filtered_columns
|
||
|
||
def _detect_headers_footers(self, elements: List[DocumentElement], dimensions: Dimensions) -> List[DocumentElement]:
|
||
"""Detect and mark header/footer elements based on page position"""
|
||
page_height = dimensions.height
|
||
header_threshold = page_height * 0.1 # Top 10% of page
|
||
footer_threshold = page_height * 0.9 # Bottom 10% of page
|
||
|
||
for elem in elements:
|
||
# Skip non-text elements
|
||
if not elem.is_text:
|
||
continue
|
||
|
||
# Check if element is in header region
|
||
if elem.bbox.y1 <= header_threshold:
|
||
# Only mark as header if it's short text
|
||
if isinstance(elem.content, str) and len(elem.content) < 200:
|
||
elem.type = ElementType.HEADER
|
||
elem.metadata['is_page_header'] = True
|
||
|
||
# Check if element is in footer region
|
||
elif elem.bbox.y0 >= footer_threshold:
|
||
# Short text in footer region
|
||
if isinstance(elem.content, str) and len(elem.content) < 200:
|
||
elem.type = ElementType.FOOTER
|
||
elem.metadata['is_page_footer'] = True
|
||
|
||
return elements
|
||
|
||
def _build_section_hierarchy(self, elements: List[DocumentElement]) -> List[DocumentElement]:
|
||
"""Build hierarchical section structure based on font sizes"""
|
||
# Collect all headers with their font sizes
|
||
headers = []
|
||
for elem in elements:
|
||
if elem.type in [ElementType.TITLE, ElementType.HEADER]:
|
||
# Get average font size from style
|
||
font_size = 12.0 # Default
|
||
if elem.style and elem.style.font_size:
|
||
font_size = elem.style.font_size
|
||
headers.append((elem, font_size))
|
||
|
||
if not headers:
|
||
return elements
|
||
|
||
# Sort headers by font size to determine hierarchy levels
|
||
font_sizes = sorted(set(size for _, size in headers), reverse=True)
|
||
size_to_level = {size: level for level, size in enumerate(font_sizes, 1)}
|
||
|
||
# Assign section levels to headers
|
||
for elem, font_size in headers:
|
||
level = size_to_level.get(font_size, 1)
|
||
elem.metadata['section_level'] = level
|
||
elem.metadata['font_size'] = font_size
|
||
|
||
# Build parent-child relationships between headers
|
||
header_stack = [] # Stack of (element, level)
|
||
for elem, font_size in headers:
|
||
level = elem.metadata['section_level']
|
||
|
||
# Pop headers that are at same or lower level (larger font)
|
||
while header_stack and header_stack[-1][1] >= level:
|
||
header_stack.pop()
|
||
|
||
# Set parent header
|
||
if header_stack:
|
||
parent = header_stack[-1][0]
|
||
elem.metadata['parent_section'] = parent.element_id
|
||
if 'child_sections' not in parent.metadata:
|
||
parent.metadata['child_sections'] = []
|
||
parent.metadata['child_sections'].append(elem.element_id)
|
||
|
||
header_stack.append((elem, level))
|
||
|
||
# Link content to nearest preceding header at same or higher level
|
||
current_header = None
|
||
for elem in elements:
|
||
if elem.type in [ElementType.TITLE, ElementType.HEADER]:
|
||
current_header = elem
|
||
elif current_header and elem.type not in [ElementType.HEADER, ElementType.FOOTER]:
|
||
elem.metadata['section_id'] = current_header.element_id
|
||
|
||
return elements
|
||
|
||
def _build_nested_lists(self, elements: List[DocumentElement]) -> List[DocumentElement]:
|
||
"""Build nested list structure from flat list items"""
|
||
# Group list items
|
||
list_items = [e for e in elements if e.type == ElementType.LIST_ITEM]
|
||
if not list_items:
|
||
return elements
|
||
|
||
# Sort by position (top to bottom)
|
||
list_items.sort(key=lambda e: (e.bbox.y0, e.bbox.x0))
|
||
|
||
# Detect indentation levels based on x position
|
||
x_positions = [item.bbox.x0 for item in list_items]
|
||
if not x_positions:
|
||
return elements
|
||
|
||
min_x = min(x_positions)
|
||
indent_unit = 20 # Typical indent size in points
|
||
|
||
# Assign nesting levels
|
||
for item in list_items:
|
||
indent = item.bbox.x0 - min_x
|
||
level = int(indent / indent_unit)
|
||
item.metadata['list_level'] = level
|
||
|
||
# Build parent-child relationships
|
||
item_stack = [] # Stack of (element, level)
|
||
for item in list_items:
|
||
level = item.metadata.get('list_level', 0)
|
||
|
||
# Pop items at same or deeper level
|
||
while item_stack and item_stack[-1][1] >= level:
|
||
item_stack.pop()
|
||
|
||
# Set parent
|
||
if item_stack:
|
||
parent = item_stack[-1][0]
|
||
item.metadata['parent_item'] = parent.element_id
|
||
if 'children' not in parent.metadata:
|
||
parent.metadata['children'] = []
|
||
parent.metadata['children'].append(item.element_id)
|
||
# Also add to actual children list
|
||
parent.children.append(item)
|
||
|
||
item_stack.append((item, level))
|
||
|
||
return elements
|
||
|
||
def _process_text_block(self, block: Dict, page_num: int, counter: int,
|
||
table_bboxes: List[BoundingBox] = None) -> Optional[DocumentElement]:
|
||
"""
|
||
Process a text block into a DocumentElement.
|
||
|
||
Args:
|
||
block: Text block from PyMuPDF
|
||
page_num: Page number
|
||
counter: Element counter
|
||
table_bboxes: List of table bounding boxes to filter overlapping lines
|
||
|
||
Returns:
|
||
DocumentElement or None if all lines overlap with tables
|
||
"""
|
||
if table_bboxes is None:
|
||
table_bboxes = []
|
||
|
||
# Extract text content and span information
|
||
# Filter out lines that significantly overlap with table regions
|
||
text_parts = []
|
||
styles = []
|
||
span_children = [] # Store span-level children for inline styling
|
||
span_counter = 0
|
||
valid_line_bboxes = [] # Track bboxes of valid lines for overall bbox calculation
|
||
|
||
for line in block.get("lines", []):
|
||
line_bbox_data = line.get("bbox", [0, 0, 0, 0])
|
||
|
||
# Check if this line overlaps with any table region
|
||
line_overlaps_table = False
|
||
for table_bbox in table_bboxes:
|
||
overlap_x0 = max(line_bbox_data[0], table_bbox.x0)
|
||
overlap_y0 = max(line_bbox_data[1], table_bbox.y0)
|
||
overlap_x1 = min(line_bbox_data[2], table_bbox.x1)
|
||
overlap_y1 = min(line_bbox_data[3], table_bbox.y1)
|
||
|
||
if overlap_x0 < overlap_x1 and overlap_y0 < overlap_y1:
|
||
# Calculate overlap ratio
|
||
line_height = line_bbox_data[3] - line_bbox_data[1]
|
||
overlap_height = overlap_y1 - overlap_y0
|
||
if line_height > 0:
|
||
overlap_ratio = overlap_height / line_height
|
||
if overlap_ratio >= 0.5: # Line significantly overlaps with table
|
||
line_overlaps_table = True
|
||
break
|
||
|
||
if line_overlaps_table:
|
||
continue # Skip this line
|
||
|
||
# Process valid line
|
||
valid_line_bboxes.append(line_bbox_data)
|
||
|
||
for span in line.get("spans", []):
|
||
text = span.get("text", "")
|
||
if text:
|
||
text_parts.append(text)
|
||
|
||
# Extract style information
|
||
style = StyleInfo(
|
||
font_name=span.get("font"),
|
||
font_size=span.get("size"),
|
||
font_weight="bold" if span.get("flags", 0) & 2**4 else "normal",
|
||
font_style="italic" if span.get("flags", 0) & 2**1 else "normal",
|
||
text_color=span.get("color")
|
||
)
|
||
styles.append(style)
|
||
|
||
# Create span child element for inline styling
|
||
span_bbox_data = span.get("bbox", [0, 0, 0, 0])
|
||
span_bbox = BoundingBox(
|
||
x0=span_bbox_data[0],
|
||
y0=span_bbox_data[1],
|
||
x1=span_bbox_data[2],
|
||
y1=span_bbox_data[3]
|
||
)
|
||
|
||
span_element = DocumentElement(
|
||
element_id=f"span_{page_num}_{counter}_{span_counter}",
|
||
type=ElementType.TEXT, # Spans are always text
|
||
content=text,
|
||
bbox=span_bbox,
|
||
style=style,
|
||
confidence=1.0,
|
||
metadata={"span_index": span_counter}
|
||
)
|
||
span_children.append(span_element)
|
||
span_counter += 1
|
||
|
||
if not text_parts:
|
||
return None # All lines overlapped with tables
|
||
|
||
full_text = "".join(text_parts)
|
||
|
||
# Calculate bbox from valid lines only
|
||
if valid_line_bboxes:
|
||
min_x0 = min(b[0] for b in valid_line_bboxes)
|
||
min_y0 = min(b[1] for b in valid_line_bboxes)
|
||
max_x1 = max(b[2] for b in valid_line_bboxes)
|
||
max_y1 = max(b[3] for b in valid_line_bboxes)
|
||
bbox = BoundingBox(x0=min_x0, y0=min_y0, x1=max_x1, y1=max_y1)
|
||
else:
|
||
# Fallback to original bbox if no valid lines found
|
||
bbox_data = block.get("bbox", [0, 0, 0, 0])
|
||
bbox = BoundingBox(x0=bbox_data[0], y0=bbox_data[1], x1=bbox_data[2], y1=bbox_data[3])
|
||
|
||
# Determine element type based on content and style
|
||
element_type = self._infer_element_type(full_text, styles)
|
||
|
||
# Use the most common style for the block
|
||
if styles:
|
||
block_style = styles[0] # Could be improved with style merging
|
||
else:
|
||
block_style = None
|
||
|
||
return DocumentElement(
|
||
element_id=f"text_{page_num}_{counter}",
|
||
type=element_type,
|
||
content=full_text,
|
||
bbox=bbox,
|
||
style=block_style,
|
||
confidence=1.0, # Direct extraction has perfect confidence
|
||
children=span_children # Store span children for inline styling
|
||
)
|
||
|
||
def _infer_element_type(self, text: str, styles: List[StyleInfo]) -> ElementType:
|
||
"""Infer element type based on text content and styling"""
|
||
text_lower = text.lower().strip()
|
||
|
||
# Check for common patterns
|
||
if len(text_lower) < 100 and styles:
|
||
# Short text with large font might be title/header
|
||
avg_size = sum(s.font_size or 12 for s in styles) / len(styles)
|
||
if avg_size > 16:
|
||
return ElementType.TITLE
|
||
elif avg_size > 14:
|
||
return ElementType.HEADER
|
||
|
||
# Check for list patterns
|
||
if re.match(r'^[\d•·▪▫◦‣⁃]\s', text_lower):
|
||
return ElementType.LIST_ITEM
|
||
|
||
# Check for page numbers
|
||
if re.match(r'^page\s+\d+|^\d+\s*$|^-\s*\d+\s*-$', text_lower):
|
||
return ElementType.PAGE_NUMBER
|
||
|
||
# Check for footnote patterns
|
||
if re.match(r'^[\[\d+\]]|^\d+\)', text_lower):
|
||
return ElementType.FOOTNOTE
|
||
|
||
# Default to paragraph for longer text, text for shorter
|
||
return ElementType.PARAGRAPH if len(text) > 150 else ElementType.TEXT
|
||
|
||
def _is_likely_chart(self, data: list, table) -> bool:
|
||
"""
|
||
Detect if a "table" detected by find_tables() is actually a chart/graph.
|
||
|
||
Charts often get misclassified as tables because they have grid lines.
|
||
Characteristics of a chart misclassified as table:
|
||
1. High percentage of empty cells (>60%)
|
||
2. Content patterns that look like axis labels (numbers, units like °C, %, etc.)
|
||
3. Single cell contains multi-line text with chart-like patterns
|
||
4. Cell content contains typical chart axis patterns
|
||
|
||
Args:
|
||
data: Extracted table data (list of lists)
|
||
table: PyMuPDF table object
|
||
|
||
Returns:
|
||
True if the table is likely a chart
|
||
"""
|
||
if not data:
|
||
return False
|
||
|
||
# Count total cells and empty cells
|
||
total_cells = 0
|
||
empty_cells = 0
|
||
multi_line_cells = 0
|
||
axis_pattern_cells = 0
|
||
|
||
# Patterns that suggest chart axis labels
|
||
import re
|
||
axis_patterns = [
|
||
r'^-?\d+$', # Simple numbers (axis ticks)
|
||
r'^-?\d+\.?\d*$', # Decimal numbers
|
||
r'°[CF]', # Temperature units
|
||
r'%$', # Percentage
|
||
r'\bppm\b', # Parts per million
|
||
r'\bmin\b', # Minutes
|
||
r'\bsec\b', # Seconds
|
||
r'\bTime\b', # Time axis label
|
||
r'\bTemperature\b', # Temperature axis label
|
||
r'[Aa]xis', # Axis label
|
||
]
|
||
|
||
for row in data:
|
||
for cell in row:
|
||
total_cells += 1
|
||
cell_text = str(cell).strip() if cell else ""
|
||
|
||
if not cell_text:
|
||
empty_cells += 1
|
||
else:
|
||
# Check for multi-line content
|
||
if '\n' in cell_text:
|
||
multi_line_cells += 1
|
||
|
||
# Check for axis patterns
|
||
for pattern in axis_patterns:
|
||
if re.search(pattern, cell_text, re.IGNORECASE):
|
||
axis_pattern_cells += 1
|
||
break
|
||
|
||
# Calculate metrics
|
||
empty_ratio = empty_cells / total_cells if total_cells > 0 else 0
|
||
|
||
# Decision criteria for chart detection:
|
||
# 1. Very high empty cell ratio (>70%) suggests it's a chart grid
|
||
if empty_ratio > 0.7:
|
||
logger.debug(f"Chart detection: high empty ratio {empty_ratio:.2f} (>70%)")
|
||
return True
|
||
|
||
# 2. High empty ratio + axis patterns suggests chart
|
||
if empty_ratio > 0.5 and axis_pattern_cells >= 3:
|
||
logger.debug(f"Chart detection: empty ratio {empty_ratio:.2f} + {axis_pattern_cells} axis patterns")
|
||
return True
|
||
|
||
# 3. Multi-line cell with axis patterns in first cell (often chart legend text)
|
||
if multi_line_cells >= 1 and axis_pattern_cells >= 2:
|
||
first_cell = str(data[0][0]).strip() if data and data[0] else ""
|
||
if '\n' in first_cell and len(first_cell.split('\n')) >= 5:
|
||
logger.debug(f"Chart detection: first cell has {len(first_cell.split(chr(10)))} lines with axis patterns")
|
||
return True
|
||
|
||
return False
|
||
|
||
def _process_native_table(self, table, page_num: int, counter: int) -> Optional[DocumentElement]:
|
||
"""Process a natively detected table"""
|
||
try:
|
||
# Extract table data
|
||
data = table.extract()
|
||
if not data or len(data) < self.min_table_rows:
|
||
return None
|
||
|
||
# Check if this "table" is actually a chart (misclassified by find_tables)
|
||
if self._is_likely_chart(data, table):
|
||
logger.info(f"Skipping table_{page_num}_{counter} - detected as chart (not table)")
|
||
return None
|
||
|
||
# Get table bounding box
|
||
bbox_data = table.bbox
|
||
bbox = BoundingBox(
|
||
x0=bbox_data[0],
|
||
y0=bbox_data[1],
|
||
x1=bbox_data[2],
|
||
y1=bbox_data[3]
|
||
)
|
||
|
||
# Extract column widths from table cells by analyzing X boundaries
|
||
column_widths = []
|
||
if hasattr(table, 'cells') and table.cells:
|
||
# Collect all unique X boundaries (both left and right edges)
|
||
x_boundaries = set()
|
||
for cell in table.cells:
|
||
x_boundaries.add(round(cell[0], 1)) # x0 (left edge)
|
||
x_boundaries.add(round(cell[2], 1)) # x1 (right edge)
|
||
|
||
# Sort boundaries to get column edges
|
||
sorted_x = sorted(x_boundaries)
|
||
|
||
# Calculate column widths from adjacent boundaries
|
||
if len(sorted_x) >= 2:
|
||
column_widths = [sorted_x[i+1] - sorted_x[i] for i in range(len(sorted_x)-1)]
|
||
logger.debug(f"Calculated column widths from {len(sorted_x)} boundaries: {column_widths}")
|
||
|
||
# Extract row heights from table cells by analyzing Y boundaries
|
||
row_heights = []
|
||
if hasattr(table, 'cells') and table.cells:
|
||
# Collect all unique Y boundaries (both top and bottom edges)
|
||
y_boundaries = set()
|
||
for cell in table.cells:
|
||
y_boundaries.add(round(cell[1], 1)) # y0 (top edge)
|
||
y_boundaries.add(round(cell[3], 1)) # y1 (bottom edge)
|
||
|
||
# Sort boundaries to get row edges
|
||
sorted_y = sorted(y_boundaries)
|
||
|
||
# Calculate row heights from adjacent boundaries
|
||
if len(sorted_y) >= 2:
|
||
row_heights = [sorted_y[i+1] - sorted_y[i] for i in range(len(sorted_y)-1)]
|
||
logger.debug(f"Calculated row heights from {len(sorted_y)} boundaries: {row_heights}")
|
||
|
||
# Create table cells
|
||
# Note: Include ALL cells (even empty ones) to preserve table structure
|
||
# This is critical for correct HTML generation and PDF rendering
|
||
cells = []
|
||
for row_idx, row in enumerate(data):
|
||
for col_idx, cell_text in enumerate(row):
|
||
# Always add cell, even if empty, to maintain table structure
|
||
cells.append(TableCell(
|
||
row=row_idx,
|
||
col=col_idx,
|
||
content=str(cell_text) if cell_text else ""
|
||
))
|
||
|
||
# Create table data
|
||
table_data = TableData(
|
||
rows=len(data),
|
||
cols=max(len(row) for row in data) if data else 0,
|
||
cells=cells,
|
||
headers=data[0] if data else None # Assume first row is header
|
||
)
|
||
|
||
# Store column widths and row heights in metadata
|
||
metadata = {}
|
||
if column_widths:
|
||
metadata["column_widths"] = column_widths
|
||
if row_heights:
|
||
metadata["row_heights"] = row_heights
|
||
metadata = metadata if metadata else None
|
||
|
||
return DocumentElement(
|
||
element_id=f"table_{page_num}_{counter}",
|
||
type=ElementType.TABLE,
|
||
content=table_data,
|
||
bbox=bbox,
|
||
confidence=1.0,
|
||
metadata=metadata
|
||
)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error processing native table: {e}")
|
||
return None
|
||
|
||
def _detect_tables_by_position(self, page: fitz.Page, page_num: int, counter: int) -> List[DocumentElement]:
|
||
"""Detect tables by analyzing text positioning"""
|
||
tables = []
|
||
|
||
# Get all words with positions
|
||
words = page.get_text("words") # Returns (x0, y0, x1, y1, "word", block_no, line_no, word_no)
|
||
|
||
if not words:
|
||
return tables
|
||
|
||
# Group words by approximate row (y-coordinate)
|
||
rows = {}
|
||
for word in words:
|
||
y = round(word[1] / 5) * 5 # Round to nearest 5 points
|
||
if y not in rows:
|
||
rows[y] = []
|
||
rows[y].append({
|
||
'x0': word[0],
|
||
'y0': word[1],
|
||
'x1': word[2],
|
||
'y1': word[3],
|
||
'text': word[4],
|
||
'block': word[5] if len(word) > 5 else 0
|
||
})
|
||
|
||
# Sort rows by y-coordinate
|
||
sorted_rows = sorted(rows.items(), key=lambda x: x[0])
|
||
|
||
# Find potential tables (consecutive rows with multiple columns)
|
||
current_table_rows = []
|
||
tables_found = []
|
||
|
||
for y, words_in_row in sorted_rows:
|
||
words_in_row.sort(key=lambda w: w['x0'])
|
||
|
||
if len(words_in_row) >= self.min_table_cols:
|
||
# Check if this could be a table row
|
||
x_positions = [w['x0'] for w in words_in_row]
|
||
|
||
# Check for somewhat regular spacing
|
||
if self._has_regular_spacing(x_positions):
|
||
current_table_rows.append((y, words_in_row))
|
||
else:
|
||
# End current table if exists
|
||
if len(current_table_rows) >= self.min_table_rows:
|
||
tables_found.append(current_table_rows)
|
||
current_table_rows = []
|
||
else:
|
||
# End current table if exists
|
||
if len(current_table_rows) >= self.min_table_rows:
|
||
tables_found.append(current_table_rows)
|
||
current_table_rows = []
|
||
|
||
# Don't forget the last table
|
||
if len(current_table_rows) >= self.min_table_rows:
|
||
tables_found.append(current_table_rows)
|
||
|
||
# Convert detected tables to DocumentElements
|
||
for table_idx, table_rows in enumerate(tables_found):
|
||
if not table_rows:
|
||
continue
|
||
|
||
# Calculate table bounding box
|
||
all_words = []
|
||
for _, words in table_rows:
|
||
all_words.extend(words)
|
||
|
||
min_x = min(w['x0'] for w in all_words)
|
||
min_y = min(w['y0'] for w in all_words)
|
||
max_x = max(w['x1'] for w in all_words)
|
||
max_y = max(w['y1'] for w in all_words)
|
||
|
||
bbox = BoundingBox(x0=min_x, y0=min_y, x1=max_x, y1=max_y)
|
||
|
||
# Create table cells
|
||
cells = []
|
||
for row_idx, (y, words) in enumerate(table_rows):
|
||
# Group words into columns
|
||
columns = self._group_into_columns(words, table_rows)
|
||
for col_idx, col_text in enumerate(columns):
|
||
if col_text:
|
||
cells.append(TableCell(
|
||
row=row_idx,
|
||
col=col_idx,
|
||
content=col_text
|
||
))
|
||
|
||
# Create table data
|
||
table_data = TableData(
|
||
rows=len(table_rows),
|
||
cols=max(len(self._group_into_columns(words, table_rows))
|
||
for _, words in table_rows),
|
||
cells=cells
|
||
)
|
||
|
||
element = DocumentElement(
|
||
element_id=f"table_{page_num}_{counter + table_idx}",
|
||
type=ElementType.TABLE,
|
||
content=table_data,
|
||
bbox=bbox,
|
||
confidence=0.8, # Lower confidence for positional detection
|
||
metadata={"detection_method": "positional"}
|
||
)
|
||
tables.append(element)
|
||
|
||
return tables
|
||
|
||
def _has_regular_spacing(self, x_positions: List[float], tolerance: float = 0.3) -> bool:
|
||
"""Check if x positions have somewhat regular spacing"""
|
||
if len(x_positions) < 3:
|
||
return False
|
||
|
||
spacings = [x_positions[i+1] - x_positions[i] for i in range(len(x_positions)-1)]
|
||
avg_spacing = sum(spacings) / len(spacings)
|
||
|
||
# Check if spacings are within tolerance of average
|
||
for spacing in spacings:
|
||
if abs(spacing - avg_spacing) > avg_spacing * tolerance:
|
||
return False
|
||
|
||
return True
|
||
|
||
def _group_into_columns(self, words: List[Dict], all_rows: List) -> List[str]:
|
||
"""Group words into columns based on x-position"""
|
||
if not words:
|
||
return []
|
||
|
||
# Find common column positions across all rows
|
||
all_x_positions = []
|
||
for _, row_words in all_rows:
|
||
all_x_positions.extend([w['x0'] for w in row_words])
|
||
|
||
# Cluster x-positions to find columns
|
||
column_positions = self._cluster_positions(all_x_positions)
|
||
|
||
# Assign words to columns
|
||
columns = [""] * len(column_positions)
|
||
for word in words:
|
||
# Find closest column
|
||
closest_col = 0
|
||
min_dist = float('inf')
|
||
for col_idx, col_x in enumerate(column_positions):
|
||
dist = abs(word['x0'] - col_x)
|
||
if dist < min_dist:
|
||
min_dist = dist
|
||
closest_col = col_idx
|
||
|
||
if columns[closest_col]:
|
||
columns[closest_col] += " " + word['text']
|
||
else:
|
||
columns[closest_col] = word['text']
|
||
|
||
return columns
|
||
|
||
def _cluster_positions(self, positions: List[float], threshold: float = 20) -> List[float]:
|
||
"""Cluster positions to find common columns"""
|
||
if not positions:
|
||
return []
|
||
|
||
sorted_pos = sorted(positions)
|
||
clusters = [[sorted_pos[0]]]
|
||
|
||
for pos in sorted_pos[1:]:
|
||
# Check if position belongs to current cluster
|
||
if pos - clusters[-1][-1] < threshold:
|
||
clusters[-1].append(pos)
|
||
else:
|
||
clusters.append([pos])
|
||
|
||
# Return average position of each cluster
|
||
return [sum(cluster) / len(cluster) for cluster in clusters]
|
||
|
||
def _extract_images(self,
|
||
page: fitz.Page,
|
||
page_num: int,
|
||
document_id: str,
|
||
counter: int,
|
||
output_dir: Optional[Path]) -> List[DocumentElement]:
|
||
"""Extract images from page"""
|
||
elements = []
|
||
image_list = page.get_images()
|
||
|
||
for img_idx, img in enumerate(image_list):
|
||
try:
|
||
xref = img[0]
|
||
|
||
# Get image position(s)
|
||
img_rects = page.get_image_rects(xref)
|
||
if not img_rects:
|
||
continue
|
||
|
||
rect = img_rects[0] # Use first occurrence
|
||
bbox = BoundingBox(
|
||
x0=rect.x0,
|
||
y0=rect.y0,
|
||
x1=rect.x1,
|
||
y1=rect.y1
|
||
)
|
||
|
||
# Extract image data
|
||
pix = fitz.Pixmap(page.parent, xref)
|
||
image_data = {
|
||
"width": pix.width,
|
||
"height": pix.height,
|
||
"colorspace": pix.colorspace.name if pix.colorspace else "unknown",
|
||
"xref": xref
|
||
}
|
||
|
||
# Save image if output directory provided
|
||
if output_dir:
|
||
output_dir.mkdir(parents=True, exist_ok=True)
|
||
image_filename = f"{document_id}_p{page_num}_img{img_idx}.png"
|
||
image_path = output_dir / image_filename
|
||
pix.save(str(image_path))
|
||
# Store relative filename only (consistent with OCR track)
|
||
# PDF generator will join with result_dir to get full path
|
||
image_data["saved_path"] = image_filename
|
||
logger.debug(f"Saved image to {image_path}")
|
||
|
||
element = DocumentElement(
|
||
element_id=f"image_{page_num}_{counter + img_idx}",
|
||
type=ElementType.IMAGE,
|
||
content=image_data,
|
||
bbox=bbox,
|
||
confidence=1.0,
|
||
metadata={
|
||
"image_index": img_idx,
|
||
"xref": xref
|
||
}
|
||
)
|
||
elements.append(element)
|
||
|
||
pix = None # Free memory
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error extracting image {img_idx}: {e}")
|
||
|
||
return elements
|
||
|
||
def has_missing_images(self, page: fitz.Page) -> bool:
|
||
"""
|
||
Detect if a page likely has images that weren't extracted.
|
||
|
||
This checks for inline image blocks (type=1 in text dict) which indicate
|
||
graphics composed of many small image blocks (like logos) that
|
||
page.get_images() cannot detect.
|
||
|
||
Args:
|
||
page: PyMuPDF page object
|
||
|
||
Returns:
|
||
True if there are likely missing images that need OCR extraction
|
||
"""
|
||
try:
|
||
# Check if get_images found anything
|
||
standard_images = page.get_images()
|
||
if standard_images:
|
||
return False # Standard images were found, no need for fallback
|
||
|
||
# Check for inline image blocks (type=1)
|
||
text_dict = page.get_text("dict", sort=True)
|
||
blocks = text_dict.get("blocks", [])
|
||
|
||
image_block_count = sum(1 for b in blocks if b.get("type") == 1)
|
||
|
||
# If there are many inline image blocks, likely there's a logo or graphic
|
||
if image_block_count >= 10:
|
||
logger.info(f"Detected {image_block_count} inline image blocks - may need OCR for image extraction")
|
||
return True
|
||
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.warning(f"Error checking for missing images: {e}")
|
||
return False
|
||
|
||
def check_document_for_missing_images(self, pdf_path: Path) -> List[int]:
|
||
"""
|
||
Check a PDF document for pages that likely have missing images.
|
||
|
||
This opens the PDF and checks each page for inline image blocks
|
||
that weren't extracted by get_images().
|
||
|
||
Args:
|
||
pdf_path: Path to the PDF file
|
||
|
||
Returns:
|
||
List of page numbers (1-indexed) that have missing images
|
||
"""
|
||
pages_with_missing_images = []
|
||
|
||
try:
|
||
doc = fitz.open(str(pdf_path))
|
||
for page_num in range(len(doc)):
|
||
page = doc[page_num]
|
||
if self.has_missing_images(page):
|
||
pages_with_missing_images.append(page_num + 1) # 1-indexed
|
||
doc.close()
|
||
|
||
if pages_with_missing_images:
|
||
logger.info(f"Document has missing images on pages: {pages_with_missing_images}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error checking document for missing images: {e}")
|
||
|
||
return pages_with_missing_images
|
||
|
||
def render_inline_image_regions(
|
||
self,
|
||
pdf_path: Path,
|
||
unified_doc: 'UnifiedDocument',
|
||
pages: List[int],
|
||
output_dir: Optional[Path] = None
|
||
) -> int:
|
||
"""
|
||
Render inline image regions and add them to the unified document.
|
||
|
||
This is a fallback when OCR doesn't detect images. It clusters inline
|
||
image blocks (type=1) and renders them as images.
|
||
|
||
Args:
|
||
pdf_path: Path to the PDF file
|
||
unified_doc: UnifiedDocument to add images to
|
||
pages: List of page numbers (1-indexed) to process
|
||
output_dir: Directory to save rendered images
|
||
|
||
Returns:
|
||
Number of images added
|
||
"""
|
||
images_added = 0
|
||
|
||
try:
|
||
doc = fitz.open(str(pdf_path))
|
||
|
||
for page_num in pages:
|
||
if page_num < 1 or page_num > len(doc):
|
||
continue
|
||
|
||
page = doc[page_num - 1] # 0-indexed
|
||
page_rect = page.rect
|
||
|
||
# Get inline image blocks
|
||
text_dict = page.get_text("dict", sort=True)
|
||
blocks = text_dict.get("blocks", [])
|
||
|
||
image_blocks = []
|
||
for block in blocks:
|
||
if block.get("type") == 1: # Image block
|
||
bbox = block.get("bbox")
|
||
if bbox:
|
||
image_blocks.append(fitz.Rect(bbox))
|
||
|
||
if len(image_blocks) < 5: # Reduced from 10
|
||
logger.debug(f"Page {page_num}: Only {len(image_blocks)} inline image blocks, skipping")
|
||
continue
|
||
|
||
logger.info(f"Page {page_num}: Found {len(image_blocks)} inline image blocks")
|
||
|
||
# Cluster nearby image blocks
|
||
regions = self._cluster_nearby_rects(image_blocks, tolerance=5.0)
|
||
logger.info(f"Page {page_num}: Clustered into {len(regions)} regions")
|
||
|
||
# Find the corresponding page in unified_doc
|
||
target_page = None
|
||
for p in unified_doc.pages:
|
||
if p.page_number == page_num:
|
||
target_page = p
|
||
break
|
||
|
||
if not target_page:
|
||
continue
|
||
|
||
for region_idx, region_rect in enumerate(regions):
|
||
logger.info(f"Page {page_num} region {region_idx}: {region_rect} (w={region_rect.width:.1f}, h={region_rect.height:.1f})")
|
||
|
||
# Skip very small regions
|
||
if region_rect.width < 30 or region_rect.height < 30:
|
||
logger.info(f" -> Skipped: too small (min 30x30)")
|
||
continue
|
||
|
||
# Skip regions that are primarily in the table area (below top 40%)
|
||
# But allow regions that START in the top portion
|
||
page_30_pct = page_rect.height * 0.3
|
||
page_40_pct = page_rect.height * 0.4
|
||
if region_rect.y0 > page_40_pct:
|
||
logger.info(f" -> Skipped: y0={region_rect.y0:.1f} > 40% of page ({page_40_pct:.1f})")
|
||
continue
|
||
|
||
logger.info(f"Rendering inline image region {region_idx} on page {page_num}: {region_rect}")
|
||
|
||
try:
|
||
# Add small padding
|
||
clip_rect = region_rect + (-2, -2, 2, 2)
|
||
clip_rect.intersect(page_rect)
|
||
|
||
# Render at 2x resolution
|
||
mat = fitz.Matrix(2, 2)
|
||
pix = page.get_pixmap(clip=clip_rect, matrix=mat, alpha=False)
|
||
|
||
# Create bounding box
|
||
bbox = BoundingBox(
|
||
x0=clip_rect.x0,
|
||
y0=clip_rect.y0,
|
||
x1=clip_rect.x1,
|
||
y1=clip_rect.y1
|
||
)
|
||
|
||
image_data = {
|
||
"width": pix.width,
|
||
"height": pix.height,
|
||
"colorspace": "rgb",
|
||
"type": "inline_region"
|
||
}
|
||
|
||
# Save image if output directory provided
|
||
if output_dir:
|
||
output_dir.mkdir(parents=True, exist_ok=True)
|
||
doc_id = unified_doc.document_id or "unknown"
|
||
image_filename = f"{doc_id}_p{page_num}_logo{region_idx}.png"
|
||
image_path = output_dir / image_filename
|
||
pix.save(str(image_path))
|
||
image_data["saved_path"] = image_filename
|
||
logger.info(f"Saved inline image region to {image_path}")
|
||
|
||
element = DocumentElement(
|
||
element_id=f"logo_{page_num}_{region_idx}",
|
||
type=ElementType.LOGO,
|
||
content=image_data,
|
||
bbox=bbox,
|
||
confidence=0.9,
|
||
metadata={
|
||
"region_type": "inline_image_blocks",
|
||
"block_count": len(image_blocks)
|
||
}
|
||
)
|
||
target_page.elements.append(element)
|
||
images_added += 1
|
||
|
||
pix = None # Free memory
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error rendering inline image region {region_idx}: {e}")
|
||
|
||
doc.close()
|
||
|
||
if images_added > 0:
|
||
logger.info(f"Added {images_added} inline image regions to document")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error rendering inline image regions: {e}")
|
||
|
||
return images_added
|
||
|
||
def _cluster_nearby_rects(self, rects: List[fitz.Rect], tolerance: float = 5.0) -> List[fitz.Rect]:
|
||
"""Cluster nearby rectangles into regions."""
|
||
if not rects:
|
||
return []
|
||
|
||
sorted_rects = sorted(rects, key=lambda r: (r.y0, r.x0))
|
||
|
||
merged = []
|
||
for rect in sorted_rects:
|
||
merged_with_existing = False
|
||
for i, region in enumerate(merged):
|
||
expanded = region + (-tolerance, -tolerance, tolerance, tolerance)
|
||
if expanded.intersects(rect):
|
||
merged[i] = region | rect
|
||
merged_with_existing = True
|
||
break
|
||
if not merged_with_existing:
|
||
merged.append(rect)
|
||
|
||
# Second pass: merge any regions that now overlap
|
||
changed = True
|
||
while changed:
|
||
changed = False
|
||
new_merged = []
|
||
skip = set()
|
||
|
||
for i, r1 in enumerate(merged):
|
||
if i in skip:
|
||
continue
|
||
current = r1
|
||
for j, r2 in enumerate(merged[i+1:], start=i+1):
|
||
if j in skip:
|
||
continue
|
||
expanded = current + (-tolerance, -tolerance, tolerance, tolerance)
|
||
if expanded.intersects(r2):
|
||
current = current | r2
|
||
skip.add(j)
|
||
changed = True
|
||
new_merged.append(current)
|
||
merged = new_merged
|
||
|
||
return merged
|
||
|
||
def _extract_vector_graphics(self,
|
||
page: fitz.Page,
|
||
page_num: int,
|
||
document_id: str,
|
||
counter: int,
|
||
output_dir: Optional[Path],
|
||
table_bboxes: Optional[List[BoundingBox]] = None) -> List[DocumentElement]:
|
||
"""
|
||
Extract vector graphics (charts, diagrams) from page.
|
||
|
||
This method identifies regions that are composed of vector drawing commands
|
||
(paths, lines, rectangles) rather than embedded raster images. These are
|
||
typically charts created in Excel, vector diagrams, or other graphics.
|
||
|
||
Args:
|
||
page: PyMuPDF page object
|
||
page_num: Page number (1-indexed)
|
||
document_id: Unique document identifier
|
||
counter: Starting counter for element IDs
|
||
output_dir: Directory to save rendered graphics
|
||
table_bboxes: List of table bounding boxes to exclude table border drawings
|
||
|
||
Returns:
|
||
List of DocumentElement objects representing vector graphics
|
||
"""
|
||
elements = []
|
||
|
||
try:
|
||
# Get all drawing commands
|
||
drawings = page.get_drawings()
|
||
if not drawings:
|
||
return elements
|
||
|
||
logger.debug(f"Page {page_num} contains {len(drawings)} vector drawing commands")
|
||
|
||
# Filter out drawings that are likely table borders
|
||
# Table borders are typically thin rectangular lines within table regions
|
||
non_table_drawings = self._filter_table_border_drawings(drawings, table_bboxes)
|
||
logger.debug(f"After filtering table borders: {len(non_table_drawings)} drawings remain")
|
||
|
||
if not non_table_drawings:
|
||
logger.debug("All drawings appear to be table borders, no vector graphics to extract")
|
||
return elements
|
||
|
||
# Cluster drawings into groups (charts, diagrams, etc.)
|
||
try:
|
||
# Use custom clustering that only considers non-table drawings
|
||
drawing_clusters = self._cluster_non_table_drawings(page, non_table_drawings)
|
||
logger.debug(f"Clustered into {len(drawing_clusters)} groups")
|
||
except (AttributeError, TypeError) as e:
|
||
# cluster_drawings not available or has different signature
|
||
# Fallback: try to identify charts by analyzing drawing density
|
||
logger.warning(f"Custom clustering failed ({e}), using fallback method")
|
||
drawing_clusters = self._cluster_drawings_fallback(page, non_table_drawings)
|
||
|
||
for cluster_idx, bbox in enumerate(drawing_clusters):
|
||
# Ignore small regions (likely noise or separator lines)
|
||
if bbox.width < 50 or bbox.height < 50:
|
||
logger.debug(f"Skipping small cluster {cluster_idx}: {bbox.width:.1f}x{bbox.height:.1f}")
|
||
continue
|
||
|
||
# Render the region to a raster image
|
||
# matrix=fitz.Matrix(2, 2) increases resolution to ~200 DPI
|
||
try:
|
||
pix = page.get_pixmap(clip=bbox, matrix=fitz.Matrix(2, 2))
|
||
|
||
# Save image if output directory provided
|
||
if output_dir:
|
||
output_dir.mkdir(parents=True, exist_ok=True)
|
||
filename = f"{document_id}_p{page_num}_chart{cluster_idx}.png"
|
||
filepath = output_dir / filename
|
||
pix.save(str(filepath))
|
||
|
||
# Create DocumentElement
|
||
image_data = {
|
||
"saved_path": str(filepath),
|
||
"width": pix.width,
|
||
"height": pix.height,
|
||
"colorspace": pix.colorspace.name if pix.colorspace else "unknown",
|
||
"source": "vector_graphics"
|
||
}
|
||
|
||
element = DocumentElement(
|
||
element_id=f"chart_{page_num}_{counter + cluster_idx}",
|
||
type=ElementType.CHART, # Use CHART type for vector graphics
|
||
content=image_data,
|
||
bbox=BoundingBox(
|
||
x0=bbox.x0,
|
||
y0=bbox.y0,
|
||
x1=bbox.x1,
|
||
y1=bbox.y1
|
||
),
|
||
confidence=0.85, # Slightly lower confidence than raster images
|
||
metadata={
|
||
"cluster_index": cluster_idx,
|
||
"drawing_count": len(drawings)
|
||
}
|
||
)
|
||
elements.append(element)
|
||
logger.debug(f"Extracted chart {cluster_idx}: {bbox.width:.1f}x{bbox.height:.1f} -> {filepath}")
|
||
|
||
pix = None # Free memory
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error rendering vector graphic cluster {cluster_idx}: {e}")
|
||
continue
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error extracting vector graphics: {e}")
|
||
|
||
return elements
|
||
|
||
def _cluster_drawings_fallback(self, page: fitz.Page, drawings: list) -> list:
|
||
"""
|
||
Fallback method to cluster drawings when cluster_drawings() is not available.
|
||
|
||
This uses a simple spatial clustering approach based on bounding boxes.
|
||
"""
|
||
if not drawings:
|
||
return []
|
||
|
||
# Collect all drawing bounding boxes
|
||
bboxes = []
|
||
for drawing in drawings:
|
||
rect = drawing.get('rect')
|
||
if rect:
|
||
bboxes.append(fitz.Rect(rect))
|
||
|
||
if not bboxes:
|
||
return []
|
||
|
||
# Simple clustering: merge overlapping or nearby rectangles
|
||
clusters = []
|
||
tolerance = 20
|
||
|
||
for bbox in bboxes:
|
||
# Try to merge with existing cluster
|
||
merged = False
|
||
for i, cluster in enumerate(clusters):
|
||
# Check if bbox is close to this cluster
|
||
expanded_cluster = cluster + (-tolerance, -tolerance, tolerance, tolerance)
|
||
if expanded_cluster.intersects(bbox):
|
||
# Merge bbox into cluster
|
||
clusters[i] = cluster | bbox # Union of rectangles
|
||
merged = True
|
||
break
|
||
|
||
if not merged:
|
||
# Create new cluster
|
||
clusters.append(bbox)
|
||
|
||
# Filter out very small clusters
|
||
filtered_clusters = [c for c in clusters if c.width >= 50 and c.height >= 50]
|
||
|
||
logger.debug(f"Fallback clustering: {len(bboxes)} drawings -> {len(clusters)} clusters -> {len(filtered_clusters)} filtered")
|
||
|
||
return filtered_clusters
|
||
|
||
    def _filter_table_border_drawings(self, drawings: list, table_bboxes: Optional[List[BoundingBox]]) -> list:
        """
        Filter out drawings that are likely table borders.

        Table borders are typically:
        - Thin rectangular lines (height or width < 5pt)
        - Located within or on the edge of table bounding boxes

        Args:
            drawings: List of PyMuPDF drawing objects
            table_bboxes: List of table bounding boxes

        Returns:
            List of drawings that are NOT table borders (likely logos, charts, etc.)
        """
        if not table_bboxes:
            return drawings

        non_table_drawings = []
        table_border_count = 0

        for drawing in drawings:
            rect = drawing.get('rect')
            if not rect:
                continue

            draw_rect = fitz.Rect(rect)

            # Check if this drawing is a thin line (potential table border)
            is_thin_line = draw_rect.width < 5 or draw_rect.height < 5

            # Check if drawing overlaps significantly with any table
            overlaps_table = False
            for table_bbox in table_bboxes:
                table_rect = fitz.Rect(table_bbox.x0, table_bbox.y0, table_bbox.x1, table_bbox.y1)

                # Expand table rect slightly to include border lines on edges
                expanded_table = table_rect + (-5, -5, 5, 5)

                if expanded_table.contains(draw_rect) or expanded_table.intersects(draw_rect):
                    # Calculate overlap ratio
                    intersection = draw_rect & expanded_table
                    if not intersection.is_empty:
                        overlap_ratio = intersection.get_area() / draw_rect.get_area() if draw_rect.get_area() > 0 else 0

                        # If drawing is mostly inside table region, it's likely a border
                        if overlap_ratio > 0.8:
                            overlaps_table = True
                            break

            # Keep drawing if it's NOT (thin line AND overlapping table)
            # This keeps: logos (complex shapes), charts outside tables, etc.
            if is_thin_line and overlaps_table:
                table_border_count += 1
            else:
                non_table_drawings.append(drawing)

        if table_border_count > 0:
            logger.debug(f"Filtered out {table_border_count} table border drawings")

        return non_table_drawings

    def _cluster_non_table_drawings(self, page: fitz.Page, drawings: list) -> list:
        """
        Cluster non-table drawings into groups.

        This method clusters drawings that have been pre-filtered to exclude table borders.
        It uses a more conservative clustering approach suitable for logos and charts.

        Args:
            page: PyMuPDF page object
            drawings: Pre-filtered list of drawings (excluding table borders)

        Returns:
            List of fitz.Rect representing clustered drawing regions
        """
        if not drawings:
            return []

        # Collect all drawing bounding boxes
        bboxes = []
        for drawing in drawings:
            rect = drawing.get('rect')
            if rect:
                bboxes.append(fitz.Rect(rect))

        if not bboxes:
            return []

        # More conservative clustering with smaller tolerance
        # This prevents grouping distant graphics together
        clusters = []
        tolerance = 10  # Smaller tolerance than fallback (was 20)

        for bbox in bboxes:
            # Try to merge with existing cluster
            merged = False
            for i, cluster in enumerate(clusters):
                # Check if bbox is close to this cluster
                expanded_cluster = cluster + (-tolerance, -tolerance, tolerance, tolerance)
                if expanded_cluster.intersects(bbox):
                    # Merge bbox into cluster
                    clusters[i] = cluster | bbox  # Union of rectangles
                    merged = True
                    break

            if not merged:
                # Create new cluster
                clusters.append(bbox)

        # Filter out very small clusters (noise)
        # Keep minimum 30x30 for logos (smaller than default 50x50)
        filtered_clusters = [c for c in clusters if c.width >= 30 and c.height >= 30]

        logger.debug(f"Non-table clustering: {len(bboxes)} drawings -> {len(clusters)} clusters -> {len(filtered_clusters)} filtered")

        return filtered_clusters

    def _deduplicate_table_chart_overlap(self, elements: List[DocumentElement]) -> List[DocumentElement]:
        """
        Intelligently resolve TABLE-CHART overlaps based on table structure completeness.

        When a region is detected as both TABLE and CHART:
        - Calculate cell completeness = actual_cells / (rows × cols)
        - If completeness ≥50% → Real table with complete structure → Keep TABLE
        - If completeness <50% → False positive (chart detected as table) → Keep CHART

        Args:
            elements: List of extracted elements

        Returns:
            Filtered list with low-quality overlaps removed
        """
        # Collect all tables and charts
        tables = [elem for elem in elements if elem.type == ElementType.TABLE]
        charts = [elem for elem in elements if elem.type == ElementType.CHART]

        if not tables or not charts:
            return elements  # No potential conflicts

        # Analyze TABLE structure completeness
        table_completeness = {}
        for table in tables:
            if hasattr(table.content, 'rows') and hasattr(table.content, 'cols') and hasattr(table.content, 'cells'):
                expected_cells = table.content.rows * table.content.cols
                actual_cells = len(table.content.cells)

                if expected_cells > 0:
                    completeness = actual_cells / expected_cells
                    table_completeness[table.element_id] = completeness
                else:
                    table_completeness[table.element_id] = 0.0
            else:
                table_completeness[table.element_id] = 0.0

        # Check overlaps and decide what to keep
        filtered_elements = []
        removed_charts = 0
        removed_tables = 0

        # Process TABLEs
        for table in tables:
            if not table.bbox:
                filtered_elements.append(table)
                continue

            # Check if this TABLE overlaps with any CHART
            overlaps_chart = False
            for chart in charts:
                if not chart.bbox:
                    continue

                # Calculate overlap
                overlap_x0 = max(table.bbox.x0, chart.bbox.x0)
                overlap_y0 = max(table.bbox.y0, chart.bbox.y0)
                overlap_x1 = min(table.bbox.x1, chart.bbox.x1)
                overlap_y1 = min(table.bbox.y1, chart.bbox.y1)

                if overlap_x0 < overlap_x1 and overlap_y0 < overlap_y1:
                    overlap_area = (overlap_x1 - overlap_x0) * (overlap_y1 - overlap_y0)
                    table_area = (table.bbox.x1 - table.bbox.x0) * (table.bbox.y1 - table.bbox.y0)

                    if table_area > 0:
                        overlap_ratio = overlap_area / table_area

                        if overlap_ratio >= 0.8:
                            overlaps_chart = True
                            completeness = table_completeness.get(table.element_id, 0.0)

                            logger.debug(
                                f"TABLE-CHART overlap: {table.element_id} vs {chart.element_id}: "
                                f"{overlap_ratio*100:.1f}% overlap, TABLE cell completeness: {completeness*100:.1f}%"
                            )

                            # Decision: Keep TABLE only if structure is complete
                            if completeness < 0.5:  # <50% cell completeness
                                logger.info(
                                    f"Removing incomplete TABLE {table.element_id} "
                                    f"({completeness*100:.1f}% completeness, overlaps with CHART {chart.element_id})"
                                )
                                removed_tables += 1
                                break
                            else:
                                logger.info(
                                    f"Keeping TABLE {table.element_id} with {completeness*100:.1f}% completeness "
                                    f"(will remove overlapping CHART {chart.element_id})"
                                )

            if not overlaps_chart or table_completeness.get(table.element_id, 0.0) >= 0.5:
                filtered_elements.append(table)

        # Process CHARTs
        for chart in charts:
            if not chart.bbox:
                filtered_elements.append(chart)
                continue

            # Check if this CHART should be removed due to overlap with high-quality TABLE
            should_remove = False
            for table in tables:
                if not table.bbox:
                    continue

                # Calculate overlap
                overlap_x0 = max(chart.bbox.x0, table.bbox.x0)
                overlap_y0 = max(chart.bbox.y0, table.bbox.y0)
                overlap_x1 = min(chart.bbox.x1, table.bbox.x1)
                overlap_y1 = min(chart.bbox.y1, table.bbox.y1)

                if overlap_x0 < overlap_x1 and overlap_y0 < overlap_y1:
                    overlap_area = (overlap_x1 - overlap_x0) * (overlap_y1 - overlap_y0)
                    chart_area = (chart.bbox.x1 - chart.bbox.x0) * (chart.bbox.y1 - chart.bbox.y0)

                    if chart_area > 0:
                        overlap_ratio = overlap_area / chart_area

                        if overlap_ratio >= 0.8:
                            completeness = table_completeness.get(table.element_id, 0.0)

                            # Remove CHART only if TABLE structure is complete
                            if completeness >= 0.5:
                                should_remove = True
                                logger.info(
                                    f"Removing CHART {chart.element_id} "
                                    f"({overlap_ratio*100:.1f}% overlap with TABLE {table.element_id} having {completeness*100:.1f}% completeness)"
                                )
                                removed_charts += 1
                                break

            if not should_remove:
                filtered_elements.append(chart)

        # Process all other elements
        for elem in elements:
            if elem.type not in [ElementType.TABLE, ElementType.CHART]:
                filtered_elements.append(elem)

        if removed_charts > 0 or removed_tables > 0:
            logger.info(
                f"Deduplication complete: removed {removed_tables} incomplete TABLE(s), "
                f"{removed_charts} overlapping CHART(s)"
            )

        return filtered_elements

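    # Worked example of the completeness rule above, with hypothetical numbers:
    # a detected table reporting rows=5 and cols=4 should have 20 cells. If the
    # extractor only found 6 cells, completeness = 6 / 20 = 0.30 < 0.5, so an
    # overlapping CHART element wins and the TABLE is dropped. With 18 cells,
    # completeness = 18 / 20 = 0.90 >= 0.5, and the TABLE is kept while the
    # overlapping CHART is removed instead.
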
    # =========================================================================
    # PDF Preprocessing Pipeline Methods
    # =========================================================================

    def _preprocess_page(self, page: fitz.Page, page_num: int, doc: fitz.Document = None) -> Dict[str, Any]:
        """
        Run preprocessing pipeline on a page before extraction.

        Pipeline steps:
        1. Content sanitization (clean_contents)
        2. Hidden layer detection (OCG)
        3. White-out/black-out detection (vector rectangles)
        4. Covering image detection (embedded black/white images)

        Args:
            page: PyMuPDF page object
            page_num: Page number (1-indexed)
            doc: PyMuPDF document object (needed for image analysis)

        Returns:
            Dict with preprocessing results:
            - covered_word_bboxes: List of bboxes for text covered by rectangles/images
            - covering_images: List of covering image info
            - hidden_layers: List of hidden OCG layer names
            - sanitized: Whether content was sanitized
        """
        result = {
            'covered_word_bboxes': [],
            'covering_images': [],
            'hidden_layers': [],
            'sanitized': False
        }

        # Step 1.1: Content sanitization
        if self.enable_content_sanitization:
            try:
                page.clean_contents(sanitize=True)
                result['sanitized'] = True
                logger.debug(f"Page {page_num}: Content stream sanitized")
            except Exception as e:
                logger.warning(f"Page {page_num}: Content sanitization failed: {e}")

        # Step 1.3: White-out/black-out detection (vector rectangles)
        if self.enable_whiteout_detection:
            covered = self._detect_whiteout_covered_text(page, page_num)
            result['covered_word_bboxes'] = [fitz.Rect(w['bbox']) for w in covered]
            result['covered_words_detail'] = covered  # Include color_type info
            if covered:
                # Count by color type
                white_covered = sum(1 for c in covered if c.get('color_type') == 'white')
                black_covered = sum(1 for c in covered if c.get('color_type') == 'black')
                other_covered = len(covered) - white_covered - black_covered
                logger.info(f"Page {page_num}: Detected {len(covered)} covered text regions "
                            f"(white: {white_covered}, black/redaction: {black_covered}, other: {other_covered})")

        # Step 1.4: Covering image detection (embedded black/white images)
        if self.enable_whiteout_detection and doc is not None:
            covering_images = self._detect_covering_images(page, doc, page_num)
            result['covering_images'] = covering_images
            # Add covering image bboxes to the covered_word_bboxes list
            for img in covering_images:
                result['covered_word_bboxes'].append(fitz.Rect(img['bbox']))
            if covering_images:
                black_imgs = sum(1 for c in covering_images if c['color_type'] == 'image_black')
                white_imgs = sum(1 for c in covering_images if c['color_type'] == 'image_white')
                logger.info(f"Page {page_num}: Detected {len(covering_images)} covering images "
                            f"(black: {black_imgs}, white: {white_imgs})")

        return result

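    # Sketch of the dict returned by _preprocess_page(), with hypothetical
    # values (keys mirror the code above; 'covered_words_detail' is only added
    # when white-out detection is enabled):
    #
    #   {
    #       'sanitized': True,
    #       'hidden_layers': [],
    #       'covered_word_bboxes': [Rect(72.0, 540.0, 180.0, 552.0)],
    #       'covered_words_detail': [{'text': 'CONFIDENTIAL',
    #                                 'bbox': (72.0, 540.0, 180.0, 552.0),
    #                                 'coverage': 0.97,
    #                                 'color_type': 'black'}],
    #       'covering_images': [],
    #   }
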
    def _detect_whiteout_covered_text(self, page: fitz.Page, page_num: int) -> List[Dict]:
        """
        Detect text covered by solid color rectangles (white-out, black redaction, or any solid fill).

        Uses IoU (Intersection over Union) to determine if text is covered.

        Args:
            page: PyMuPDF page object
            page_num: Page number for logging

        Returns:
            List of dicts with covered text info: {'text', 'bbox', 'coverage', 'color_type'}
        """
        covered_words = []
        page_rect = page.rect  # Page boundaries

        # Get all drawings and find solid-filled rectangles
        drawings = page.get_drawings()
        covering_rects = []  # List of (rect, color_type)

        for d in drawings:
            fill_color = d.get('fill')
            if fill_color and isinstance(fill_color, (tuple, list)) and len(fill_color) >= 3:
                r, g, b = fill_color[:3]
                rect = d.get('rect')
                if not rect:
                    continue

                fitz_rect = fitz.Rect(rect)

                # Skip very small rectangles (likely not covering blocks)
                if fitz_rect.width < 5 or fitz_rect.height < 5:
                    continue

                # Skip rectangles completely outside page boundaries
                if not fitz_rect.intersects(page_rect):
                    continue

                # Clip rectangle to page boundaries
                fitz_rect = fitz_rect & page_rect

                # Detect white rectangles (white-out / correction tape)
                # Must be pure white (>= 0.98) to avoid false positives from light backgrounds
                if r >= 0.98 and g >= 0.98 and b >= 0.98:
                    covering_rects.append((fitz_rect, 'white'))
                # Detect black rectangles (redaction / censoring)
                # Must be pure black (<= 0.02) to avoid false positives from dark elements
                elif r <= 0.02 and g <= 0.02 and b <= 0.02:
                    covering_rects.append((fitz_rect, 'black'))

        if not covering_rects:
            return covered_words

        # Log detected covering rectangles by type
        white_count = sum(1 for _, t in covering_rects if t == 'white')
        black_count = sum(1 for _, t in covering_rects if t == 'black')
        logger.debug(f"Page {page_num}: Found {len(covering_rects)} potential covering rectangles "
                     f"(white: {white_count}, black/redaction: {black_count})")

        # Get all text words with bounding boxes
        # words format: (x0, y0, x1, y1, word, block_no, line_no, word_no)
        words = page.get_text("words")

        for word_info in words:
            word_rect = fitz.Rect(word_info[:4])
            word_text = word_info[4]
            word_area = word_rect.width * word_rect.height

            if word_area <= 0:
                continue

            for cover_rect, color_type in covering_rects:
                # Calculate intersection
                intersection = word_rect & cover_rect
                if intersection.is_empty:
                    continue

                intersection_area = intersection.width * intersection.height
                coverage_ratio = intersection_area / word_area

                # Check if coverage exceeds IoU threshold
                if coverage_ratio >= self.whiteout_iou_threshold:
                    covered_words.append({
                        'text': word_text,
                        'bbox': tuple(word_rect),
                        'coverage': coverage_ratio,
                        'color_type': color_type
                    })
                    break  # Word is covered, no need to check other rects

        return covered_words

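    # Numeric sketch of the coverage test above (made-up geometry): a word box
    # of 40 x 10 pt has area 400; if a white rectangle overlaps 36 x 10 pt of
    # it, coverage = 360 / 400 = 0.9, which exceeds the default
    # whiteout_iou_threshold of 0.8, so the word is reported as covered.
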
    def _detect_covering_images(self, page: fitz.Page, doc: fitz.Document, page_num: int) -> List[Dict]:
        """
        Detect embedded images that are mostly black/white AND actually cover text.

        Only reports images that:
        1. Are mostly solid black or white
        2. Are within page boundaries
        3. Actually overlap with text content (IoU check)

        Args:
            page: PyMuPDF page object
            doc: PyMuPDF document object (needed for image extraction)
            page_num: Page number for logging

        Returns:
            List of dicts with covering image info: {'bbox', 'color_type', 'avg_color', 'covered_text_count'}
        """
        covering_images = []
        page_rect = page.rect  # Page boundaries

        try:
            # Get all images on the page with their positions
            image_list = page.get_images(full=True)

            if not image_list:
                return covering_images

            # Get all text words for coverage check
            words = page.get_text("words")  # (x0, y0, x1, y1, word, block_no, line_no, word_no)

            for img_info in image_list:
                xref = img_info[0]
                width = img_info[2]
                height = img_info[3]

                # Skip very small images (icons, bullets)
                if width < 20 or height < 10:
                    continue

                try:
                    # Extract image data
                    base_image = doc.extract_image(xref)
                    img_bytes = base_image.get('image')
                    if not img_bytes:
                        continue

                    # Analyze image color using PIL
                    from PIL import Image
                    import io

                    img = Image.open(io.BytesIO(img_bytes))
                    if img.mode != 'RGB':
                        img = img.convert('RGB')

                    # Sample pixels for efficiency (don't analyze every pixel)
                    img_small = img.resize((min(50, img.width), min(50, img.height)))
                    pixels = list(img_small.getdata())

                    if not pixels:
                        continue

                    avg_r = sum(p[0] for p in pixels) / len(pixels)
                    avg_g = sum(p[1] for p in pixels) / len(pixels)
                    avg_b = sum(p[2] for p in pixels) / len(pixels)

                    # Determine if image is mostly black or white
                    color_type = None
                    if avg_r <= 30 and avg_g <= 30 and avg_b <= 30:
                        color_type = 'image_black'
                    elif avg_r >= 245 and avg_g >= 245 and avg_b >= 245:
                        color_type = 'image_white'

                    if color_type:
                        # Get image position on page
                        for img_rect in page.get_image_rects(xref):
                            # Skip images completely outside page boundaries
                            if not img_rect.intersects(page_rect):
                                continue

                            # Clip image rect to page boundaries
                            clipped_rect = img_rect & page_rect

                            # Check if image actually covers any text (IoU check)
                            covered_text_count = 0
                            for word_info in words:
                                word_rect = fitz.Rect(word_info[:4])
                                word_area = word_rect.width * word_rect.height
                                if word_area <= 0:
                                    continue

                                intersection = word_rect & clipped_rect
                                if not intersection.is_empty:
                                    intersection_area = intersection.width * intersection.height
                                    coverage_ratio = intersection_area / word_area
                                    # Count as covered if >= 50% of word is under the image
                                    if coverage_ratio >= 0.5:
                                        covered_text_count += 1

                            # Only report if image actually covers text
                            if covered_text_count > 0:
                                covering_images.append({
                                    'bbox': tuple(clipped_rect),
                                    'color_type': color_type,
                                    'avg_color': (avg_r, avg_g, avg_b),
                                    'size': (width, height),
                                    'covered_text_count': covered_text_count
                                })

                except Exception as e:
                    logger.debug(f"Page {page_num}: Failed to analyze image xref={xref}: {e}")
                    continue

            if covering_images:
                black_count = sum(1 for c in covering_images if c['color_type'] == 'image_black')
                white_count = sum(1 for c in covering_images if c['color_type'] == 'image_white')
                total_covered = sum(c.get('covered_text_count', 0) for c in covering_images)
                logger.debug(f"Page {page_num}: Found {len(covering_images)} covering images "
                             f"(black: {black_count}, white: {white_count}, covering {total_covered} text regions)")

        except Exception as e:
            logger.warning(f"Page {page_num}: Failed to detect covering images: {e}")

        return covering_images

    def _get_hidden_ocg_layers(self, doc: fitz.Document) -> List[str]:
        """
        Get list of hidden Optional Content Group (OCG) layer names.

        Args:
            doc: PyMuPDF document object

        Returns:
            List of hidden layer names
        """
        hidden_layers = []

        try:
            ocgs = doc.get_ocgs()
            if not ocgs:
                return hidden_layers

            for ocg_xref, ocg_info in ocgs.items():
                # Check if layer is hidden by default
                if ocg_info.get('on') == False:
                    layer_name = ocg_info.get('name', f'OCG_{ocg_xref}')
                    hidden_layers.append(layer_name)
                    logger.debug(f"Found hidden OCG layer: {layer_name}")

        except Exception as e:
            logger.warning(f"Failed to get OCG layers: {e}")

        return hidden_layers

    def _calculate_garble_rate(self, text: str) -> float:
        """
        Calculate the rate of garbled characters in text.

        Detects:
        - (cid:xxxx) patterns (missing ToUnicode map)
        - Replacement character U+FFFD
        - Private Use Area (PUA) characters

        Args:
            text: Text to analyze

        Returns:
            Garble rate as float between 0.0 and 1.0
        """
        if not text:
            return 0.0

        # Count (cid:xxxx) patterns
        cid_pattern = r'\(cid:\d+\)'
        cid_matches = re.findall(cid_pattern, text)
        cid_char_count = sum(len(m) for m in cid_matches)

        # Count replacement characters (U+FFFD)
        replacement_count = text.count('\ufffd')

        # Count Private Use Area characters (U+E000 to U+F8FF)
        pua_count = sum(1 for c in text if 0xE000 <= ord(c) <= 0xF8FF)

        total_garble = cid_char_count + replacement_count + pua_count
        total_chars = len(text)

        return total_garble / total_chars if total_chars > 0 else 0.0

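    # Example of the garble-rate calculation above (string chosen purely for
    # illustration): in the text "(cid:1234) Hello", the "(cid:1234)" match is
    # 10 characters out of 16 total, so the rate is 10 / 16 = 0.625, well above
    # the default 0.1 OCR-fallback threshold.
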
    def _should_fallback_to_ocr(self, page_text: str, page_num: int) -> bool:
        """
        Determine if page should use OCR fallback based on garble rate.

        Args:
            page_text: Extracted text from page
            page_num: Page number for logging

        Returns:
            True if OCR fallback is recommended
        """
        if not self.enable_garble_detection:
            return False

        garble_rate = self._calculate_garble_rate(page_text)

        if garble_rate > self.garble_ocr_fallback_threshold:
            logger.warning(
                f"Page {page_num}: High garble rate detected ({garble_rate:.1%}). "
                f"OCR fallback recommended."
            )
            return True

        return False

    def _is_page_number(self, text: str) -> bool:
        """
        Check if text is likely a page number.

        Args:
            text: Text to check

        Returns:
            True if text matches page number patterns
        """
        text = text.strip()

        # Pure number
        if text.isdigit() and len(text) <= 4:
            return True

        # Common patterns
        patterns = [
            r'^page\s*\d+$',          # "Page 1"
            r'^-?\s*\d+\s*-?$',       # "- 1 -" or "-1-"
            r'^\d+\s*/\s*\d+$',       # "1/10"
            r'^第\s*\d+\s*[頁页]$',    # "第1頁" or "第1页"
            r'^p\.?\s*\d+$',          # "P.1" or "p1"
        ]

        for pattern in patterns:
            if re.match(pattern, text, re.IGNORECASE):
                return True

        return False

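    # Strings the patterns above accept or reject (illustrative calls on an
    # existing engine instance):
    #   engine._is_page_number("7")          -> True   (pure digits)
    #   engine._is_page_number("- 12 -")     -> True   (dash-wrapped number)
    #   engine._is_page_number("3/10")       -> True   (page/total)
    #   engine._is_page_number("第5頁")       -> True   (CJK page marker)
    #   engine._is_page_number("Chapter 7")  -> False
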
    def _filter_page_numbers(self, elements: List[DocumentElement], page_height: float) -> List[DocumentElement]:
        """
        Filter out page number elements.

        Page numbers are typically:
        - In the bottom 10% of the page
        - Matching numeric/page-number patterns

        Args:
            elements: List of document elements
            page_height: Page height for position calculation

        Returns:
            Filtered list without page numbers
        """
        if not self.enable_page_number_filter:
            return elements

        filtered = []
        removed_count = 0

        for elem in elements:
            # Only filter text elements
            if elem.type not in [ElementType.TEXT, ElementType.PARAGRAPH]:
                filtered.append(elem)
                continue

            # Check position - must be in bottom 10% of page
            if elem.bbox:
                y_rel = elem.bbox.y0 / page_height
                if y_rel > 0.90:
                    # Get text content
                    text = elem.get_text() if hasattr(elem, 'get_text') else str(elem.content)
                    if self._is_page_number(text):
                        removed_count += 1
                        logger.debug(f"Filtered page number: '{text}'")
                        continue

            filtered.append(elem)

        if removed_count > 0:
            logger.info(f"Filtered {removed_count} page number element(s)")

        return filtered

    def _is_text_in_covered_regions(self, bbox: BoundingBox, covered_bboxes: List[fitz.Rect]) -> bool:
        """
        Check if a text bbox overlaps with any covered (white-out) regions.

        Args:
            bbox: Text bounding box
            covered_bboxes: List of covered region rectangles

        Returns:
            True if text overlaps with covered regions
        """
        if not covered_bboxes or not bbox:
            return False

        text_rect = fitz.Rect(bbox.x0, bbox.y0, bbox.x1, bbox.y1)

        for covered_rect in covered_bboxes:
            if text_rect.intersects(covered_rect):
                # Calculate overlap ratio
                intersection = text_rect & covered_rect
                if not intersection.is_empty:
                    text_area = text_rect.width * text_rect.height
                    if text_area > 0:
                        overlap_ratio = (intersection.width * intersection.height) / text_area
                        if overlap_ratio >= self.whiteout_iou_threshold:
                            return True

        return False

    # =========================================================================
    # Phase 4: GS Distillation - Exception Handler
    # =========================================================================

    @staticmethod
    def is_ghostscript_available() -> bool:
        """Check if Ghostscript is available on the system."""
        import shutil
        return shutil.which('gs') is not None

    def _should_trigger_gs_repair(self, file_path: Path) -> Tuple[bool, str]:
        """
        Determine if Ghostscript repair should be triggered.

        Triggers on:
        1. High garble rate (>10% cid:xxxx patterns) in extracted text
        2. Severe mupdf structural errors during opening

        Args:
            file_path: Path to PDF file

        Returns:
            Tuple of (should_repair, reason)
        """
        import io
        import sys

        reason = ""

        try:
            # Capture mupdf warnings
            old_stderr = sys.stderr
            sys.stderr = captured_stderr = io.StringIO()

            doc = fitz.open(str(file_path))

            # Restore stderr and get warnings
            sys.stderr = old_stderr
            warnings = captured_stderr.getvalue()

            # Check for severe structural errors
            severe_keywords = ['error', 'invalid xref', 'corrupt', 'damaged', 'repair']
            for keyword in severe_keywords:
                if keyword.lower() in warnings.lower():
                    reason = f"Structural error detected: {keyword}"
                    doc.close()
                    return True, reason

            # Check garble rate on first page
            if len(doc) > 0:
                page = doc[0]
                text = page.get_text("text")

                garble_rate = self._calculate_garble_rate(text)
                if garble_rate > self.garble_ocr_fallback_threshold:
                    reason = f"High garble rate: {garble_rate:.1%}"
                    doc.close()
                    return True, reason

            doc.close()
            return False, ""

        except Exception as e:
            reason = f"Error opening PDF: {str(e)}"
            return True, reason

    def _repair_pdf_with_gs(self, input_path: Path, output_path: Path) -> bool:
        """
        Repair a PDF using Ghostscript distillation.

        This re-renders the PDF through Ghostscript's PDF interpreter,
        which can fix many structural issues.

        Args:
            input_path: Path to input PDF
            output_path: Path to save repaired PDF

        Returns:
            True if repair succeeded, False otherwise
        """
        import subprocess
        import shutil

        if not self.is_ghostscript_available():
            logger.warning("Ghostscript not available, cannot repair PDF")
            return False

        try:
            # GS command for PDF repair/distillation
            cmd = [
                'gs',
                '-dNOPAUSE',
                '-dBATCH',
                '-dSAFER',
                '-sDEVICE=pdfwrite',
                '-dPDFSETTINGS=/prepress',
                '-dDetectDuplicateImages=true',
                '-dCompressFonts=true',
                '-dSubsetFonts=true',
                f'-sOutputFile={output_path}',
                str(input_path)
            ]

            logger.info(f"Running Ghostscript repair: {' '.join(cmd)}")

            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=60  # 60 second timeout
            )

            if result.returncode == 0 and output_path.exists():
                logger.info(f"Ghostscript repair successful: {output_path}")
                return True
            else:
                logger.error(f"Ghostscript repair failed: {result.stderr}")
                return False

        except subprocess.TimeoutExpired:
            logger.error("Ghostscript repair timed out")
            return False
        except Exception as e:
            logger.error(f"Ghostscript repair error: {e}")
            return False

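    # The equivalent Ghostscript invocation for manual testing from a shell,
    # with placeholder paths (the flags mirror the cmd list built above):
    #
    #   gs -dNOPAUSE -dBATCH -dSAFER -sDEVICE=pdfwrite -dPDFSETTINGS=/prepress \
    #      -dDetectDuplicateImages=true -dCompressFonts=true -dSubsetFonts=true \
    #      -sOutputFile=repaired.pdf damaged.pdf
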
    def extract_with_repair(self,
                            file_path: Path,
                            output_dir: Optional[Path] = None,
                            enable_gs_repair: bool = False) -> UnifiedDocument:
        """
        Extract content with optional Ghostscript repair for damaged PDFs.

        This method first checks if the PDF needs repair, and if so,
        attempts to repair it using Ghostscript before extraction.

        Args:
            file_path: Path to PDF file
            output_dir: Optional directory to save extracted images
            enable_gs_repair: Whether to attempt GS repair on problematic PDFs

        Returns:
            UnifiedDocument with extracted content
        """
        import tempfile

        # Check if repair is needed and enabled
        if enable_gs_repair:
            should_repair, reason = self._should_trigger_gs_repair(file_path)

            if should_repair:
                logger.warning(f"PDF repair triggered: {reason}")

                if self.is_ghostscript_available():
                    # Create temporary file for repaired PDF
                    with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as tmp:
                        tmp_path = Path(tmp.name)

                    try:
                        if self._repair_pdf_with_gs(file_path, tmp_path):
                            logger.info("Using repaired PDF for extraction")
                            result = self.extract(tmp_path, output_dir)
                            # Add repair metadata
                            if result.metadata:
                                result.metadata.gs_repaired = True
                            return result
                        else:
                            logger.warning("GS repair failed, trying original file")
                    finally:
                        # Cleanup temp file
                        if tmp_path.exists():
                            tmp_path.unlink()
                else:
                    logger.warning("Ghostscript not available, skipping repair")

        # Normal extraction
        return self.extract(file_path, output_dir)

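    # Minimal usage sketch for the repair-aware entry point above; the file
    # names are placeholders and the engine uses its default constructor
    # arguments:
    #
    #   engine = DirectExtractionEngine()
    #   doc = engine.extract_with_repair(Path("input.pdf"),
    #                                    output_dir=Path("out"),
    #                                    enable_gs_repair=True)
    #   ocr_pages = engine.get_pages_needing_ocr(doc)
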
    def get_pages_needing_ocr(self, doc: UnifiedDocument) -> List[int]:
        """
        Get list of page numbers that need OCR fallback.

        This method checks each page's metadata for the 'needs_ocr_fallback' flag
        set during extraction when high garble rates are detected.

        Args:
            doc: UnifiedDocument from extraction

        Returns:
            List of page numbers (1-indexed) that need OCR processing
        """
        pages_needing_ocr = []

        for page in doc.pages:
            if page.metadata and page.metadata.get('needs_ocr_fallback', False):
                pages_needing_ocr.append(page.page_number)

        if pages_needing_ocr:
            logger.info(f"Pages needing OCR fallback: {pages_needing_ocr}")

        return pages_needing_ocr

    def get_extraction_quality_report(self, doc: UnifiedDocument) -> Dict[str, Any]:
        """
        Generate a quality report for the extraction.

        This report helps determine if additional processing (OCR, manual review)
        is needed.

        Args:
            doc: UnifiedDocument from extraction

        Returns:
            Dict with quality metrics:
            - total_pages: int
            - pages_with_issues: list of page numbers with problems
            - average_garble_rate: float
            - needs_ocr_fallback: bool (any page needs OCR)
            - preprocessing_stats: dict with sanitization/whiteout counts
        """
        report = {
            'total_pages': len(doc.pages),
            'pages_with_issues': [],
            'garble_rates': {},
            'average_garble_rate': 0.0,
            'needs_ocr_fallback': False,
            'preprocessing_stats': {
                'pages_sanitized': 0,
                'total_whiteout_regions': 0,
                'total_covering_images': 0
            }
        }

        total_garble = 0.0
        pages_with_garble = 0

        for page in doc.pages:
            metadata = page.metadata or {}

            # Check garble rate
            garble_rate = metadata.get('garble_rate', 0.0)
            if garble_rate > 0:
                report['garble_rates'][page.page_number] = garble_rate
                total_garble += garble_rate
                pages_with_garble += 1

            # Check OCR fallback flag
            if metadata.get('needs_ocr_fallback', False):
                report['pages_with_issues'].append(page.page_number)
                report['needs_ocr_fallback'] = True

            # Preprocessing stats
            preprocessing = metadata.get('preprocessing', {})
            if preprocessing.get('sanitized', False):
                report['preprocessing_stats']['pages_sanitized'] += 1
            report['preprocessing_stats']['total_whiteout_regions'] += preprocessing.get('whiteout_regions_found', 0)
            report['preprocessing_stats']['total_covering_images'] += preprocessing.get('covering_images_found', 0)

        # Calculate average garble rate
        if pages_with_garble > 0:
            report['average_garble_rate'] = total_garble / pages_with_garble

        return report

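    # Example of consuming the quality report above (hypothetical driver code):
    #
    #   report = engine.get_extraction_quality_report(doc)
    #   if report['needs_ocr_fallback']:
    #       # Route only the flagged pages to the OCR track instead of
    #       # trusting the direct-extraction results for them.
    #       ocr_pages = engine.get_pages_needing_ocr(doc)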