"""
Gap Filling Service for OCR Track

This service detects and fills gaps in PP-StructureV3 output by supplementing
with Raw OCR text regions when significant content loss is detected.

The hybrid approach uses Raw OCR's comprehensive text detection to compensate
for PP-StructureV3's layout model limitations on certain document types.
"""

import logging
from typing import Dict, List, Optional, Tuple, Set, Any
from dataclasses import dataclass

from app.models.unified_document import (
    DocumentElement, BoundingBox, ElementType, Dimensions
)
from app.core.config import settings
from app.utils.bbox_utils import normalize_bbox as _normalize_bbox

logger = logging.getLogger(__name__)

# Element types that should NOT be supplemented (preserve structural integrity).
# Elements of these types are ignored by GapFillingService.deduplicate_regions().
SKIP_ELEMENT_TYPES: Set[ElementType] = {
    ElementType.TABLE,
    ElementType.IMAGE,
    ElementType.FIGURE,
    ElementType.CHART,
    ElementType.DIAGRAM,
    ElementType.HEADER,
    ElementType.FOOTER,
    ElementType.FORMULA,
    ElementType.CODE,
    ElementType.BARCODE,
    ElementType.QR_CODE,
    ElementType.LOGO,
    ElementType.STAMP,
    ElementType.SIGNATURE,
}


def _setting_or_default(explicit: Any, setting_name: str, fallback: Any) -> Any:
    """Return ``explicit`` if given, else the named ``settings`` attribute, else ``fallback``."""
    if explicit is not None:
        return explicit
    return getattr(settings, setting_name, fallback)


@dataclass
class TextRegion:
    """Represents a raw OCR text region."""

    text: str
    # [x0, y0, x1, y1] or polygon format; normalized_bbox delegates the
    # conversion to the shared bbox utility.
    bbox: List[float]
    confidence: float
    page: int = 0

    @property
    def normalized_bbox(self) -> Tuple[float, float, float, float]:
        """Get normalized bbox as (x0, y0, x1, y1). Uses shared bbox utility.

        Falls back to (0, 0, 0, 0) when the utility returns a falsy result.
        """
        result = _normalize_bbox(self.bbox)
        return result if result else (0, 0, 0, 0)

    @property
    def center(self) -> Tuple[float, float]:
        """Get center point of the normalized bbox."""
        x0, y0, x1, y1 = self.normalized_bbox
        return ((x0 + x1) / 2, (y0 + y1) / 2)


# Element type to IoA threshold-category mapping.
# TABLE needs strict filtering (low threshold) to prevent duplicate content.
# FIGURE allows more text through (high threshold) to preserve axis labels, legends.
# Every type not listed here falls into the 'text' category, which uses a
# moderate threshold to tolerate boundary detection errors.
ELEMENT_TYPE_IOA_THRESHOLDS: Dict[ElementType, str] = {
    ElementType.TABLE: 'table',
    ElementType.FIGURE: 'figure',
    ElementType.IMAGE: 'figure',
    ElementType.CHART: 'figure',
    ElementType.DIAGRAM: 'figure',
}


class GapFillingService:
    """
    Service for detecting and filling gaps in PP-StructureV3 output.

    This service uses IoA (Intersection over Area) algorithm for coverage
    detection, which correctly measures "small box contained in large box"
    relationship.

    Key improvements over IoU:
    - IoA = intersection_area / ocr_box_area (non-symmetric)
    - Better for detecting if OCR text is covered by larger layout regions
    - Different thresholds per element type (TEXT, TABLE, FIGURE)
    - Optional boundary shrinking to reduce edge duplicates

    This service:
    1. Calculates coverage of PP-StructureV3 elements over raw OCR regions using IoA
    2. Identifies uncovered raw OCR regions
    3. Supplements uncovered regions as TEXT elements
    4. Deduplicates against existing PP-StructureV3 elements whose type is
       not in SKIP_ELEMENT_TYPES
    5. Can recalculate reading order for a combined element list
       (recalculate_reading_order; not invoked by fill_gaps itself)
    """

    def __init__(
        self,
        coverage_threshold: Optional[float] = None,
        confidence_threshold: Optional[float] = None,
        ioa_threshold_text: Optional[float] = None,
        ioa_threshold_table: Optional[float] = None,
        ioa_threshold_figure: Optional[float] = None,
        dedup_ioa_threshold: Optional[float] = None,
        shrink_pixels: Optional[int] = None,
        enabled: Optional[bool] = None
    ):
        """
        Initialize the gap filling service.

        Every argument left as None is read from the matching
        ``settings.gap_filling_*`` attribute, falling back to the default
        listed below when that attribute does not exist.

        Args:
            coverage_threshold: Coverage ratio below which gap filling activates (default: 0.7)
            confidence_threshold: Minimum confidence for raw OCR regions (default: 0.3)
            ioa_threshold_text: IoA threshold for TEXT/TITLE elements (default: 0.6)
            ioa_threshold_table: IoA threshold for TABLE elements (default: 0.1)
            ioa_threshold_figure: IoA threshold for FIGURE/IMAGE elements (default: 0.8)
            dedup_ioa_threshold: IoA threshold for deduplication (default: 0.5)
            shrink_pixels: Shrink OCR bbox inward by this many pixels (default: 1)
            enabled: Whether gap filling is enabled (default: True)
        """
        self.coverage_threshold = _setting_or_default(
            coverage_threshold, 'gap_filling_coverage_threshold', 0.7
        )
        self.confidence_threshold = _setting_or_default(
            confidence_threshold, 'gap_filling_confidence_threshold', 0.3
        )

        # IoA thresholds per element type
        self.ioa_threshold_text = _setting_or_default(
            ioa_threshold_text, 'gap_filling_ioa_threshold_text', 0.6
        )
        self.ioa_threshold_table = _setting_or_default(
            ioa_threshold_table, 'gap_filling_ioa_threshold_table', 0.1
        )
        self.ioa_threshold_figure = _setting_or_default(
            ioa_threshold_figure, 'gap_filling_ioa_threshold_figure', 0.8
        )
        self.dedup_ioa_threshold = _setting_or_default(
            dedup_ioa_threshold, 'gap_filling_dedup_ioa_threshold', 0.5
        )

        # Boundary shrinking
        self.shrink_pixels = _setting_or_default(
            shrink_pixels, 'gap_filling_shrink_pixels', 1
        )

        self.enabled = _setting_or_default(enabled, 'gap_filling_enabled', True)

    def should_activate(
        self,
        raw_ocr_regions: List[TextRegion],
        pp_structure_elements: List[DocumentElement]
    ) -> Tuple[bool, float]:
        """
        Determine if gap filling should be activated.

        Gap filling activates when the fraction of raw OCR regions covered by
        PP-StructureV3 elements is below ``coverage_threshold`` (default: 70%).
        All raw regions count towards the ratio, including low-confidence ones.

        Args:
            raw_ocr_regions: List of raw OCR text regions
            pp_structure_elements: List of PP-StructureV3 elements

        Returns:
            Tuple of (should_activate, coverage_ratio). When the service is
            disabled or there are no raw regions, returns (False, 1.0).
        """
        if not self.enabled or not raw_ocr_regions:
            return False, 1.0

        # Calculate coverage
        covered_count = sum(
            1 for region in raw_ocr_regions
            if self._is_region_covered(region, pp_structure_elements)
        )
        coverage_ratio = covered_count / len(raw_ocr_regions)

        # Check activation conditions
        should_activate = coverage_ratio < self.coverage_threshold

        if should_activate:
            logger.info(
                f"Gap filling activated: coverage={coverage_ratio:.2%} < threshold={self.coverage_threshold:.0%}, "
                f"raw_regions={len(raw_ocr_regions)}, pp_elements={len(pp_structure_elements)}"
            )
        else:
            logger.debug(
                f"Gap filling not needed: coverage={coverage_ratio:.2%} >= threshold={self.coverage_threshold:.0%}"
            )

        return should_activate, coverage_ratio

    def find_uncovered_regions(
        self,
        raw_ocr_regions: List[TextRegion],
        pp_structure_elements: List[DocumentElement]
    ) -> List[TextRegion]:
        """
        Find raw OCR regions not covered by PP-StructureV3 elements.

        A region is considered covered if:
        1. Its center point falls inside any PP-StructureV3 element bbox, OR
        2. Its IoA with any PP-StructureV3 element exceeds that element
           type's IoA threshold

        Regions whose confidence is below ``confidence_threshold`` are
        dropped without being checked.

        Args:
            raw_ocr_regions: List of raw OCR text regions
            pp_structure_elements: List of PP-StructureV3 elements

        Returns:
            List of uncovered raw OCR regions
        """
        uncovered = [
            region for region in raw_ocr_regions
            # Skip low confidence regions first; they are never supplemented.
            if region.confidence >= self.confidence_threshold
            and not self._is_region_covered(region, pp_structure_elements)
        ]

        logger.debug(f"Found {len(uncovered)} uncovered regions out of {len(raw_ocr_regions)}")
        return uncovered

    def _get_ioa_threshold_for_element(self, element_type: ElementType) -> float:
        """
        Get the IoA threshold for a specific element type.

        Different element types have different thresholds (defaults shown):
        - TABLE: 0.1 (strict, prevents duplicate table content)
        - FIGURE/IMAGE/CHART/DIAGRAM: 0.8 (preserves text inside figures)
        - TEXT/others: 0.6 (tolerates boundary errors)

        Args:
            element_type: The element type to get threshold for

        Returns:
            IoA threshold value
        """
        threshold_type = ELEMENT_TYPE_IOA_THRESHOLDS.get(element_type, 'text')
        if threshold_type == 'table':
            return self.ioa_threshold_table
        if threshold_type == 'figure':
            return self.ioa_threshold_figure
        return self.ioa_threshold_text

    def _shrink_bbox(
        self,
        bbox: Tuple[float, float, float, float],
        pixels: int
    ) -> Tuple[float, float, float, float]:
        """
        Shrink a bounding box inward by the specified number of pixels.

        This reduces false "uncovered" detection at region boundaries.

        The box is returned unchanged when ``pixels`` is not positive or when
        shrinking would leave it with no area (its shorter side is at most
        ``2 * pixels``). A zero-area box would make ``_calculate_ioa`` return
        0.0 for every element, so such a region could never be deduplicated.

        Args:
            bbox: Original bbox (x0, y0, x1, y1)
            pixels: Number of pixels to shrink on each side

        Returns:
            Shrunk bbox (x0, y0, x1, y1)
        """
        x0, y0, x1, y1 = bbox
        half_min_side = min(x1 - x0, y1 - y0) / 2

        # Also covers degenerate input (negative width/height), where the
        # previous min()-based clamp produced a negative shrink and grew the box.
        if pixels <= 0 or half_min_side <= pixels:
            return (x0, y0, x1, y1)

        return (x0 + pixels, y0 + pixels, x1 - pixels, y1 - pixels)

    @staticmethod
    def _element_bbox(element: DocumentElement) -> Tuple[float, float, float, float]:
        """Return an element's bbox as an (x0, y0, x1, y1) tuple."""
        box = element.bbox
        return (box.x0, box.y0, box.x1, box.y1)

    def _is_region_covered(
        self,
        region: TextRegion,
        pp_structure_elements: List[DocumentElement],
        skip_table_coverage: bool = False
    ) -> bool:
        """
        Check if a raw OCR region is covered by any PP-StructureV3 element.

        Uses IoA (Intersection over Area) instead of IoU for better coverage
        detection. IoA = intersection_area / ocr_box_area. This correctly
        measures "OCR box is contained in layout region".

        Different element types use different IoA thresholds (defaults shown):
        - TABLE: 0.1 (strict, any overlap means covered)
        - FIGURE/IMAGE: 0.8 (preserve text inside figures like axis labels)
        - TEXT/others: 0.6 (tolerate boundary errors)

        NOTE(review): the center-point check runs first and applies to every
        element type. For axis-aligned boxes an IoA above 0.5 implies the
        region's center lies inside the element, so IoA thresholds >= 0.5 can
        never be the deciding check; in practice only thresholds below 0.5
        (TABLE by default) add coverage beyond the center test. Confirm this
        is the intended behaviour for FIGURE elements.

        Args:
            region: Raw OCR text region
            pp_structure_elements: List of PP-StructureV3 elements
            skip_table_coverage: If True, don't consider TABLE elements as
                covering. Default is False - TABLE elements DO cover regions
                to prevent duplicate rendering of table cell content.

        Returns:
            True if the region is covered
        """
        center_x, center_y = region.center
        region_bbox = region.normalized_bbox

        # Apply boundary shrinking to reduce edge duplicates
        if self.shrink_pixels > 0:
            region_bbox = self._shrink_bbox(region_bbox, self.shrink_pixels)

        for element in pp_structure_elements:
            # TABLE elements count as covering unless the caller opts out;
            # this prevents gap filling from duplicating text inside tables.
            if skip_table_coverage and element.type == ElementType.TABLE:
                continue

            elem_bbox = self._element_bbox(element)

            # Check 1: Center point falls inside element bbox
            if self._point_in_bbox(center_x, center_y, elem_bbox):
                return True

            # Check 2: IoA exceeds element-type-specific threshold
            ioa = self._calculate_ioa(region_bbox, elem_bbox)
            if ioa > self._get_ioa_threshold_for_element(element.type):
                return True

        return False

    def deduplicate_regions(
        self,
        uncovered_regions: List[TextRegion],
        pp_structure_elements: List[DocumentElement]
    ) -> List[TextRegion]:
        """
        Remove regions that highly overlap with existing PP-StructureV3 elements.

        Uses IoA (Intersection over Area) for deduplication to correctly
        detect when an OCR region is already covered by an existing element.
        Only elements whose type is NOT in SKIP_ELEMENT_TYPES are compared.

        Args:
            uncovered_regions: List of uncovered raw OCR regions
            pp_structure_elements: List of PP-StructureV3 elements

        Returns:
            Deduplicated list of regions
        """
        # Text-like elements only: everything outside SKIP_ELEMENT_TYPES.
        text_elements = [
            e for e in pp_structure_elements
            if e.type not in SKIP_ELEMENT_TYPES
        ]

        deduplicated = []
        for region in uncovered_regions:
            region_bbox = region.normalized_bbox

            # Apply boundary shrinking for deduplication as well
            if self.shrink_pixels > 0:
                region_bbox = self._shrink_bbox(region_bbox, self.shrink_pixels)

            for element in text_elements:
                ioa = self._calculate_ioa(region_bbox, self._element_bbox(element))
                if ioa > self.dedup_ioa_threshold:
                    logger.debug(
                        f"Skipping duplicate region (IoA={ioa:.2f}): '{region.text[:30]}...'"
                    )
                    break
            else:
                # No element overlapped enough: keep the region.
                deduplicated.append(region)

        removed_count = len(uncovered_regions) - len(deduplicated)
        if removed_count > 0:
            logger.debug(f"Removed {removed_count} duplicate regions")

        return deduplicated

    def convert_regions_to_elements(
        self,
        regions: List[TextRegion],
        page_number: int,
        start_element_id: int = 0
    ) -> List[DocumentElement]:
        """
        Convert raw OCR regions to DocumentElement objects.

        Each element is created as ElementType.TEXT with the id
        ``gap_fill_{page_number}_{n}`` and metadata marking it as produced
        by gap filling.

        Args:
            regions: List of raw OCR regions to convert
            page_number: Page number for the elements
            start_element_id: Starting ID counter for elements

        Returns:
            List of DocumentElement objects
        """
        elements = []
        for element_number, region in enumerate(regions, start=start_element_id):
            x0, y0, x1, y1 = region.normalized_bbox
            elements.append(DocumentElement(
                element_id=f"gap_fill_{page_number}_{element_number}",
                type=ElementType.TEXT,
                content=region.text,
                bbox=BoundingBox(x0=x0, y0=y0, x1=x1, y1=y1),
                confidence=region.confidence,
                metadata={
                    'source': 'gap_filling',
                    'original_confidence': region.confidence
                }
            ))
        return elements

    def recalculate_reading_order(
        self,
        elements: List[DocumentElement]
    ) -> List[int]:
        """
        Recalculate reading order for elements based on position.

        Sorts elements by y0 (top to bottom) then x0 (left to right). The
        sort is stable, so elements with identical (y0, x0) keep their
        original relative order.

        Args:
            elements: List of DocumentElement objects

        Returns:
            List of element indices in reading order
        """
        return sorted(
            range(len(elements)),
            key=lambda idx: (elements[idx].bbox.y0, elements[idx].bbox.x0)
        )

    def merge_adjacent_regions(
        self,
        regions: List[TextRegion],
        max_horizontal_gap: float = 20.0,
        max_vertical_gap: float = 5.0
    ) -> List[TextRegion]:
        """
        Merge fragmented adjacent regions on the same line.

        This is optional and can reduce fragmentation from raw OCR. Regions
        are visited in (y0, x0) order; a region is merged into the current
        one when their vertical centers differ by less than
        ``max_vertical_gap`` and it starts between 0 and
        ``max_horizontal_gap`` pixels to the right of the current region.
        A merged region joins the texts with a space, takes the union bbox
        and the lower of the two confidences.

        Args:
            regions: List of raw OCR regions
            max_horizontal_gap: Maximum horizontal gap to merge (pixels)
            max_vertical_gap: Maximum vertical center distance to merge (pixels)

        Returns:
            List of merged regions
        """
        if not regions:
            return regions

        # Sort by y0, then x0
        sorted_regions = sorted(
            regions,
            key=lambda r: (r.normalized_bbox[1], r.normalized_bbox[0])
        )

        merged = []
        current = sorted_regions[0]

        for next_region in sorted_regions[1:]:
            curr_bbox = current.normalized_bbox
            next_bbox = next_region.normalized_bbox

            # Same line: compare vertical centers rather than raw edges.
            curr_y_center = (curr_bbox[1] + curr_bbox[3]) / 2
            next_y_center = (next_bbox[1] + next_bbox[3]) / 2
            vertical_distance = abs(curr_y_center - next_y_center)

            # Gap from the current right edge to the next left edge; a
            # negative value means the boxes overlap or are out of order.
            horizontal_gap = next_bbox[0] - curr_bbox[2]

            same_line = vertical_distance < max_vertical_gap
            close_enough = 0 <= horizontal_gap <= max_horizontal_gap

            if same_line and close_enough:
                current = TextRegion(
                    text=current.text + " " + next_region.text,
                    bbox=[
                        min(curr_bbox[0], next_bbox[0]),
                        min(curr_bbox[1], next_bbox[1]),
                        max(curr_bbox[2], next_bbox[2]),
                        max(curr_bbox[3], next_bbox[3])
                    ],
                    confidence=min(current.confidence, next_region.confidence),
                    page=current.page
                )
            else:
                merged.append(current)
                current = next_region

        merged.append(current)

        if len(merged) < len(regions):
            logger.debug(f"Merged {len(regions)} regions into {len(merged)}")

        return merged

    def fill_gaps(
        self,
        raw_ocr_regions: List[Dict[str, Any]],
        pp_structure_elements: List[DocumentElement],
        page_number: int,
        ocr_dimensions: Optional[Dict[str, Any]] = None,
        pp_dimensions: Optional[Dimensions] = None
    ) -> Tuple[List[DocumentElement], Dict[str, Any]]:
        """
        Main entry point: detect gaps and fill with raw OCR regions.

        Args:
            raw_ocr_regions: Raw OCR results (list of dicts with text, bbox, confidence)
            pp_structure_elements: PP-StructureV3 elements
            page_number: Current page number
            ocr_dimensions: OCR image dimensions for coordinate alignment
            pp_dimensions: PP-Structure dimensions for coordinate alignment

        Returns:
            Tuple of (supplemented_elements, statistics). The element list
            holds only the NEW elements (empty when nothing was added).
            ``statistics['deduplicated_count']`` is the number of regions
            that REMAIN after deduplication, not the number removed.
        """
        statistics = {
            'enabled': self.enabled,
            'activated': False,
            'coverage_ratio': 1.0,
            'raw_ocr_count': len(raw_ocr_regions),
            'pp_structure_count': len(pp_structure_elements),
            'uncovered_count': 0,
            'deduplicated_count': 0,
            'supplemented_count': 0
        }

        if not self.enabled:
            logger.debug("Gap filling is disabled")
            return [], statistics

        # Convert raw OCR regions to TextRegion objects
        text_regions = self._convert_raw_ocr_regions(
            raw_ocr_regions, page_number, ocr_dimensions, pp_dimensions
        )

        if not text_regions:
            logger.debug("No valid text regions to process")
            return [], statistics

        # Check if gap filling should activate
        should_activate, coverage_ratio = self.should_activate(
            text_regions, pp_structure_elements
        )
        statistics['coverage_ratio'] = coverage_ratio
        statistics['activated'] = should_activate

        if not should_activate:
            return [], statistics

        # Find uncovered regions
        uncovered = self.find_uncovered_regions(text_regions, pp_structure_elements)
        statistics['uncovered_count'] = len(uncovered)

        if not uncovered:
            logger.debug("No uncovered regions found")
            return [], statistics

        # Deduplicate against existing text-like elements
        deduplicated = self.deduplicate_regions(uncovered, pp_structure_elements)
        statistics['deduplicated_count'] = len(deduplicated)

        if not deduplicated:
            logger.debug("All uncovered regions were duplicates")
            return [], statistics

        # NOTE: merge_adjacent_regions() is available but intentionally not
        # applied here; regions are supplemented as detected.

        # Number new ids after the existing elements.
        start_id = len(pp_structure_elements)
        supplemented = self.convert_regions_to_elements(
            deduplicated, page_number, start_id
        )
        statistics['supplemented_count'] = len(supplemented)

        # text_regions is non-empty here (checked above), so the division is safe.
        estimated_coverage = coverage_ratio + len(supplemented) / len(text_regions)
        logger.info(
            f"Gap filling complete: supplemented {len(supplemented)} elements "
            f"(coverage: {coverage_ratio:.2%} -> estimated {estimated_coverage:.2%})"
        )

        return supplemented, statistics

    @staticmethod
    def _scale_bbox(bbox: List[Any], scale_x: float, scale_y: float) -> List[Any]:
        """
        Scale a raw bbox by per-axis factors.

        Nested point lists ([[x, y], ...]) are scaled point by point (points
        with fewer than two values are dropped). Flat lists of length 4
        ([x0, y0, x1, y1]) or length >= 8 (flat polygon) are scaled with x
        on even indices and y on odd indices. Any other shape is returned
        unchanged.
        """
        if len(bbox) >= 1 and isinstance(bbox[0], (list, tuple)):
            return [
                [pt[0] * scale_x, pt[1] * scale_y]
                for pt in bbox if len(pt) >= 2
            ]
        if len(bbox) == 4 or len(bbox) >= 8:
            return [
                value * (scale_x if i % 2 == 0 else scale_y)
                for i, value in enumerate(bbox)
            ]
        return bbox

    def _convert_raw_ocr_regions(
        self,
        raw_regions: List[Dict[str, Any]],
        page_number: int,
        ocr_dimensions: Optional[Dict[str, Any]] = None,
        pp_dimensions: Optional[Dimensions] = None
    ) -> List[TextRegion]:
        """
        Convert raw OCR region dicts to TextRegion objects.

        Handles coordinate alignment if dimensions are provided: bboxes are
        scaled from OCR image space into PP-Structure space.

        Regions are skipped when their text is missing, blank or not a
        string, or when their bbox is neither a dict nor a list/tuple. A
        missing or ``None`` confidence is treated as 0.0.

        Args:
            raw_regions: List of raw OCR region dictionaries
            page_number: Current page number
            ocr_dimensions: OCR image dimensions (dict with 'width'/'height')
            pp_dimensions: PP-Structure dimensions

        Returns:
            List of TextRegion objects
        """
        # Calculate scale factors if needed
        scale_x, scale_y = 1.0, 1.0

        if ocr_dimensions and pp_dimensions:
            # "or 0" guards against explicit None values in the dict.
            ocr_width = ocr_dimensions.get('width', 0) or 0
            ocr_height = ocr_dimensions.get('height', 0) or 0

            if ocr_width > 0 and pp_dimensions.width > 0:
                scale_x = pp_dimensions.width / ocr_width
            if ocr_height > 0 and pp_dimensions.height > 0:
                scale_y = pp_dimensions.height / ocr_height

        needs_scaling = scale_x != 1.0 or scale_y != 1.0
        if needs_scaling:
            logger.debug(f"Coordinate scaling: x={scale_x:.3f}, y={scale_y:.3f}")

        text_regions = []
        for region in raw_regions:
            text = region.get('text', '')
            if not isinstance(text, str) or not text.strip():
                continue

            # An explicit None would otherwise crash the later
            # "confidence < threshold" comparison.
            confidence = region.get('confidence', 0.0)
            if confidence is None:
                confidence = 0.0

            bbox_raw = region.get('bbox', [])

            # Normalize bbox container
            if isinstance(bbox_raw, dict):
                # Dict format: {x_min, y_min, x_max, y_max}
                bbox = [
                    bbox_raw.get('x_min', 0),
                    bbox_raw.get('y_min', 0),
                    bbox_raw.get('x_max', 0),
                    bbox_raw.get('y_max', 0)
                ]
            elif isinstance(bbox_raw, (list, tuple)):
                bbox = list(bbox_raw)
            else:
                continue

            # Apply scaling if needed
            if needs_scaling:
                bbox = self._scale_bbox(bbox, scale_x, scale_y)

            text_regions.append(TextRegion(
                text=text,
                bbox=bbox,
                confidence=confidence,
                page=page_number
            ))

        return text_regions

    @staticmethod
    def _point_in_bbox(
        x: float,
        y: float,
        bbox: Tuple[float, float, float, float]
    ) -> bool:
        """Check if point (x, y) is inside bbox (x0, y0, x1, y1), edges included."""
        x0, y0, x1, y1 = bbox
        return x0 <= x <= x1 and y0 <= y <= y1

    @staticmethod
    def _intersection_area(
        bbox1: Tuple[float, float, float, float],
        bbox2: Tuple[float, float, float, float]
    ) -> float:
        """Return the overlap area of two (x0, y0, x1, y1) boxes, 0.0 if they do not overlap."""
        x0 = max(bbox1[0], bbox2[0])
        y0 = max(bbox1[1], bbox2[1])
        x1 = min(bbox1[2], bbox2[2])
        y1 = min(bbox1[3], bbox2[3])

        if x1 <= x0 or y1 <= y0:
            return 0.0
        return (x1 - x0) * (y1 - y0)

    @staticmethod
    def _calculate_ioa(
        ocr_bbox: Tuple[float, float, float, float],
        layout_bbox: Tuple[float, float, float, float]
    ) -> float:
        """
        Calculate Intersection over Area (IoA) of OCR bbox relative to layout bbox.

        IoA = intersection_area / ocr_box_area

        This is the recommended algorithm for detecting if an OCR text region
        is contained within a larger layout region. Unlike IoU which is
        symmetric, IoA correctly measures "how much of the OCR box is inside
        the layout region".

        Example:
        - OCR box: 100x20 pixels (small text line)
        - Layout box: 500x800 pixels (large paragraph region)
        - IoU would be very small (~0.005) even if OCR is fully inside layout
        - IoA would be 1.0 if OCR is fully inside layout, which is correct

        Args:
            ocr_bbox: OCR text region bbox (x0, y0, x1, y1) - typically smaller
            layout_bbox: Layout element bbox (x0, y0, x1, y1) - typically larger

        Returns:
            IoA value between 0 and 1 (0.0 when the boxes do not overlap or
            the OCR box has no area)
        """
        intersection = GapFillingService._intersection_area(ocr_bbox, layout_bbox)
        if intersection <= 0:
            return 0.0

        # OCR box area is the denominator for IoA
        ocr_area = (ocr_bbox[2] - ocr_bbox[0]) * (ocr_bbox[3] - ocr_bbox[1])
        if ocr_area <= 0:
            return 0.0

        return intersection / ocr_area

    @staticmethod
    def _calculate_iou(
        bbox1: Tuple[float, float, float, float],
        bbox2: Tuple[float, float, float, float]
    ) -> float:
        """
        Calculate Intersection over Union (IoU) of two bboxes.

        Note: This method is kept for backward compatibility.
        For coverage detection, use _calculate_ioa() instead.

        Args:
            bbox1: First bbox (x0, y0, x1, y1)
            bbox2: Second bbox (x0, y0, x1, y1)

        Returns:
            IoU value between 0 and 1
        """
        intersection = GapFillingService._intersection_area(bbox1, bbox2)
        if intersection <= 0:
            return 0.0

        area1 = (bbox1[2] - bbox1[0]) * (bbox1[3] - bbox1[1])
        area2 = (bbox2[2] - bbox2[0]) * (bbox2[3] - bbox2[1])
        union = area1 + area2 - intersection

        if union <= 0:
            return 0.0

        return intersection / union