feat: add black/white covering image detection

Implements detection of embedded images used for redaction/covering
(a standalone sketch of the heuristic appears just before the diff below):
- Analyzes embedded images for mostly black (avg RGB <= 30) or mostly white (avg RGB >= 245) content
- Uses PIL to sample image colors efficiently (downscales before averaging)
- Gets each image's position on the page via page.get_image_rects()
- Integrates with the existing preprocessing pipeline
- Adds covering_images to page metadata and the quality report

Detection results:
- demo_docs/edit3.pdf: 10 black covering images detected (7 on page 1, 3 on page 2)

Quality report now includes (see the usage sketch after the diff):
- total_covering_images count
- Per-page covering_images details with bbox, color_type, size

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

Author: egg
Date:   2025-12-04 07:42:55 +08:00
Parent: 3903bcf77d
Commit: d6387adbd1

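Before the diff, a minimal standalone sketch of the detection heuristic described
in the commit message. This is an illustration, not the commit's code: it assumes
PyMuPDF (fitz) and Pillow are installed, and classify_image_bytes is a
hypothetical helper name, not part of the commit.

    import io
    from typing import Optional

    import fitz  # PyMuPDF
    from PIL import Image

    def classify_image_bytes(img_bytes: bytes) -> Optional[str]:
        """Return 'image_black', 'image_white', or None for raw image bytes."""
        img = Image.open(io.BytesIO(img_bytes)).convert('RGB')
        # Downscale so at most ~50x50 pixels are averaged, not the full image
        small = img.resize((min(50, img.width), min(50, img.height)))
        pixels = list(small.getdata())
        avg = [sum(p[c] for p in pixels) / len(pixels) for c in range(3)]
        if all(v <= 30 for v in avg):
            return 'image_black'
        if all(v >= 245 for v in avg):
            return 'image_white'
        return None

    doc = fitz.open("demo_docs/edit3.pdf")
    for page in doc:
        for img_info in page.get_images(full=True):
            xref, width, height = img_info[0], img_info[2], img_info[3]
            if width < 20 or height < 10:  # skip icons/bullets, as in the commit
                continue
            img_bytes = doc.extract_image(xref).get('image')
            color_type = classify_image_bytes(img_bytes) if img_bytes else None
            if color_type:
                for rect in page.get_image_rects(xref):
                    print(f"page {page.number + 1}: {color_type} at {tuple(rect)}")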

@@ -120,7 +120,8 @@ class DirectExtractionEngine:
                 doc[page_num],
                 page_num + 1,
                 document_id,
-                output_dir
+                output_dir,
+                doc  # Pass doc for covering image detection
             )
             pages.append(page)
 
@@ -211,7 +212,8 @@ class DirectExtractionEngine:
                       page: fitz.Page,
                       page_num: int,
                       document_id: str,
-                      output_dir: Optional[Path]) -> Page:
+                      output_dir: Optional[Path],
+                      doc: fitz.Document = None) -> Page:
         """Extract content from a single page with preprocessing pipeline."""
         elements = []
         element_counter = 0
@@ -219,8 +221,8 @@ class DirectExtractionEngine:
# ===================================================================== # =====================================================================
# PREPROCESSING PIPELINE # PREPROCESSING PIPELINE
# ===================================================================== # =====================================================================
# Step 1: Run preprocessing (sanitization, white-out detection) # Step 1: Run preprocessing (sanitization, white-out detection, covering images)
preprocess_result = self._preprocess_page(page, page_num) preprocess_result = self._preprocess_page(page, page_num, doc)
covered_bboxes = preprocess_result.get('covered_word_bboxes', []) covered_bboxes = preprocess_result.get('covered_word_bboxes', [])
# Get page-level metadata (for final Page metadata) # Get page-level metadata (for final Page metadata)
@@ -337,13 +339,16 @@ class DirectExtractionEngine:
         elements = self._filter_page_numbers(elements, dimensions.height)
 
         # Step 3.2-3.3: Garble detection and OCR fallback recommendation
 
+        covering_images = preprocess_result.get('covering_images', [])
         page_metadata = {
             "has_drawings": len(drawings) > 0,
             "drawing_count": len(drawings),
             "link_count": len(links),
             "preprocessing": {
                 "sanitized": preprocess_result.get('sanitized', False),
-                "whiteout_regions_found": len(covered_bboxes)
+                "whiteout_regions_found": len(covered_bboxes) - len(covering_images),  # Vector rects only
+                "covering_images_found": len(covering_images),
+                "covering_images": covering_images  # Full details for debugging
             }
         }
@@ -1856,27 +1861,31 @@ class DirectExtractionEngine:
# PDF Preprocessing Pipeline Methods # PDF Preprocessing Pipeline Methods
# ========================================================================= # =========================================================================
def _preprocess_page(self, page: fitz.Page, page_num: int) -> Dict[str, Any]: def _preprocess_page(self, page: fitz.Page, page_num: int, doc: fitz.Document = None) -> Dict[str, Any]:
""" """
Run preprocessing pipeline on a page before extraction. Run preprocessing pipeline on a page before extraction.
Pipeline steps: Pipeline steps:
1. Content sanitization (clean_contents) 1. Content sanitization (clean_contents)
2. Hidden layer detection (OCG) 2. Hidden layer detection (OCG)
3. White-out detection 3. White-out/black-out detection (vector rectangles)
4. Covering image detection (embedded black/white images)
Args: Args:
page: PyMuPDF page object page: PyMuPDF page object
page_num: Page number (1-indexed) page_num: Page number (1-indexed)
doc: PyMuPDF document object (needed for image analysis)
Returns: Returns:
Dict with preprocessing results: Dict with preprocessing results:
- covered_word_bboxes: List of bboxes for text covered by white rectangles - covered_word_bboxes: List of bboxes for text covered by rectangles/images
- covering_images: List of covering image info
- hidden_layers: List of hidden OCG layer names - hidden_layers: List of hidden OCG layer names
- sanitized: Whether content was sanitized - sanitized: Whether content was sanitized
""" """
result = { result = {
'covered_word_bboxes': [], 'covered_word_bboxes': [],
'covering_images': [],
'hidden_layers': [], 'hidden_layers': [],
'sanitized': False 'sanitized': False
} }
@@ -1890,7 +1899,7 @@ class DirectExtractionEngine:
except Exception as e: except Exception as e:
logger.warning(f"Page {page_num}: Content sanitization failed: {e}") logger.warning(f"Page {page_num}: Content sanitization failed: {e}")
# Step 1.3: White-out detection # Step 1.3: White-out/black-out detection (vector rectangles)
if self.enable_whiteout_detection: if self.enable_whiteout_detection:
covered = self._detect_whiteout_covered_text(page, page_num) covered = self._detect_whiteout_covered_text(page, page_num)
result['covered_word_bboxes'] = [fitz.Rect(w['bbox']) for w in covered] result['covered_word_bboxes'] = [fitz.Rect(w['bbox']) for w in covered]
@@ -1903,6 +1912,19 @@ class DirectExtractionEngine:
                 logger.info(f"Page {page_num}: Detected {len(covered)} covered text regions "
                             f"(white: {white_covered}, black/redaction: {black_covered}, other: {other_covered})")
 
+        # Step 1.4: Covering image detection (embedded black/white images)
+        if self.enable_whiteout_detection and doc is not None:
+            covering_images = self._detect_covering_images(page, doc, page_num)
+            result['covering_images'] = covering_images
+            # Add covering image bboxes to the covered_word_bboxes list
+            for img in covering_images:
+                result['covered_word_bboxes'].append(fitz.Rect(img['bbox']))
+            if covering_images:
+                black_imgs = sum(1 for c in covering_images if c['color_type'] == 'image_black')
+                white_imgs = sum(1 for c in covering_images if c['color_type'] == 'image_white')
+                logger.info(f"Page {page_num}: Detected {len(covering_images)} covering images "
+                            f"(black: {black_imgs}, white: {white_imgs})")
+
         return result
 
     def _detect_whiteout_covered_text(self, page: fitz.Page, page_num: int) -> List[Dict]:
@@ -1989,6 +2011,95 @@ class DirectExtractionEngine:
         return covered_words
 
+    def _detect_covering_images(self, page: fitz.Page, doc: fitz.Document, page_num: int) -> List[Dict]:
+        """
+        Detect embedded images that are mostly black/white (likely covering/redaction).
+
+        Args:
+            page: PyMuPDF page object
+            doc: PyMuPDF document object (needed for image extraction)
+            page_num: Page number for logging
+
+        Returns:
+            List of dicts with covering image info: {'bbox', 'color_type', 'avg_color'}
+        """
+        covering_images = []
+
+        try:
+            # Get all images on the page with their positions
+            image_list = page.get_images(full=True)
+            if not image_list:
+                return covering_images
+
+            for img_info in image_list:
+                xref = img_info[0]
+                width = img_info[2]
+                height = img_info[3]
+
+                # Skip very small images (icons, bullets)
+                if width < 20 or height < 10:
+                    continue
+
+                try:
+                    # Extract image data
+                    base_image = doc.extract_image(xref)
+                    img_bytes = base_image.get('image')
+                    if not img_bytes:
+                        continue
+
+                    # Analyze image color using PIL
+                    from PIL import Image
+                    import io
+
+                    img = Image.open(io.BytesIO(img_bytes))
+                    if img.mode != 'RGB':
+                        img = img.convert('RGB')
+
+                    # Sample pixels for efficiency (don't analyze every pixel)
+                    img_small = img.resize((min(50, img.width), min(50, img.height)))
+                    pixels = list(img_small.getdata())
+                    if not pixels:
+                        continue
+
+                    avg_r = sum(p[0] for p in pixels) / len(pixels)
+                    avg_g = sum(p[1] for p in pixels) / len(pixels)
+                    avg_b = sum(p[2] for p in pixels) / len(pixels)
+
+                    # Determine if image is mostly black or white
+                    color_type = None
+                    if avg_r <= 30 and avg_g <= 30 and avg_b <= 30:
+                        color_type = 'image_black'
+                    elif avg_r >= 245 and avg_g >= 245 and avg_b >= 245:
+                        color_type = 'image_white'
+
+                    if color_type:
+                        # Get image position on page
+                        # We need to find the image rectangle on the page
+                        for img_rect in page.get_image_rects(xref):
+                            covering_images.append({
+                                'bbox': tuple(img_rect),
+                                'color_type': color_type,
+                                'avg_color': (avg_r, avg_g, avg_b),
+                                'size': (width, height)
+                            })
+
+                except Exception as e:
+                    logger.debug(f"Page {page_num}: Failed to analyze image xref={xref}: {e}")
+                    continue
+
+            if covering_images:
+                black_count = sum(1 for c in covering_images if c['color_type'] == 'image_black')
+                white_count = sum(1 for c in covering_images if c['color_type'] == 'image_white')
+                logger.debug(f"Page {page_num}: Found {len(covering_images)} covering images "
+                             f"(black: {black_count}, white: {white_count})")
+
+        except Exception as e:
+            logger.warning(f"Page {page_num}: Failed to detect covering images: {e}")
+
+        return covering_images
+
     def _get_hidden_ocg_layers(self, doc: fitz.Document) -> List[str]:
         """
         Get list of hidden Optional Content Group (OCG) layer names.
@@ -2410,7 +2521,8 @@ class DirectExtractionEngine:
'needs_ocr_fallback': False, 'needs_ocr_fallback': False,
'preprocessing_stats': { 'preprocessing_stats': {
'pages_sanitized': 0, 'pages_sanitized': 0,
'total_whiteout_regions': 0 'total_whiteout_regions': 0,
'total_covering_images': 0
} }
} }
@@ -2437,6 +2549,7 @@ class DirectExtractionEngine:
             if preprocessing.get('sanitized', False):
                 report['preprocessing_stats']['pages_sanitized'] += 1
             report['preprocessing_stats']['total_whiteout_regions'] += preprocessing.get('whiteout_regions_found', 0)
+            report['preprocessing_stats']['total_covering_images'] += preprocessing.get('covering_images_found', 0)
 
         # Calculate average garble rate
         if pages_with_garble > 0:
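
For consumers of the quality report, a short usage sketch. The report dict and
its keys follow the diff above; the pages list and the Page metadata attribute
are assumptions about the surrounding engine, not confirmed API:

    # Aggregate stats from the quality report built above (hypothetical access)
    stats = report['preprocessing_stats']
    print(f"pages sanitized:       {stats['pages_sanitized']}")
    print(f"whiteout rects found:  {stats['total_whiteout_regions']}")
    print(f"covering images found: {stats['total_covering_images']}")

    # Per-page detail, assuming each Page keeps the metadata dict from _extract_page
    for page in pages:
        pp = page.metadata.get('preprocessing', {})
        for img in pp.get('covering_images', []):
            print(img['color_type'], img['bbox'], img['size'])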