feat: add PDF preprocessing pipeline for Direct track

Implement multi-stage preprocessing pipeline to improve extraction quality:

Phase 1 - Object-level Cleaning:
- Content stream sanitization via clean_contents(sanitize=True)
- Hidden OCG layer detection
- White-out detection with IoU 80% threshold

Phase 2 - Layout Analysis:
- Column-aware sorting (sort=True)
- Page number pattern detection and filtering
- Position-based element classification

Phase 3 - Enhanced Extraction:
- Garble rate detection (cid:xxxx, U+FFFD, PUA characters)
- OCR fallback recommendation when garble >10%
- Quality report generation interface

Phase 4 - GS Distillation (Exception Handler):
- Ghostscript PDF repair for severely damaged files
- Auto-triggered on high garble or mupdf errors
- Graceful fallback when GS unavailable
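
A usage sketch, assuming the engine class and options added below
("report.pdf" is a placeholder path):

    engine = DirectExtractionEngine(
        enable_whiteout_detection=True,
        whiteout_iou_threshold=0.8,
        garble_ocr_fallback_threshold=0.1,
    )
    doc = engine.extract_with_repair(Path("report.pdf"), enable_gs_repair=True)
    report = engine.get_extraction_quality_report(doc)
    if report["needs_ocr_fallback"]:
        ocr_pages = engine.get_pages_needing_ocr(doc)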

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
egg
2025-12-03 16:11:00 +08:00
parent 1b5c7f39a8
commit 6a65c7617d
4 changed files with 1236 additions and 9 deletions


@@ -40,7 +40,15 @@ class DirectExtractionEngine:
enable_table_detection: bool = True,
enable_image_extraction: bool = True,
min_table_rows: int = 2,
min_table_cols: int = 2,
# Preprocessing pipeline options
enable_content_sanitization: bool = True,
enable_hidden_layer_removal: bool = True,
enable_whiteout_detection: bool = True,
whiteout_iou_threshold: float = 0.8,
enable_page_number_filter: bool = True,
enable_garble_detection: bool = True,
garble_ocr_fallback_threshold: float = 0.1):
"""
Initialize the extraction engine.
@@ -49,12 +57,30 @@ class DirectExtractionEngine:
enable_image_extraction: Whether to extract images
min_table_rows: Minimum rows for table detection
min_table_cols: Minimum columns for table detection
Preprocessing pipeline options:
enable_content_sanitization: Run clean_contents() to fix malformed PDF streams
enable_hidden_layer_removal: Remove content from hidden OCG layers
enable_whiteout_detection: Detect and filter text covered by white rectangles
whiteout_iou_threshold: Coverage threshold for white-out detection; a word counts as covered when intersection area / word area meets this value (default 0.8)
enable_page_number_filter: Filter out detected page numbers
enable_garble_detection: Detect garbled text (cid:xxxx patterns)
garble_ocr_fallback_threshold: Garble rate threshold to recommend OCR fallback
"""
self.enable_table_detection = enable_table_detection
self.enable_image_extraction = enable_image_extraction
self.min_table_rows = min_table_rows
self.min_table_cols = min_table_cols
# Preprocessing pipeline options
self.enable_content_sanitization = enable_content_sanitization
self.enable_hidden_layer_removal = enable_hidden_layer_removal
self.enable_whiteout_detection = enable_whiteout_detection
self.whiteout_iou_threshold = whiteout_iou_threshold
self.enable_page_number_filter = enable_page_number_filter
self.enable_garble_detection = enable_garble_detection
self.garble_ocr_fallback_threshold = garble_ocr_fallback_threshold
def extract(self,
file_path: Path,
output_dir: Optional[Path] = None) -> UnifiedDocument:
@@ -186,10 +212,17 @@ class DirectExtractionEngine:
page_num: int,
document_id: str,
output_dir: Optional[Path]) -> Page:
"""Extract content from a single page"""
"""Extract content from a single page with preprocessing pipeline."""
elements = []
element_counter = 0
# =====================================================================
# PREPROCESSING PIPELINE
# =====================================================================
# Step 1: Run preprocessing (sanitization, white-out detection)
preprocess_result = self._preprocess_page(page, page_num)
covered_bboxes = preprocess_result.get('covered_word_bboxes', [])
# Get page-level metadata (for final Page metadata)
drawings = page.get_drawings()
links = page.get_links()
@@ -227,7 +260,7 @@ class DirectExtractionEngine:
element_counter += len(table_elements)
# Extract text blocks with formatting (sort=True for reading order)
# Filter out lines that overlap with table regions or are covered by white-out
text_dict = page.get_text("dict", sort=True)
for block_idx, block in enumerate(text_dict.get("blocks", [])):
if block.get("type") == 0: # Text block
@@ -235,6 +268,11 @@ class DirectExtractionEngine:
block, page_num, element_counter, table_bboxes
)
if element:
# Step 1.3: Skip text covered by white-out rectangles
if covered_bboxes and element.bbox:
if self._is_text_in_covered_regions(element.bbox, covered_bboxes):
logger.debug(f"Skipping white-out covered text: {element.element_id}")
continue
elements.append(element)
element_counter += 1
@@ -292,15 +330,39 @@ class DirectExtractionEngine:
elements = self._build_section_hierarchy(elements)
elements = self._build_nested_lists(elements)
# =====================================================================
# POST-PROCESSING PIPELINE
# =====================================================================
# Step 2.3: Filter page numbers
elements = self._filter_page_numbers(elements, dimensions.height)
# Step 3.2-3.3: Garble detection and OCR fallback recommendation
page_metadata = {
"has_drawings": len(drawings) > 0,
"drawing_count": len(drawings),
"link_count": len(links),
"preprocessing": {
"sanitized": preprocess_result.get('sanitized', False),
"whiteout_regions_found": len(covered_bboxes)
}
}
# Calculate garble rate for the page
if self.enable_garble_detection:
full_text = ' '.join(
elem.get_text() if hasattr(elem, 'get_text') else str(elem.content)
for elem in elements
if elem.type in [ElementType.TEXT, ElementType.PARAGRAPH, ElementType.TITLE]
)
garble_rate = self._calculate_garble_rate(full_text)
page_metadata['garble_rate'] = garble_rate
page_metadata['needs_ocr_fallback'] = self._should_fallback_to_ocr(full_text, page_num)
return Page(
page_number=page_num,
elements=elements,
dimensions=dimensions,
metadata=page_metadata
)
def _sort_elements_for_reading_order(self, elements: List[DocumentElement], dimensions: Dimensions) -> List[DocumentElement]:
@@ -1788,4 +1850,574 @@ class DirectExtractionEngine:
f"{removed_charts} overlapping CHART(s)"
)
return filtered_elements
# =========================================================================
# PDF Preprocessing Pipeline Methods
# =========================================================================
def _preprocess_page(self, page: fitz.Page, page_num: int) -> Dict[str, Any]:
"""
Run preprocessing pipeline on a page before extraction.
Pipeline steps:
1. Content sanitization (clean_contents)
2. Hidden layer detection (OCG)
3. White-out detection
Args:
page: PyMuPDF page object
page_num: Page number (1-indexed)
Returns:
Dict with preprocessing results:
- covered_word_bboxes: List of bboxes for text covered by white rectangles
- hidden_layers: List of hidden OCG layer names
- sanitized: Whether content was sanitized
"""
result = {
'covered_word_bboxes': [],
'hidden_layers': [],
'sanitized': False
}
# Step 1.1: Content sanitization
if self.enable_content_sanitization:
try:
page.clean_contents(sanitize=True)
result['sanitized'] = True
logger.debug(f"Page {page_num}: Content stream sanitized")
except Exception as e:
logger.warning(f"Page {page_num}: Content sanitization failed: {e}")
# Step 1.2: Hidden layer detection (OCG); page.parent is the owning Document
if self.enable_hidden_layer_removal:
result['hidden_layers'] = self._get_hidden_ocg_layers(page.parent)
# Step 1.3: White-out detection
if self.enable_whiteout_detection:
covered = self._detect_whiteout_covered_text(page, page_num)
result['covered_word_bboxes'] = [fitz.Rect(w['bbox']) for w in covered]
if covered:
logger.info(f"Page {page_num}: Detected {len(covered)} text regions covered by white-out")
return result
def _detect_whiteout_covered_text(self, page: fitz.Page, page_num: int) -> List[Dict]:
"""
Detect text covered by white rectangles ("white-out" / "correction tape" effect).
Uses the coverage ratio (intersection area over word area) to decide whether a word is covered; despite the parameter name, this is not a symmetric IoU.
Args:
page: PyMuPDF page object
page_num: Page number for logging
Returns:
List of dicts with covered text info: {'text', 'bbox', 'coverage'}
"""
covered_words = []
# Get all drawings and find white-filled rectangles
drawings = page.get_drawings()
white_rects = []
for d in drawings:
fill_color = d.get('fill')
# Check for white fill (RGB all 1.0 or close to it)
if fill_color:
if isinstance(fill_color, (tuple, list)) and len(fill_color) >= 3:
r, g, b = fill_color[:3]
# Allow slight tolerance for "almost white"
if r >= 0.95 and g >= 0.95 and b >= 0.95:
rect = d.get('rect')
if rect:
white_rects.append(fitz.Rect(rect))
if not white_rects:
return covered_words
logger.debug(f"Page {page_num}: Found {len(white_rects)} white rectangles")
# Get all text words with bounding boxes
# words format: (x0, y0, x1, y1, word, block_no, line_no, word_no)
words = page.get_text("words")
for word_info in words:
word_rect = fitz.Rect(word_info[:4])
word_text = word_info[4]
word_area = word_rect.width * word_rect.height
if word_area <= 0:
continue
for white_rect in white_rects:
# Calculate intersection
intersection = word_rect & white_rect
if intersection.is_empty:
continue
intersection_area = intersection.width * intersection.height
coverage_ratio = intersection_area / word_area
# Treat the word as covered when the coverage ratio meets the threshold
if coverage_ratio >= self.whiteout_iou_threshold:
covered_words.append({
'text': word_text,
'bbox': tuple(word_rect),
'coverage': coverage_ratio
})
break # Word is covered, no need to check other rects
return covered_words
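# Worked example (illustrative): a word at Rect(10, 10, 60, 22) has area
# 50 * 12 = 600. A white rectangle overlapping it at Rect(10, 10, 55, 22)
# gives an intersection area of 45 * 12 = 540, so coverage = 540 / 600 = 0.9,
# which meets the default 0.8 threshold and the word is flagged as covered.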
def _get_hidden_ocg_layers(self, doc: fitz.Document) -> List[str]:
"""
Get list of hidden Optional Content Group (OCG) layer names.
Args:
doc: PyMuPDF document object
Returns:
List of hidden layer names
"""
hidden_layers = []
try:
ocgs = doc.get_ocgs()
if not ocgs:
return hidden_layers
for ocg_xref, ocg_info in ocgs.items():
# Check if layer is hidden by default
if ocg_info.get('on') is False:
layer_name = ocg_info.get('name', f'OCG_{ocg_xref}')
hidden_layers.append(layer_name)
logger.debug(f"Found hidden OCG layer: {layer_name}")
except Exception as e:
logger.warning(f"Failed to get OCG layers: {e}")
return hidden_layers
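# Illustrative get_ocgs() result shape (keys and values vary by document):
#   {13: {'name': 'Watermark', 'intent': ['View'], 'on': False, 'usage': 'Artwork'}}
# Here xref 13 names a layer that is switched off by default.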
def _calculate_garble_rate(self, text: str) -> float:
"""
Calculate the rate of garbled characters in text.
Detects:
- (cid:xxxx) patterns (missing ToUnicode map)
- Replacement character U+FFFD
- Private Use Area (PUA) characters
Args:
text: Text to analyze
Returns:
Garble rate as float between 0.0 and 1.0
"""
if not text:
return 0.0
# Count (cid:xxxx) patterns
cid_pattern = r'\(cid:\d+\)'
cid_matches = re.findall(cid_pattern, text)
cid_char_count = sum(len(m) for m in cid_matches)
# Count replacement characters (U+FFFD)
replacement_count = text.count('\ufffd')
# Count Private Use Area characters (U+E000 to U+F8FF)
pua_count = sum(1 for c in text if 0xE000 <= ord(c) <= 0xF8FF)
total_garble = cid_char_count + replacement_count + pua_count
total_chars = len(text)
return total_garble / total_chars if total_chars > 0 else 0.0
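# Worked example (illustrative): the string "AB(cid:123)C" followed by one
# U+FFFD character has 13 characters in total; the "(cid:123)" run counts
# 9 and U+FFFD counts 1, so the garble rate is 10 / 13 ≈ 0.77.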
def _should_fallback_to_ocr(self, page_text: str, page_num: int) -> bool:
"""
Determine if page should use OCR fallback based on garble rate.
Args:
page_text: Extracted text from page
page_num: Page number for logging
Returns:
True if OCR fallback is recommended
"""
if not self.enable_garble_detection:
return False
garble_rate = self._calculate_garble_rate(page_text)
if garble_rate > self.garble_ocr_fallback_threshold:
logger.warning(
f"Page {page_num}: High garble rate detected ({garble_rate:.1%}). "
f"OCR fallback recommended."
)
return True
return False
def _is_page_number(self, text: str) -> bool:
"""
Check if text is likely a page number.
Args:
text: Text to check
Returns:
True if text matches page number patterns
"""
text = text.strip()
# Pure number
if text.isdigit() and len(text) <= 4:
return True
# Common patterns
patterns = [
r'^page\s*\d+$', # "Page 1"
r'^-\s*\d+\s*-$', # "- 1 -" or "-1-" (both dashes required, so long bare numbers are not swallowed)
r'^\d+\s*/\s*\d+$', # "1/10"
r'^第\s*\d+\s*[頁页]$', # "第1頁" or "第1页"
r'^p\.?\s*\d+$', # "P.1" or "p1"
]
for pattern in patterns:
if re.match(pattern, text, re.IGNORECASE):
return True
return False
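# Examples (illustrative): "Page 7", "- 12 -", "3/10", "第5页", and "p. 4"
# all match; "Chapter 1" and "Figure 2" do not.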
def _filter_page_numbers(self, elements: List[DocumentElement], page_height: float) -> List[DocumentElement]:
"""
Filter out page number elements.
Page numbers are typically:
- In the bottom 10% of the page
- Match numeric/page number patterns
Args:
elements: List of document elements
page_height: Page height for position calculation
Returns:
Filtered list without page numbers
"""
if not self.enable_page_number_filter:
return elements
filtered = []
removed_count = 0
for elem in elements:
# Only filter text elements
if elem.type not in [ElementType.TEXT, ElementType.PARAGRAPH]:
filtered.append(elem)
continue
# Check position - must be in bottom 10% of page
if elem.bbox:
y_rel = elem.bbox.y0 / page_height
if y_rel > 0.90:
# Get text content
text = elem.get_text() if hasattr(elem, 'get_text') else str(elem.content)
if self._is_page_number(text):
removed_count += 1
logger.debug(f"Filtered page number: '{text}'")
continue
filtered.append(elem)
if removed_count > 0:
logger.info(f"Filtered {removed_count} page number element(s)")
return filtered
def _is_text_in_covered_regions(self, bbox: BoundingBox, covered_bboxes: List[fitz.Rect]) -> bool:
"""
Check if a text bbox overlaps with any covered (white-out) regions.
Args:
bbox: Text bounding box
covered_bboxes: List of covered region rectangles
Returns:
True if text overlaps with covered regions
"""
if not covered_bboxes or not bbox:
return False
text_rect = fitz.Rect(bbox.x0, bbox.y0, bbox.x1, bbox.y1)
for covered_rect in covered_bboxes:
if text_rect.intersects(covered_rect):
# Calculate overlap ratio
intersection = text_rect & covered_rect
if not intersection.is_empty:
text_area = text_rect.width * text_rect.height
if text_area > 0:
overlap_ratio = (intersection.width * intersection.height) / text_area
if overlap_ratio >= self.whiteout_iou_threshold:
return True
return False
# =========================================================================
# Phase 4: GS Distillation - Exception Handler
# =========================================================================
@staticmethod
def is_ghostscript_available() -> bool:
"""Check if Ghostscript is available on the system."""
import shutil
return shutil.which('gs') is not None
def _should_trigger_gs_repair(self, file_path: Path) -> Tuple[bool, str]:
"""
Determine if Ghostscript repair should be triggered.
Triggers on:
1. High garble rate (>10% cid:xxxx patterns) in extracted text
2. Severe mupdf structural errors during opening
Args:
file_path: Path to PDF file
Returns:
Tuple of (should_repair, reason)
"""
reason = ""
try:
# NOTE: MuPDF emits parse warnings at the C level, so redirecting
# Python's sys.stderr would not capture them; PyMuPDF buffers them
# instead, exposed via fitz.TOOLS.mupdf_warnings().
fitz.TOOLS.mupdf_warnings()  # drain any previously buffered warnings
doc = fitz.open(str(file_path))
warnings = fitz.TOOLS.mupdf_warnings()
# Check for severe structural errors
severe_keywords = ['error', 'invalid xref', 'corrupt', 'damaged', 'repair']
for keyword in severe_keywords:
if keyword.lower() in warnings.lower():
reason = f"Structural error detected: {keyword}"
doc.close()
return True, reason
# Check garble rate on first page
if len(doc) > 0:
page = doc[0]
text = page.get_text("text")
garble_rate = self._calculate_garble_rate(text)
if garble_rate > self.garble_ocr_fallback_threshold:
reason = f"High garble rate: {garble_rate:.1%}"
doc.close()
return True, reason
doc.close()
return False, ""
except Exception as e:
reason = f"Error opening PDF: {str(e)}"
return True, reason
def _repair_pdf_with_gs(self, input_path: Path, output_path: Path) -> bool:
"""
Repair a PDF using Ghostscript distillation.
This re-renders the PDF through Ghostscript's PDF interpreter,
which can fix many structural issues.
Args:
input_path: Path to input PDF
output_path: Path to save repaired PDF
Returns:
True if repair succeeded, False otherwise
"""
import subprocess
import shutil
if not self.is_ghostscript_available():
logger.warning("Ghostscript not available, cannot repair PDF")
return False
try:
# GS command for PDF repair/distillation
cmd = [
'gs',
'-dNOPAUSE',
'-dBATCH',
'-dSAFER',
'-sDEVICE=pdfwrite',
'-dPDFSETTINGS=/prepress',
'-dDetectDuplicateImages=true',
'-dCompressFonts=true',
'-dSubsetFonts=true',
f'-sOutputFile={output_path}',
str(input_path)
]
logger.info(f"Running Ghostscript repair: {' '.join(cmd)}")
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=60 # 60 second timeout
)
if result.returncode == 0 and output_path.exists():
logger.info(f"Ghostscript repair successful: {output_path}")
return True
else:
logger.error(f"Ghostscript repair failed: {result.stderr}")
return False
except subprocess.TimeoutExpired:
logger.error("Ghostscript repair timed out")
return False
except Exception as e:
logger.error(f"Ghostscript repair error: {e}")
return False
def extract_with_repair(self,
file_path: Path,
output_dir: Optional[Path] = None,
enable_gs_repair: bool = False) -> UnifiedDocument:
"""
Extract content with optional Ghostscript repair for damaged PDFs.
This method first checks if the PDF needs repair, and if so,
attempts to repair it using Ghostscript before extraction.
Args:
file_path: Path to PDF file
output_dir: Optional directory to save extracted images
enable_gs_repair: Whether to attempt GS repair on problematic PDFs
Returns:
UnifiedDocument with extracted content
"""
import tempfile
# Check if repair is needed and enabled
if enable_gs_repair:
should_repair, reason = self._should_trigger_gs_repair(file_path)
if should_repair:
logger.warning(f"PDF repair triggered: {reason}")
if self.is_ghostscript_available():
# Create temporary file for repaired PDF
with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as tmp:
tmp_path = Path(tmp.name)
try:
if self._repair_pdf_with_gs(file_path, tmp_path):
logger.info("Using repaired PDF for extraction")
result = self.extract(tmp_path, output_dir)
# Add repair metadata
if result.metadata:
result.metadata.gs_repaired = True
return result
else:
logger.warning("GS repair failed, trying original file")
finally:
# Cleanup temp file
if tmp_path.exists():
tmp_path.unlink()
else:
logger.warning("Ghostscript not available, skipping repair")
# Normal extraction
return self.extract(file_path, output_dir)
def get_pages_needing_ocr(self, doc: UnifiedDocument) -> List[int]:
"""
Get list of page numbers that need OCR fallback.
This method checks each page's metadata for the 'needs_ocr_fallback' flag
set during extraction when high garble rates are detected.
Args:
doc: UnifiedDocument from extraction
Returns:
List of page numbers (1-indexed) that need OCR processing
"""
pages_needing_ocr = []
for page in doc.pages:
if page.metadata and page.metadata.get('needs_ocr_fallback', False):
pages_needing_ocr.append(page.page_number)
if pages_needing_ocr:
logger.info(f"Pages needing OCR fallback: {pages_needing_ocr}")
return pages_needing_ocr
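# Illustrative follow-up (the ocr_engine object and its API are hypothetical):
#   for page_num in engine.get_pages_needing_ocr(doc):
#       ocr_engine.process_page(file_path, page_num)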
def get_extraction_quality_report(self, doc: UnifiedDocument) -> Dict[str, Any]:
"""
Generate a quality report for the extraction.
This report helps determine if additional processing (OCR, manual review)
is needed.
Args:
doc: UnifiedDocument from extraction
Returns:
Dict with quality metrics:
- total_pages: int
- pages_with_issues: list of page numbers with problems
- average_garble_rate: float
- needs_ocr_fallback: bool (any page needs OCR)
- preprocessing_stats: dict with sanitization/whiteout counts
"""
report = {
'total_pages': len(doc.pages),
'pages_with_issues': [],
'garble_rates': {},
'average_garble_rate': 0.0,
'needs_ocr_fallback': False,
'preprocessing_stats': {
'pages_sanitized': 0,
'total_whiteout_regions': 0
}
}
total_garble = 0.0
pages_with_garble = 0
for page in doc.pages:
metadata = page.metadata or {}
# Check garble rate
garble_rate = metadata.get('garble_rate', 0.0)
if garble_rate > 0:
report['garble_rates'][page.page_number] = garble_rate
total_garble += garble_rate
pages_with_garble += 1
# Check OCR fallback flag
if metadata.get('needs_ocr_fallback', False):
report['pages_with_issues'].append(page.page_number)
report['needs_ocr_fallback'] = True
# Preprocessing stats
preprocessing = metadata.get('preprocessing', {})
if preprocessing.get('sanitized', False):
report['preprocessing_stats']['pages_sanitized'] += 1
report['preprocessing_stats']['total_whiteout_regions'] += preprocessing.get('whiteout_regions_found', 0)
# Average garble rate, computed only over pages that showed garbling
if pages_with_garble > 0:
report['average_garble_rate'] = total_garble / pages_with_garble
return report
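# Example report for a three-page document (illustrative values):
#   {'total_pages': 3, 'pages_with_issues': [2], 'garble_rates': {2: 0.34},
#    'average_garble_rate': 0.34, 'needs_ocr_fallback': True,
#    'preprocessing_stats': {'pages_sanitized': 3, 'total_whiteout_regions': 1}}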