Backup commit before executing remove-unused-code proposal. This includes all pending changes and new features. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
807 lines
28 KiB
Python
807 lines
28 KiB
Python
"""
|
|
Table Content Rebuilder
|
|
|
|
Rebuilds table content from raw OCR regions when PP-StructureV3's HTML output
|
|
is incorrect due to cell merge errors or boundary detection issues.
|
|
|
|
This module addresses the key problem: PP-StructureV3's ML-based table recognition
|
|
often merges multiple cells incorrectly, especially for borderless tables.
|
|
The solution uses:
|
|
1. cell_boxes validation (filter out-of-bounds cells)
|
|
2. Raw OCR regions to rebuild accurate cell content
|
|
3. Grid-based row/col position calculation
|
|
"""
|
|
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
from collections import defaultdict
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class CellBox:
|
|
"""Represents a validated cell bounding box."""
|
|
x0: float
|
|
y0: float
|
|
x1: float
|
|
y1: float
|
|
original_index: int
|
|
|
|
@property
|
|
def center_y(self) -> float:
|
|
return (self.y0 + self.y1) / 2
|
|
|
|
@property
|
|
def center_x(self) -> float:
|
|
return (self.x0 + self.x1) / 2
|
|
|
|
@property
|
|
def area(self) -> float:
|
|
return max(0, (self.x1 - self.x0) * (self.y1 - self.y0))
|
|
|
|
|
|
@dataclass
|
|
class OCRTextRegion:
|
|
"""Represents a raw OCR text region."""
|
|
text: str
|
|
x0: float
|
|
y0: float
|
|
x1: float
|
|
y1: float
|
|
confidence: float = 1.0
|
|
|
|
@property
|
|
def center_y(self) -> float:
|
|
return (self.y0 + self.y1) / 2
|
|
|
|
@property
|
|
def center_x(self) -> float:
|
|
return (self.x0 + self.x1) / 2
|
|
|
|
|
|
@dataclass
|
|
class RebuiltCell:
|
|
"""Represents a rebuilt table cell."""
|
|
row: int
|
|
col: int
|
|
row_span: int
|
|
col_span: int
|
|
content: str
|
|
bbox: Optional[List[float]] = None
|
|
ocr_regions: List[OCRTextRegion] = None
|
|
|
|
def __post_init__(self):
|
|
if self.ocr_regions is None:
|
|
self.ocr_regions = []
|
|
|
|
|
|
class TableContentRebuilder:
|
|
"""
|
|
Rebuilds table content from raw OCR regions and validated cell_boxes.
|
|
|
|
This class solves the problem where PP-StructureV3's HTML output incorrectly
|
|
merges multiple cells. Instead of relying on the ML-generated HTML, it:
|
|
1. Validates cell_boxes against table bbox
|
|
2. Groups cell_boxes into rows/columns by coordinate clustering
|
|
3. Fills each cell with matching raw OCR text
|
|
4. Generates correct table structure
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
boundary_tolerance: float = 20.0,
|
|
row_clustering_threshold: float = 15.0,
|
|
col_clustering_threshold: float = 15.0,
|
|
iou_threshold_for_ocr_match: float = 0.3,
|
|
min_text_coverage: float = 0.5
|
|
):
|
|
"""
|
|
Initialize the rebuilder.
|
|
|
|
Args:
|
|
boundary_tolerance: Tolerance for cell_boxes boundary check (pixels)
|
|
row_clustering_threshold: Max Y-distance for cells in same row (pixels)
|
|
col_clustering_threshold: Max X-distance for cells in same column (pixels)
|
|
iou_threshold_for_ocr_match: Min IoU to consider OCR region inside cell
|
|
min_text_coverage: Min overlap ratio for OCR text to be assigned to cell
|
|
"""
|
|
self.boundary_tolerance = boundary_tolerance
|
|
self.row_clustering_threshold = row_clustering_threshold
|
|
self.col_clustering_threshold = col_clustering_threshold
|
|
self.iou_threshold = iou_threshold_for_ocr_match
|
|
self.min_text_coverage = min_text_coverage
|
|
|
|
def validate_cell_boxes(
|
|
self,
|
|
cell_boxes: List[List[float]],
|
|
table_bbox: List[float]
|
|
) -> Tuple[List[CellBox], Dict[str, Any]]:
|
|
"""
|
|
Validate cell_boxes against table bbox, filtering invalid ones.
|
|
|
|
Args:
|
|
cell_boxes: List of cell bounding boxes [[x0, y0, x1, y1], ...]
|
|
table_bbox: Table bounding box [x0, y0, x1, y1]
|
|
|
|
Returns:
|
|
Tuple of (valid_cells, validation_stats)
|
|
"""
|
|
if not cell_boxes or len(table_bbox) < 4:
|
|
return [], {"total": 0, "valid": 0, "invalid": 0, "reason": "empty_input"}
|
|
|
|
table_x0, table_y0, table_x1, table_y1 = table_bbox[:4]
|
|
table_height = table_y1 - table_y0
|
|
table_width = table_x1 - table_x0
|
|
|
|
# Expanded table bounds with tolerance
|
|
expanded_y1 = table_y1 + self.boundary_tolerance
|
|
expanded_x1 = table_x1 + self.boundary_tolerance
|
|
expanded_y0 = table_y0 - self.boundary_tolerance
|
|
expanded_x0 = table_x0 - self.boundary_tolerance
|
|
|
|
valid_cells = []
|
|
invalid_reasons = defaultdict(int)
|
|
|
|
for idx, box in enumerate(cell_boxes):
|
|
if not box or len(box) < 4:
|
|
invalid_reasons["invalid_format"] += 1
|
|
continue
|
|
|
|
x0, y0, x1, y1 = box[:4]
|
|
|
|
# Check if cell is significantly outside table bounds
|
|
# Cell's bottom (y1) shouldn't exceed table's bottom + tolerance
|
|
if y1 > expanded_y1:
|
|
invalid_reasons["y1_exceeds_table"] += 1
|
|
continue
|
|
|
|
# Cell's top (y0) shouldn't be above table's top - tolerance
|
|
if y0 < expanded_y0:
|
|
invalid_reasons["y0_above_table"] += 1
|
|
continue
|
|
|
|
# Cell's right (x1) shouldn't exceed table's right + tolerance
|
|
if x1 > expanded_x1:
|
|
invalid_reasons["x1_exceeds_table"] += 1
|
|
continue
|
|
|
|
# Cell's left (x0) shouldn't be left of table - tolerance
|
|
if x0 < expanded_x0:
|
|
invalid_reasons["x0_left_of_table"] += 1
|
|
continue
|
|
|
|
# Check for inverted coordinates
|
|
if x0 >= x1 or y0 >= y1:
|
|
invalid_reasons["inverted_coords"] += 1
|
|
continue
|
|
|
|
# Check cell height is reasonable (at least 8px for readable text)
|
|
cell_height = y1 - y0
|
|
if cell_height < 8:
|
|
invalid_reasons["too_small"] += 1
|
|
continue
|
|
|
|
valid_cells.append(CellBox(
|
|
x0=x0, y0=y0, x1=x1, y1=y1,
|
|
original_index=idx
|
|
))
|
|
|
|
stats = {
|
|
"total": len(cell_boxes),
|
|
"valid": len(valid_cells),
|
|
"invalid": len(cell_boxes) - len(valid_cells),
|
|
"invalid_reasons": dict(invalid_reasons),
|
|
"validity_ratio": len(valid_cells) / len(cell_boxes) if cell_boxes else 0
|
|
}
|
|
|
|
logger.info(
|
|
f"Cell box validation: {stats['valid']}/{stats['total']} valid "
|
|
f"(ratio={stats['validity_ratio']:.2%})"
|
|
)
|
|
if invalid_reasons:
|
|
logger.debug(f"Invalid reasons: {dict(invalid_reasons)}")
|
|
|
|
return valid_cells, stats
|
|
|
|
def parse_raw_ocr_regions(
|
|
self,
|
|
raw_regions: List[Dict[str, Any]],
|
|
table_bbox: List[float]
|
|
) -> List[OCRTextRegion]:
|
|
"""
|
|
Parse raw OCR regions and filter to those within/near table bbox.
|
|
|
|
Args:
|
|
raw_regions: List of raw OCR region dicts with 'text', 'bbox', 'confidence'
|
|
table_bbox: Table bounding box [x0, y0, x1, y1]
|
|
|
|
Returns:
|
|
List of OCRTextRegion objects within table area
|
|
"""
|
|
if not raw_regions or len(table_bbox) < 4:
|
|
return []
|
|
|
|
table_x0, table_y0, table_x1, table_y1 = table_bbox[:4]
|
|
# Expand table area slightly to catch edge text
|
|
margin = 10
|
|
|
|
result = []
|
|
for region in raw_regions:
|
|
text = region.get('text', '').strip()
|
|
if not text:
|
|
continue
|
|
|
|
bbox = region.get('bbox', [])
|
|
confidence = region.get('confidence', 1.0)
|
|
|
|
# Parse bbox (handle both nested and flat formats)
|
|
if not bbox:
|
|
continue
|
|
|
|
if isinstance(bbox[0], (list, tuple)):
|
|
# Nested format: [[x1,y1], [x2,y2], [x3,y3], [x4,y4]]
|
|
xs = [pt[0] for pt in bbox if len(pt) >= 2]
|
|
ys = [pt[1] for pt in bbox if len(pt) >= 2]
|
|
if xs and ys:
|
|
x0, y0, x1, y1 = min(xs), min(ys), max(xs), max(ys)
|
|
else:
|
|
continue
|
|
elif len(bbox) == 4:
|
|
x0, y0, x1, y1 = bbox
|
|
else:
|
|
continue
|
|
|
|
# Check if region overlaps with table area
|
|
if (x1 < table_x0 - margin or x0 > table_x1 + margin or
|
|
y1 < table_y0 - margin or y0 > table_y1 + margin):
|
|
continue
|
|
|
|
result.append(OCRTextRegion(
|
|
text=text,
|
|
x0=float(x0), y0=float(y0),
|
|
x1=float(x1), y1=float(y1),
|
|
confidence=confidence
|
|
))
|
|
|
|
logger.debug(f"Parsed {len(result)} OCR regions within table area")
|
|
return result
|
|
|
|
def cluster_cells_into_grid(
|
|
self,
|
|
cells: List[CellBox]
|
|
) -> Tuple[List[float], List[float], Dict[Tuple[int, int], CellBox]]:
|
|
"""
|
|
Cluster cells into rows and columns based on coordinates.
|
|
|
|
Args:
|
|
cells: List of validated CellBox objects
|
|
|
|
Returns:
|
|
Tuple of (row_boundaries, col_boundaries, cell_grid)
|
|
- row_boundaries: Y coordinates for row divisions
|
|
- col_boundaries: X coordinates for column divisions
|
|
- cell_grid: Dict mapping (row, col) to CellBox
|
|
"""
|
|
if not cells:
|
|
return [], [], {}
|
|
|
|
# Collect all unique Y boundaries (top and bottom of cells)
|
|
y_coords = set()
|
|
x_coords = set()
|
|
for cell in cells:
|
|
y_coords.add(round(cell.y0, 1))
|
|
y_coords.add(round(cell.y1, 1))
|
|
x_coords.add(round(cell.x0, 1))
|
|
x_coords.add(round(cell.x1, 1))
|
|
|
|
# Cluster nearby coordinates
|
|
row_boundaries = self._cluster_coordinates(sorted(y_coords), self.row_clustering_threshold)
|
|
col_boundaries = self._cluster_coordinates(sorted(x_coords), self.col_clustering_threshold)
|
|
|
|
logger.debug(f"Found {len(row_boundaries)} row boundaries, {len(col_boundaries)} col boundaries")
|
|
|
|
# Map cells to grid positions
|
|
cell_grid = {}
|
|
for cell in cells:
|
|
# Find row (based on cell's top Y coordinate)
|
|
row = self._find_position(cell.y0, row_boundaries)
|
|
# Find column (based on cell's left X coordinate)
|
|
col = self._find_position(cell.x0, col_boundaries)
|
|
|
|
if row is not None and col is not None:
|
|
# Check for span (if cell extends across multiple rows/cols)
|
|
row_end = self._find_position(cell.y1, row_boundaries)
|
|
col_end = self._find_position(cell.x1, col_boundaries)
|
|
|
|
# Store with potential span info
|
|
if (row, col) not in cell_grid:
|
|
cell_grid[(row, col)] = cell
|
|
|
|
return row_boundaries, col_boundaries, cell_grid
|
|
|
|
def _cluster_coordinates(
|
|
self,
|
|
coords: List[float],
|
|
threshold: float
|
|
) -> List[float]:
|
|
"""Cluster nearby coordinates into distinct values."""
|
|
if not coords:
|
|
return []
|
|
|
|
clustered = [coords[0]]
|
|
for coord in coords[1:]:
|
|
if coord - clustered[-1] > threshold:
|
|
clustered.append(coord)
|
|
|
|
return clustered
|
|
|
|
def _find_position(
|
|
self,
|
|
value: float,
|
|
boundaries: List[float]
|
|
) -> Optional[int]:
|
|
"""Find which position (index) a value falls into."""
|
|
for i, boundary in enumerate(boundaries):
|
|
if value <= boundary + self.row_clustering_threshold:
|
|
return i
|
|
return len(boundaries) - 1 if boundaries else None
|
|
|
|
def assign_ocr_to_cells(
|
|
self,
|
|
cells: List[CellBox],
|
|
ocr_regions: List[OCRTextRegion],
|
|
row_boundaries: List[float],
|
|
col_boundaries: List[float]
|
|
) -> Dict[Tuple[int, int], List[OCRTextRegion]]:
|
|
"""
|
|
Assign OCR text regions to cells based on spatial overlap.
|
|
|
|
Args:
|
|
cells: List of validated CellBox objects
|
|
ocr_regions: List of OCRTextRegion objects
|
|
row_boundaries: Y coordinates for row divisions
|
|
col_boundaries: X coordinates for column divisions
|
|
|
|
Returns:
|
|
Dict mapping (row, col) to list of OCR regions in that cell
|
|
"""
|
|
cell_ocr_map: Dict[Tuple[int, int], List[OCRTextRegion]] = defaultdict(list)
|
|
|
|
for ocr in ocr_regions:
|
|
best_cell = None
|
|
best_overlap = 0
|
|
|
|
for cell in cells:
|
|
overlap = self._calculate_overlap_ratio(
|
|
(ocr.x0, ocr.y0, ocr.x1, ocr.y1),
|
|
(cell.x0, cell.y0, cell.x1, cell.y1)
|
|
)
|
|
|
|
if overlap > best_overlap and overlap >= self.min_text_coverage:
|
|
best_overlap = overlap
|
|
best_cell = cell
|
|
|
|
if best_cell:
|
|
row = self._find_position(best_cell.y0, row_boundaries)
|
|
col = self._find_position(best_cell.x0, col_boundaries)
|
|
if row is not None and col is not None:
|
|
cell_ocr_map[(row, col)].append(ocr)
|
|
|
|
return cell_ocr_map
|
|
|
|
def _calculate_overlap_ratio(
|
|
self,
|
|
box1: Tuple[float, float, float, float],
|
|
box2: Tuple[float, float, float, float]
|
|
) -> float:
|
|
"""Calculate overlap ratio of box1 with box2."""
|
|
x0_1, y0_1, x1_1, y1_1 = box1
|
|
x0_2, y0_2, x1_2, y1_2 = box2
|
|
|
|
# Calculate intersection
|
|
inter_x0 = max(x0_1, x0_2)
|
|
inter_y0 = max(y0_1, y0_2)
|
|
inter_x1 = min(x1_1, x1_2)
|
|
inter_y1 = min(y1_1, y1_2)
|
|
|
|
if inter_x0 >= inter_x1 or inter_y0 >= inter_y1:
|
|
return 0.0
|
|
|
|
inter_area = (inter_x1 - inter_x0) * (inter_y1 - inter_y0)
|
|
box1_area = (x1_1 - x0_1) * (y1_1 - y0_1)
|
|
|
|
return inter_area / box1_area if box1_area > 0 else 0.0
|
|
|
|
def rebuild_table(
|
|
self,
|
|
cell_boxes: List[List[float]],
|
|
table_bbox: List[float],
|
|
raw_ocr_regions: List[Dict[str, Any]],
|
|
original_html: str = ""
|
|
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
|
|
"""
|
|
Rebuild table content from cell_boxes and raw OCR regions.
|
|
|
|
This is the main entry point. It:
|
|
1. Validates cell_boxes
|
|
2. If validity ratio is low, uses pure OCR-based rebuild
|
|
3. Otherwise, uses cell_boxes + OCR hybrid rebuild
|
|
|
|
Args:
|
|
cell_boxes: List of cell bounding boxes from PP-StructureV3
|
|
table_bbox: Table bounding box [x0, y0, x1, y1]
|
|
raw_ocr_regions: List of raw OCR region dicts
|
|
original_html: Original HTML from PP-StructureV3 (for fallback)
|
|
|
|
Returns:
|
|
Tuple of (rebuilt_table_dict, rebuild_stats)
|
|
"""
|
|
stats = {
|
|
"action": "none",
|
|
"reason": "",
|
|
"original_cell_count": len(cell_boxes) if cell_boxes else 0,
|
|
"valid_cell_count": 0,
|
|
"ocr_regions_in_table": 0,
|
|
"rebuilt_rows": 0,
|
|
"rebuilt_cols": 0
|
|
}
|
|
|
|
# Step 1: Validate cell_boxes
|
|
valid_cells, validation_stats = self.validate_cell_boxes(cell_boxes, table_bbox)
|
|
stats["valid_cell_count"] = validation_stats["valid"]
|
|
stats["validation"] = validation_stats
|
|
|
|
# Step 2: Parse raw OCR regions in table area
|
|
ocr_regions = self.parse_raw_ocr_regions(raw_ocr_regions, table_bbox)
|
|
stats["ocr_regions_in_table"] = len(ocr_regions)
|
|
|
|
if not ocr_regions:
|
|
stats["action"] = "skip"
|
|
stats["reason"] = "no_ocr_regions_in_table"
|
|
return None, stats
|
|
|
|
# Step 3: Choose rebuild strategy based on cell_boxes validity
|
|
# If validity ratio is too low (< 50%), use pure OCR-based rebuild
|
|
if validation_stats["validity_ratio"] < 0.5 or len(valid_cells) < 2:
|
|
logger.info(
|
|
f"Using pure OCR-based rebuild (validity={validation_stats['validity_ratio']:.2%})"
|
|
)
|
|
return self._rebuild_from_ocr_only(ocr_regions, table_bbox, stats)
|
|
|
|
# Otherwise, use hybrid cell_boxes + OCR rebuild
|
|
return self._rebuild_with_cell_boxes(valid_cells, ocr_regions, stats, table_bbox)
|
|
|
|
def _rebuild_from_ocr_only(
|
|
self,
|
|
ocr_regions: List[OCRTextRegion],
|
|
table_bbox: List[float],
|
|
stats: Dict[str, Any]
|
|
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
|
|
"""
|
|
Rebuild table using only OCR regions (when cell_boxes are unreliable).
|
|
|
|
Strategy:
|
|
1. Detect column boundary from OCR x-coordinates
|
|
2. Cluster OCR regions by Y coordinate into rows
|
|
3. Split each row into left/right columns
|
|
"""
|
|
if not ocr_regions:
|
|
stats["action"] = "skip"
|
|
stats["reason"] = "no_ocr_regions"
|
|
return None, stats
|
|
|
|
# Get table bounds
|
|
table_x0, table_y0, table_x1, table_y1 = table_bbox[:4]
|
|
table_width = table_x1 - table_x0
|
|
|
|
# Step 1: Detect column split point by analyzing x-coordinates
|
|
# Look for the gap between left column (x0 < 250) and right column (x0 >= 250)
|
|
col_split_x = self._detect_column_split(ocr_regions, table_bbox)
|
|
logger.debug(f"Detected column split at x={col_split_x}")
|
|
|
|
# Step 2: Cluster OCR regions by Y coordinate into rows
|
|
# Use smaller threshold (12px) to properly separate rows
|
|
row_threshold = 12.0
|
|
sorted_ocr = sorted(ocr_regions, key=lambda r: r.center_y)
|
|
|
|
rows = []
|
|
current_row = [sorted_ocr[0]]
|
|
|
|
for ocr in sorted_ocr[1:]:
|
|
if ocr.center_y - current_row[-1].center_y <= row_threshold:
|
|
current_row.append(ocr)
|
|
else:
|
|
rows.append(current_row)
|
|
current_row = [ocr]
|
|
rows.append(current_row)
|
|
|
|
logger.debug(f"Detected {len(rows)} rows")
|
|
|
|
# Step 3: Analyze column structure
|
|
left_regions = [r for r in ocr_regions if r.x0 < col_split_x]
|
|
right_regions = [r for r in ocr_regions if r.x0 >= col_split_x]
|
|
num_cols = 2 if len(left_regions) >= 2 and len(right_regions) >= 2 else 1
|
|
|
|
# Step 4: Build cells for each row
|
|
rebuilt_cells = []
|
|
for row_idx, row_ocrs in enumerate(rows):
|
|
row_ocrs_sorted = sorted(row_ocrs, key=lambda r: r.center_x)
|
|
|
|
if num_cols == 2:
|
|
# Split into left and right columns using x0
|
|
left_ocrs = [r for r in row_ocrs_sorted if r.x0 < col_split_x]
|
|
right_ocrs = [r for r in row_ocrs_sorted if r.x0 >= col_split_x]
|
|
|
|
# Left column cell
|
|
if left_ocrs:
|
|
left_content = " ".join(r.text for r in left_ocrs)
|
|
left_bbox = [
|
|
min(r.x0 for r in left_ocrs),
|
|
min(r.y0 for r in left_ocrs),
|
|
max(r.x1 for r in left_ocrs),
|
|
max(r.y1 for r in left_ocrs)
|
|
]
|
|
rebuilt_cells.append({
|
|
"row": row_idx,
|
|
"col": 0,
|
|
"row_span": 1,
|
|
"col_span": 1,
|
|
"content": left_content,
|
|
"bbox": left_bbox
|
|
})
|
|
|
|
# Right column cell
|
|
if right_ocrs:
|
|
right_content = " ".join(r.text for r in right_ocrs)
|
|
right_bbox = [
|
|
min(r.x0 for r in right_ocrs),
|
|
min(r.y0 for r in right_ocrs),
|
|
max(r.x1 for r in right_ocrs),
|
|
max(r.y1 for r in right_ocrs)
|
|
]
|
|
rebuilt_cells.append({
|
|
"row": row_idx,
|
|
"col": 1,
|
|
"row_span": 1,
|
|
"col_span": 1,
|
|
"content": right_content,
|
|
"bbox": right_bbox
|
|
})
|
|
else:
|
|
# Single column - merge all OCR in row
|
|
row_content = " ".join(r.text for r in row_ocrs_sorted)
|
|
row_bbox = [
|
|
min(r.x0 for r in row_ocrs_sorted),
|
|
min(r.y0 for r in row_ocrs_sorted),
|
|
max(r.x1 for r in row_ocrs_sorted),
|
|
max(r.y1 for r in row_ocrs_sorted)
|
|
]
|
|
rebuilt_cells.append({
|
|
"row": row_idx,
|
|
"col": 0,
|
|
"row_span": 1,
|
|
"col_span": 1,
|
|
"content": row_content,
|
|
"bbox": row_bbox
|
|
})
|
|
|
|
num_rows = len(rows)
|
|
stats["rebuilt_rows"] = num_rows
|
|
stats["rebuilt_cols"] = num_cols
|
|
|
|
# Build result
|
|
rebuilt_table = {
|
|
"rows": num_rows,
|
|
"cols": num_cols,
|
|
"cells": rebuilt_cells,
|
|
"html": self._generate_html(rebuilt_cells, num_rows, num_cols),
|
|
"rebuild_source": "pure_ocr"
|
|
}
|
|
|
|
stats["action"] = "rebuilt"
|
|
stats["reason"] = "pure_ocr_success"
|
|
stats["rebuilt_cell_count"] = len(rebuilt_cells)
|
|
|
|
logger.info(
|
|
f"Table rebuilt (pure OCR): {num_rows}x{num_cols} with {len(rebuilt_cells)} cells"
|
|
)
|
|
|
|
return rebuilt_table, stats
|
|
|
|
def _detect_column_split(
|
|
self,
|
|
ocr_regions: List[OCRTextRegion],
|
|
table_bbox: List[float]
|
|
) -> float:
|
|
"""
|
|
Detect the column split point by analyzing x-coordinates.
|
|
|
|
For tables with left/right structure (e.g., property-value tables),
|
|
there's usually a gap between left column text and right column text.
|
|
"""
|
|
if not ocr_regions:
|
|
return (table_bbox[0] + table_bbox[2]) / 2
|
|
|
|
# Collect all x0 values (left edge of each text region)
|
|
x0_values = sorted(set(round(r.x0) for r in ocr_regions))
|
|
|
|
if len(x0_values) < 2:
|
|
return (table_bbox[0] + table_bbox[2]) / 2
|
|
|
|
# Find the largest gap between consecutive x0 values
|
|
# This usually indicates the column boundary
|
|
max_gap = 0
|
|
split_point = (table_bbox[0] + table_bbox[2]) / 2
|
|
|
|
for i in range(len(x0_values) - 1):
|
|
gap = x0_values[i + 1] - x0_values[i]
|
|
if gap > max_gap and gap > 50: # Require minimum 50px gap
|
|
max_gap = gap
|
|
split_point = (x0_values[i] + x0_values[i + 1]) / 2
|
|
|
|
# If no clear gap found, use table center
|
|
if max_gap < 50:
|
|
split_point = (table_bbox[0] + table_bbox[2]) / 2
|
|
|
|
return split_point
|
|
|
|
def _rebuild_with_cell_boxes(
|
|
self,
|
|
valid_cells: List[CellBox],
|
|
ocr_regions: List[OCRTextRegion],
|
|
stats: Dict[str, Any],
|
|
table_bbox: Optional[List[float]] = None
|
|
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
|
|
"""Rebuild table using cell_boxes structure + OCR content."""
|
|
# Step 3: Cluster cells into grid
|
|
row_boundaries, col_boundaries, cell_grid = self.cluster_cells_into_grid(valid_cells)
|
|
|
|
num_rows = len(row_boundaries) - 1 if len(row_boundaries) > 1 else 1
|
|
num_cols = len(col_boundaries) - 1 if len(col_boundaries) > 1 else 1
|
|
|
|
# Quality check: if hybrid produces too many columns or sparse grid, fall back to pure OCR
|
|
# A well-formed table typically has 2-5 columns. Too many columns indicates poor clustering.
|
|
total_expected_cells = num_rows * num_cols
|
|
if num_cols > 5 or total_expected_cells > 100:
|
|
logger.info(
|
|
f"Hybrid mode produced {num_rows}x{num_cols} grid (too sparse), "
|
|
f"falling back to pure OCR mode"
|
|
)
|
|
if table_bbox:
|
|
return self._rebuild_from_ocr_only(ocr_regions, table_bbox, stats)
|
|
|
|
stats["rebuilt_rows"] = num_rows
|
|
stats["rebuilt_cols"] = num_cols
|
|
|
|
# Step 4: Assign OCR text to cells
|
|
cell_ocr_map = self.assign_ocr_to_cells(
|
|
valid_cells, ocr_regions, row_boundaries, col_boundaries
|
|
)
|
|
|
|
# Step 5: Build rebuilt cells
|
|
rebuilt_cells = []
|
|
for (row, col), ocr_list in cell_ocr_map.items():
|
|
# Sort OCR regions by position (top to bottom, left to right)
|
|
sorted_ocr = sorted(ocr_list, key=lambda r: (r.center_y, r.center_x))
|
|
content = " ".join(r.text for r in sorted_ocr)
|
|
|
|
# Find the cell bbox for this position
|
|
cell_bbox = None
|
|
for cell in valid_cells:
|
|
cell_row = self._find_position(cell.y0, row_boundaries)
|
|
cell_col = self._find_position(cell.x0, col_boundaries)
|
|
if cell_row == row and cell_col == col:
|
|
cell_bbox = [cell.x0, cell.y0, cell.x1, cell.y1]
|
|
break
|
|
|
|
rebuilt_cells.append({
|
|
"row": row,
|
|
"col": col,
|
|
"row_span": 1,
|
|
"col_span": 1,
|
|
"content": content,
|
|
"bbox": cell_bbox
|
|
})
|
|
|
|
# Quality check: if too few cells have content compared to grid size, fall back to pure OCR
|
|
content_ratio = len(rebuilt_cells) / total_expected_cells if total_expected_cells > 0 else 0
|
|
if content_ratio < 0.3 and table_bbox:
|
|
logger.info(
|
|
f"Hybrid mode has low content ratio ({content_ratio:.2%}), "
|
|
f"falling back to pure OCR mode"
|
|
)
|
|
return self._rebuild_from_ocr_only(ocr_regions, table_bbox, stats)
|
|
|
|
# Build result
|
|
rebuilt_table = {
|
|
"rows": num_rows,
|
|
"cols": num_cols,
|
|
"cells": rebuilt_cells,
|
|
"html": self._generate_html(rebuilt_cells, num_rows, num_cols),
|
|
"rebuild_source": "cell_boxes_hybrid"
|
|
}
|
|
|
|
stats["action"] = "rebuilt"
|
|
stats["reason"] = "hybrid_success"
|
|
stats["rebuilt_cell_count"] = len(rebuilt_cells)
|
|
|
|
logger.info(
|
|
f"Table rebuilt (hybrid): {num_rows}x{num_cols} with {len(rebuilt_cells)} cells "
|
|
f"(from {len(ocr_regions)} OCR regions)"
|
|
)
|
|
|
|
return rebuilt_table, stats
|
|
|
|
def _generate_html(
|
|
self,
|
|
cells: List[Dict[str, Any]],
|
|
num_rows: int,
|
|
num_cols: int
|
|
) -> str:
|
|
"""Generate HTML table from rebuilt cells."""
|
|
# Create grid
|
|
grid = [[None for _ in range(num_cols)] for _ in range(num_rows)]
|
|
|
|
for cell in cells:
|
|
row, col = cell["row"], cell["col"]
|
|
if 0 <= row < num_rows and 0 <= col < num_cols:
|
|
grid[row][col] = cell["content"]
|
|
|
|
# Build HTML
|
|
html_parts = ["<html><body><table>"]
|
|
for row_idx in range(num_rows):
|
|
html_parts.append("<tr>")
|
|
for col_idx in range(num_cols):
|
|
content = grid[row_idx][col_idx] or ""
|
|
tag = "th" if row_idx == 0 else "td"
|
|
html_parts.append(f"<{tag}>{content}</{tag}>")
|
|
html_parts.append("</tr>")
|
|
html_parts.append("</table></body></html>")
|
|
|
|
return "".join(html_parts)
|
|
|
|
def should_rebuild(
|
|
self,
|
|
cell_boxes: List[List[float]],
|
|
table_bbox: List[float],
|
|
original_html: str = ""
|
|
) -> Tuple[bool, str]:
|
|
"""
|
|
Determine if table should be rebuilt based on cell_boxes validity.
|
|
|
|
Args:
|
|
cell_boxes: List of cell bounding boxes
|
|
table_bbox: Table bounding box
|
|
original_html: Original HTML from PP-StructureV3
|
|
|
|
Returns:
|
|
Tuple of (should_rebuild, reason)
|
|
"""
|
|
if not cell_boxes:
|
|
return False, "no_cell_boxes"
|
|
|
|
_, validation_stats = self.validate_cell_boxes(cell_boxes, table_bbox)
|
|
|
|
# Always rebuild if ANY cells are invalid - PP-Structure HTML often merges cells incorrectly
|
|
# even when most cell_boxes are valid
|
|
if validation_stats["invalid"] > 0:
|
|
return True, f"invalid_cells_{validation_stats['invalid']}/{validation_stats['total']}"
|
|
|
|
# Rebuild if there are boundary violations
|
|
invalid_reasons = validation_stats.get("invalid_reasons", {})
|
|
boundary_violations = (
|
|
invalid_reasons.get("y1_exceeds_table", 0) +
|
|
invalid_reasons.get("y0_above_table", 0) +
|
|
invalid_reasons.get("x1_exceeds_table", 0) +
|
|
invalid_reasons.get("x0_left_of_table", 0)
|
|
)
|
|
|
|
if boundary_violations > 0:
|
|
return True, f"boundary_violations_{boundary_violations}"
|
|
|
|
# Also rebuild to ensure OCR-based content is used instead of PP-Structure HTML
|
|
# PP-Structure's HTML often has incorrect cell merging
|
|
return True, "ocr_content_preferred"
|