OCR/backend/app/services/ocr_to_unified_converter.py
commit 3ccbdb8394 fix: multi-worker translation status and OCR fallback handling
Translation status (multi-worker support):
- Add filesystem lock files (.translating) to track in-progress translations
- Check lock files in /status API when job_state not found in current worker
- Remove lock files on translation success or failure

OCR fallback fix:
- Fix empty pages when layout analysis fails but OCR succeeds
- Change 'enhanced_results' in ocr_results to ocr_results.get('enhanced_results')
- This ensures fallback to text_regions when enhanced_results is an empty list (see the sketch below)
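
Illustrative before/after of the guard in _extract_pages (a sketch of the change, not the literal diff):

    # before: truthy whenever the key exists, even when it maps to an empty list
    if 'enhanced_results' in ocr_results:
        ...
    # after: falls through to the text_regions fallback when the list is empty
    if ocr_results.get('enhanced_results'):
        ...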

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-14 16:36:36 +08:00


"""
OCR to UnifiedDocument Converter
Converts PP-StructureV3 OCR results to UnifiedDocument format, preserving
all structure information and metadata.
Includes gap filling support to supplement PP-StructureV3 output with raw OCR
regions when significant content loss is detected.
"""
import logging
from pathlib import Path
from typing import Dict, List, Optional, Any, Union
from datetime import datetime
import hashlib
from app.models.unified_document import (
UnifiedDocument, DocumentElement, Page, DocumentMetadata,
BoundingBox, StyleInfo, TableData, ElementType,
ProcessingTrack, TableCell, Dimensions
)
from app.services.gap_filling_service import GapFillingService
logger = logging.getLogger(__name__)
def trim_empty_columns(table_dict: Dict[str, Any]) -> Dict[str, Any]:
"""
Remove empty columns from a table dictionary.
A column is considered empty if ALL cells in that column have content that is
empty or whitespace-only (using .strip() to determine emptiness).
This function:
1. Identifies columns where every cell's content is empty/whitespace
2. Removes identified empty columns
3. Updates cols/columns value
4. Recalculates each cell's col index
5. Adjusts col_span when spans cross removed columns
6. Removes cells entirely when their complete span falls within removed columns
7. Preserves original bbox (no layout drift)
Args:
table_dict: Table dictionary with keys: rows, cols/columns, cells
Returns:
Cleaned table dictionary with empty columns removed
"""
cells = table_dict.get('cells', [])
if not cells:
return table_dict
# Get original column count
original_cols = table_dict.get('cols', table_dict.get('columns', 0))
if original_cols == 0:
# Calculate from cells if not provided
max_col = 0
for cell in cells:
cell_col = cell.get('col', 0) if isinstance(cell, dict) else getattr(cell, 'col', 0)
cell_span = cell.get('col_span', 1) if isinstance(cell, dict) else getattr(cell, 'col_span', 1)
max_col = max(max_col, cell_col + cell_span)
original_cols = max_col
if original_cols == 0:
return table_dict
# Build a map: column_index -> list of cell contents
# For cells with col_span > 1, we only check their primary column
column_contents: Dict[int, List[str]] = {i: [] for i in range(original_cols)}
for cell in cells:
if isinstance(cell, dict):
col = cell.get('col', 0)
col_span = cell.get('col_span', 1)
content = cell.get('content', '')
else:
col = getattr(cell, 'col', 0)
col_span = getattr(cell, 'col_span', 1)
content = getattr(cell, 'content', '')
# Mark content for each column this cell spans
for c in range(col, min(col + col_span, original_cols)):
if c in column_contents:
column_contents[c].append(str(content).strip() if content else '')
# Identify empty columns (all content is empty/whitespace)
empty_columns = set()
for col_idx, contents in column_contents.items():
# A column is empty if ALL cells in it have empty content
# Note: If a column has no cells at all, it's considered empty
if all(c == '' for c in contents):
empty_columns.add(col_idx)
if not empty_columns:
# No empty columns to remove, just ensure cols is set
result = dict(table_dict)
if result.get('cols', result.get('columns', 0)) == 0:
result['cols'] = original_cols
if 'columns' in result:
result['columns'] = original_cols
return result
logger.debug(f"Removing empty columns: {sorted(empty_columns)} from table with {original_cols} cols")
# Build column mapping: old_col -> new_col (or None if removed)
col_mapping: Dict[int, Optional[int]] = {}
new_col = 0
for old_col in range(original_cols):
if old_col in empty_columns:
col_mapping[old_col] = None
else:
col_mapping[old_col] = new_col
new_col += 1
new_cols = new_col
# Process cells
new_cells = []
for cell in cells:
if isinstance(cell, dict):
old_col = cell.get('col', 0)
old_col_span = cell.get('col_span', 1)
else:
old_col = getattr(cell, 'col', 0)
old_col_span = getattr(cell, 'col_span', 1)
# Calculate new col and col_span
# Find the first non-removed column in this cell's span
new_start_col = None
new_end_col = None
for c in range(old_col, min(old_col + old_col_span, original_cols)):
mapped = col_mapping.get(c)
if mapped is not None:
if new_start_col is None:
new_start_col = mapped
new_end_col = mapped
# If entire span falls within removed columns, skip this cell
if new_start_col is None:
logger.debug(f"Removing cell at row={cell.get('row', 0) if isinstance(cell, dict) else cell.row}, "
f"col={old_col} (entire span in removed columns)")
continue
new_col_span = new_end_col - new_start_col + 1
# Create new cell
if isinstance(cell, dict):
new_cell = dict(cell)
new_cell['col'] = new_start_col
new_cell['col_span'] = new_col_span
else:
# Handle TableCell objects
new_cell = {
'row': cell.row,
'col': new_start_col,
'row_span': cell.row_span,
'col_span': new_col_span,
'content': cell.content
}
if hasattr(cell, 'bbox') and cell.bbox:
new_cell['bbox'] = cell.bbox
if hasattr(cell, 'style') and cell.style:
new_cell['style'] = cell.style
new_cells.append(new_cell)
# Build result
result = dict(table_dict)
result['cells'] = new_cells
result['cols'] = new_cols
if 'columns' in result:
result['columns'] = new_cols
logger.info(f"Trimmed table: {original_cols} -> {new_cols} columns, "
f"{len(cells)} -> {len(new_cells)} cells")
return result
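# A minimal usage sketch for trim_empty_columns (hypothetical table data, not a
# real PP-StructureV3 payload): the middle column contains only whitespace, so
# it is removed and the surviving cells are re-indexed.
#
#     table = {
#         'rows': 1, 'cols': 3,
#         'cells': [
#             {'row': 0, 'col': 0, 'col_span': 1, 'content': 'A'},
#             {'row': 0, 'col': 1, 'col_span': 1, 'content': '   '},
#             {'row': 0, 'col': 2, 'col_span': 1, 'content': 'B'},
#         ],
#     }
#     trimmed = trim_empty_columns(table)
#     # trimmed['cols'] == 2; 'A' stays at col 0, 'B' moves to col 1, and the
#     # whitespace-only cell is dropped (its entire span fell in removed columns)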
def validate_cell_boxes(
cell_boxes: List[List[float]],
table_bbox: List[float],
page_width: float,
page_height: float,
tolerance: float = 5.0
) -> Dict[str, Any]:
"""
Validate cell_boxes coordinates against page boundaries and table bbox.
PP-StructureV3 sometimes returns cell_boxes with coordinates that exceed
page boundaries or table bbox. This function validates and clamps to valid boundaries.
Args:
cell_boxes: List of cell bounding boxes [[x0, y0, x1, y1], ...]
table_bbox: Table bounding box [x0, y0, x1, y1]
page_width: Page width in pixels
page_height: Page height in pixels
tolerance: Allowed tolerance for boundary checks (pixels)
    Returns:
        Dict with:
        - valid: bool - whether all cell_boxes are valid
        - invalid_count: int - number of invalid cell_boxes
        - clamped_boxes: List - cell_boxes clamped to valid boundaries
        - issues: List[str] - description of issues found
        - needs_fallback: bool - True when more than 50% of the boxes are invalid
    """
if not cell_boxes:
return {'valid': True, 'invalid_count': 0, 'clamped_boxes': [], 'issues': []}
issues = []
invalid_count = 0
clamped_boxes = []
# Page boundaries with tolerance
page_min_x = -tolerance
page_min_y = -tolerance
page_max_x = page_width + tolerance
page_max_y = page_height + tolerance
# For clamping, use the intersection of page and expanded table bbox
clamp_min_x = max(0, table_bbox[0] - tolerance) if len(table_bbox) >= 4 else 0
clamp_min_y = max(0, table_bbox[1] - tolerance) if len(table_bbox) >= 4 else 0
clamp_max_x = min(page_width, table_bbox[2] + tolerance) if len(table_bbox) >= 4 else page_width
clamp_max_y = min(page_height, table_bbox[3] + tolerance) if len(table_bbox) >= 4 else page_height
for idx, box in enumerate(cell_boxes):
if not box or len(box) < 4:
issues.append(f"Cell {idx}: Invalid box format")
invalid_count += 1
clamped_boxes.append([0, 0, 0, 0])
continue
x0, y0, x1, y1 = box[:4]
is_valid = True
cell_issues = []
# Check if coordinates exceed page boundaries
if x0 < page_min_x:
cell_issues.append(f"x0={x0:.1f} < 0")
is_valid = False
if y0 < page_min_y:
cell_issues.append(f"y0={y0:.1f} < 0")
is_valid = False
if x1 > page_max_x:
cell_issues.append(f"x1={x1:.1f} > page_width={page_width:.1f}")
is_valid = False
if y1 > page_max_y:
cell_issues.append(f"y1={y1:.1f} > page_height={page_height:.1f}")
is_valid = False
        # Check if coordinates significantly exceed the table bbox (by more than 20% of table height)
        if len(table_bbox) >= 4:
            table_h = table_bbox[3] - table_bbox[1]
            expand_tolerance = max(tolerance, table_h * 0.2)  # 20% of table height
if y0 < table_bbox[1] - expand_tolerance:
cell_issues.append(f"y0={y0:.1f} above table (table_y0={table_bbox[1]:.1f})")
is_valid = False
if y1 > table_bbox[3] + expand_tolerance:
cell_issues.append(f"y1={y1:.1f} below table (table_y1={table_bbox[3]:.1f})")
is_valid = False
if x0 < table_bbox[0] - expand_tolerance:
cell_issues.append(f"x0={x0:.1f} left of table (table_x0={table_bbox[0]:.1f})")
is_valid = False
if x1 > table_bbox[2] + expand_tolerance:
cell_issues.append(f"x1={x1:.1f} right of table (table_x1={table_bbox[2]:.1f})")
is_valid = False
# Check for inverted coordinates
if x0 > x1:
cell_issues.append(f"x0={x0:.1f} > x1={x1:.1f}")
is_valid = False
if y0 > y1:
cell_issues.append(f"y0={y0:.1f} > y1={y1:.1f}")
is_valid = False
if not is_valid:
invalid_count += 1
issues.append(f"Cell {idx}: {', '.join(cell_issues)}")
# Clamp to valid boundaries (table bbox with some tolerance)
clamped_box = [
max(clamp_min_x, min(x0, clamp_max_x)),
max(clamp_min_y, min(y0, clamp_max_y)),
max(clamp_min_x, min(x1, clamp_max_x)),
max(clamp_min_y, min(y1, clamp_max_y))
]
# Ensure proper ordering after clamping
if clamped_box[0] > clamped_box[2]:
clamped_box[0], clamped_box[2] = clamped_box[2], clamped_box[0]
if clamped_box[1] > clamped_box[3]:
clamped_box[1], clamped_box[3] = clamped_box[3], clamped_box[1]
clamped_boxes.append(clamped_box)
if invalid_count > 0:
logger.warning(
f"Cell boxes validation: {invalid_count}/{len(cell_boxes)} invalid. "
f"Page: {page_width:.0f}x{page_height:.0f}, Table bbox: {table_bbox}"
)
return {
'valid': invalid_count == 0,
'invalid_count': invalid_count,
'clamped_boxes': clamped_boxes,
'issues': issues,
'needs_fallback': invalid_count > len(cell_boxes) * 0.5 # >50% invalid = needs fallback
}
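# A usage sketch for validate_cell_boxes (hypothetical coordinates): one cell
# box extends above the page, so it is flagged and clamped to the table bbox
# (with the default 5px tolerance).
#
#     report = validate_cell_boxes(
#         cell_boxes=[[10, -20, 60, 40]],
#         table_bbox=[0, 0, 100, 50],
#         page_width=200,
#         page_height=100,
#     )
#     # report['valid'] is False, report['invalid_count'] == 1
#     # report['clamped_boxes'][0] == [10, 0, 60, 40]
#     # report['needs_fallback'] is True (1/1 boxes invalid, i.e. > 50%)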
class OCRToUnifiedConverter:
"""
Converter for transforming PP-StructureV3 OCR results to UnifiedDocument format.
This converter handles:
- PP-StructureV3 parsing_res_list results
- Markdown fallback results
- Multi-page document assembly
- Metadata preservation
- Structure relationship mapping
- Gap filling with raw OCR regions (when PP-StructureV3 misses content)
"""
def __init__(self, enable_gap_filling: bool = True):
"""
Initialize the converter.
Args:
enable_gap_filling: Whether to enable gap filling with raw OCR regions
"""
self.element_counter = 0
self.gap_filling_service = GapFillingService() if enable_gap_filling else None
self.gap_filling_stats: Dict[str, Any] = {}
def convert(
self,
ocr_results: Dict[str, Any],
file_path: Path,
processing_time: float,
lang: str = 'ch'
) -> UnifiedDocument:
"""
Convert OCR results to UnifiedDocument.
Args:
ocr_results: Raw OCR results from PP-StructureV3
file_path: Original file path
processing_time: Time taken for OCR processing
lang: Language used for OCR
Returns:
UnifiedDocument with all extracted information
"""
try:
# Create document metadata
metadata = self._create_metadata(file_path, processing_time, lang)
# Extract pages from OCR results
pages = self._extract_pages(ocr_results)
# Create document ID
document_id = self._generate_document_id(file_path)
# Create UnifiedDocument
unified_doc = UnifiedDocument(
document_id=document_id,
metadata=metadata,
pages=pages,
processing_errors=ocr_results.get('errors', [])
)
# Post-process to establish relationships
self._establish_relationships(unified_doc)
logger.info(f"Successfully converted OCR results to UnifiedDocument: "
f"{len(pages)} pages, {self._count_elements(pages)} elements")
return unified_doc
except Exception as e:
logger.error(f"Error converting OCR results: {e}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
# Return minimal document with error
return UnifiedDocument(
document_id=self._generate_document_id(file_path),
metadata=self._create_metadata(file_path, processing_time, lang),
pages=[],
processing_errors=[{
'error': str(e),
'type': 'conversion_error',
'timestamp': datetime.now().isoformat()
}]
)
def _create_metadata(
self,
file_path: Path,
processing_time: float,
lang: str
) -> DocumentMetadata:
"""Create document metadata."""
return DocumentMetadata(
filename=file_path.name,
file_type=file_path.suffix,
file_size=file_path.stat().st_size if file_path.exists() else 0,
created_at=datetime.now(),
processing_track=ProcessingTrack.OCR,
processing_time=processing_time,
language=lang
)
def _extract_pages(self, ocr_results: Dict[str, Any]) -> List[Page]:
"""
Extract pages from OCR results.
Handles both enhanced PP-StructureV3 results (with parsing_res_list)
and traditional markdown results. Applies gap filling when enabled.
Gap filling can use either:
1. overall_ocr_res from PP-StructureV3 (preferred, no extra inference)
2. Separate raw OCR text_regions (fallback)
"""
pages = []
# Extract raw OCR text regions for gap filling
# Prefer overall_ocr_res from PP-StructureV3 when available
raw_text_regions = ocr_results.get('text_regions', [])
ocr_dimensions = ocr_results.get('ocr_dimensions', {})
# Check if we have enhanced results from PPStructureEnhanced
# Note: Must check for non-empty list, not just key existence (key may exist with empty list)
if ocr_results.get('enhanced_results'):
pages = self._extract_from_enhanced_results(
ocr_results['enhanced_results'],
raw_text_regions=raw_text_regions,
ocr_dimensions=ocr_dimensions
)
# Check for traditional OCR results with text_regions at top level (from process_file_traditional)
elif ocr_results.get('text_regions'):
pages = self._extract_from_traditional_ocr(ocr_results)
# Check for traditional layout_data structure
elif 'layout_data' in ocr_results:
pages = self._extract_from_layout_data(ocr_results['layout_data'])
# Check for direct PP-StructureV3 results
elif 'pages' in ocr_results:
pages = self._extract_from_direct_results(ocr_results['pages'])
else:
logger.warning("No recognized OCR result structure found")
return pages
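    # Dispatch sketch (hypothetical minimal payloads): _extract_pages picks the
    # first branch whose shape ocr_results matches, in this order:
    #
    #     {'enhanced_results': [{'width': W, 'height': H, 'elements': [...]}]}
    #     {'text_regions': [{'page': 1, 'bbox': [...], 'text': '...'}], ...}
    #     {'layout_data': {'text_regions': [...], 'images_metadata': [...], 'tables': [...]}}
    #     {'pages': [{'width': W, 'height': H, 'elements': [...]}]}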
def _extract_from_enhanced_results(
self,
enhanced_results: List[Dict[str, Any]],
raw_text_regions: Optional[List[Dict[str, Any]]] = None,
ocr_dimensions: Optional[Dict[str, Any]] = None
) -> List[Page]:
"""
Extract pages from enhanced PP-StructureV3 results.
Applies gap filling when enabled to supplement PP-StructureV3 output
with raw OCR regions that were not detected by the layout model.
Args:
enhanced_results: PP-StructureV3 enhanced results
raw_text_regions: Raw OCR text regions for gap filling
ocr_dimensions: OCR image dimensions for coordinate alignment
"""
pages = []
for page_idx, page_result in enumerate(enhanced_results):
elements = []
# Get page dimensions first (needed for element conversion)
page_width = page_result.get('width', 0)
page_height = page_result.get('height', 0)
pp_dimensions = Dimensions(width=page_width, height=page_height)
# Process elements from parsing_res_list
if 'elements' in page_result:
for elem_data in page_result['elements']:
element = self._convert_pp3_element(
elem_data, page_idx,
page_width=page_width,
page_height=page_height
)
if element:
elements.append(element)
# Apply gap filling if enabled
# Priority: 1) overall_ocr_res from page_result, 2) raw_text_regions from separate OCR
if self.gap_filling_service:
# Check for overall_ocr_res from PP-StructureV3 (preferred, no extra inference)
page_raw_regions = page_result.get('overall_ocr_res', [])
if page_raw_regions:
logger.debug(f"Page {page_idx + 1}: Using overall_ocr_res ({len(page_raw_regions)} regions)")
elif raw_text_regions:
# Fallback to separate raw OCR regions
page_raw_regions = [
r for r in raw_text_regions
if r.get('page', 0) == page_idx or r.get('page', 1) == page_idx + 1
]
if page_raw_regions:
logger.debug(f"Page {page_idx + 1}: Using separate raw OCR ({len(page_raw_regions)} regions)")
if page_raw_regions:
supplemented, stats = self.gap_filling_service.fill_gaps(
raw_ocr_regions=page_raw_regions,
pp_structure_elements=elements,
page_number=page_idx + 1,
ocr_dimensions=ocr_dimensions,
pp_dimensions=pp_dimensions
)
# Store statistics
self.gap_filling_stats[f'page_{page_idx + 1}'] = stats
if supplemented:
logger.info(
f"Page {page_idx + 1}: Gap filling added {len(supplemented)} elements "
f"(coverage: {stats.get('coverage_ratio', 0):.2%})"
)
elements.extend(supplemented)
# Recalculate reading order for combined elements
reading_order = self.gap_filling_service.recalculate_reading_order(elements)
page_result['reading_order'] = reading_order
# Create page
page = Page(
page_number=page_idx + 1,
dimensions=pp_dimensions,
elements=elements,
metadata={
'reading_order': page_result.get('reading_order', []),
'gap_filling': self.gap_filling_stats.get(f'page_{page_idx + 1}', {})
}
)
pages.append(page)
logger.debug(f"Extracted page {page_idx + 1} with {len(elements)} elements")
return pages
def _extract_from_layout_data(
self,
layout_data: Dict[str, Any]
) -> List[Page]:
"""Extract pages from traditional layout_data structure."""
pages = []
# Get page dimensions (assuming uniform for all pages)
page_width = layout_data.get('page_width', 0)
page_height = layout_data.get('page_height', 0)
# Group elements by page
elements_by_page = {}
# Process text regions
for text_region in layout_data.get('text_regions', []):
page_num = text_region.get('page', 1)
if page_num not in elements_by_page:
elements_by_page[page_num] = []
element = self._convert_text_region(text_region)
if element:
elements_by_page[page_num].append(element)
# Process images
for img_meta in layout_data.get('images_metadata', []):
page_num = img_meta.get('page', 1)
if page_num not in elements_by_page:
elements_by_page[page_num] = []
element = self._convert_image_metadata(img_meta)
if element:
elements_by_page[page_num].append(element)
# Process tables
for table_data in layout_data.get('tables', []):
page_num = table_data.get('page', 1)
if page_num not in elements_by_page:
elements_by_page[page_num] = []
element = self._convert_table_data(table_data)
if element:
elements_by_page[page_num].append(element)
# Create pages
max_page = max(elements_by_page.keys()) if elements_by_page else 0
for page_num in range(1, max_page + 1):
elements = elements_by_page.get(page_num, [])
# Determine reading order based on position
reading_order = self._calculate_reading_order(elements)
page = Page(
page_number=page_num,
dimensions=Dimensions(
width=page_width,
height=page_height
),
elements=elements,
metadata={'reading_order': reading_order}
)
pages.append(page)
return pages
def _extract_from_traditional_ocr(self, ocr_results: Dict[str, Any]) -> List[Page]:
"""
Extract pages from traditional OCR results (process_file_traditional).
This handles the structure where text_regions and images_metadata are at
the top level of ocr_results, not nested inside layout_data.
"""
pages = []
# Get text regions and page dimensions
text_regions = ocr_results.get('text_regions', [])
ocr_dimensions = ocr_results.get('ocr_dimensions', [])
total_pages = ocr_results.get('total_pages', 1)
# Group elements by page
elements_by_page = {}
# Process text regions
for text_region in text_regions:
page_num = text_region.get('page', 1)
if page_num not in elements_by_page:
elements_by_page[page_num] = []
element = self._convert_text_region(text_region)
if element:
elements_by_page[page_num].append(element)
# Process images
for img_meta in ocr_results.get('images_metadata', []):
page_num = img_meta.get('page', 1)
if page_num not in elements_by_page:
elements_by_page[page_num] = []
element = self._convert_image_metadata(img_meta)
if element:
elements_by_page[page_num].append(element)
# Process tables from layout_data if available
if 'layout_data' in ocr_results and isinstance(ocr_results['layout_data'], dict):
for table_data in ocr_results['layout_data'].get('tables', []):
page_num = table_data.get('page', 1)
if page_num not in elements_by_page:
elements_by_page[page_num] = []
element = self._convert_table_data(table_data)
if element:
elements_by_page[page_num].append(element)
# Create pages
max_page = max(elements_by_page.keys()) if elements_by_page else total_pages
for page_num in range(1, max_page + 1):
elements = elements_by_page.get(page_num, [])
# Get page dimensions
# Handle both dict (single page) and list (multiple pages) formats
if isinstance(ocr_dimensions, dict):
# Single page format: {'width': W, 'height': H}
page_width = ocr_dimensions.get('width', 0)
page_height = ocr_dimensions.get('height', 0)
elif isinstance(ocr_dimensions, list):
# Multi-page format: [{'page': 1, 'width': W, 'height': H}, ...]
page_dims = next((d for d in ocr_dimensions if isinstance(d, dict) and d.get('page') == page_num), None)
if page_dims:
page_width = page_dims.get('width', 0)
page_height = page_dims.get('height', 0)
else:
page_width = 0
page_height = 0
else:
# Default dimensions if not available
page_width = 0
page_height = 0
# Determine reading order based on position
reading_order = self._calculate_reading_order(elements)
page = Page(
page_number=page_num,
dimensions=Dimensions(
width=page_width,
height=page_height
),
elements=elements,
metadata={'reading_order': reading_order}
)
pages.append(page)
return pages
def _convert_pp3_element(
self,
elem_data: Dict[str, Any],
page_idx: int,
page_width: float = 0,
page_height: float = 0
) -> Optional[DocumentElement]:
"""
Convert PP-StructureV3 element to DocumentElement.
Args:
elem_data: Element data from PP-StructureV3
page_idx: Page index (0-based)
page_width: Page width for coordinate validation
page_height: Page height for coordinate validation
"""
try:
# Extract bbox
bbox_data = elem_data.get('bbox', [0, 0, 0, 0])
bbox = BoundingBox(
x0=float(bbox_data[0]),
y0=float(bbox_data[1]),
x1=float(bbox_data[2]),
y1=float(bbox_data[3])
)
# Get element type
element_type = elem_data.get('type', ElementType.TEXT)
if isinstance(element_type, str):
# Convert string to ElementType if needed
# ElementType is a str-based enum, so we can construct from value (lowercase)
try:
element_type = ElementType(element_type)
except ValueError:
# If value doesn't match, try member name (uppercase)
element_type = ElementType[element_type.upper()] if element_type.upper() in ElementType.__members__ else ElementType.TEXT
# Content-based reclassification: detect HTML tables in text content
content_str = elem_data.get('content', '')
if isinstance(content_str, str) and '<table' in content_str.lower():
if element_type == ElementType.TEXT:
logger.info(f"Element {elem_data.get('element_id')}: Reclassifying TEXT to TABLE (HTML table in content)")
element_type = ElementType.TABLE
# Prepare content based on element type
if element_type == ElementType.TABLE:
# For tables, use TableData as content
# Priority: rebuilt_table > HTML parsing
# rebuilt_table contains clean cells without empty padding
if 'rebuilt_table' in elem_data:
rebuilt = elem_data['rebuilt_table']
# Use rebuilt cells directly - they don't include empty cells
rebuilt_cells = rebuilt.get('cells', [])
table_cells = [
TableCell(
row=c.get('row', 0),
col=c.get('col', 0),
row_span=c.get('row_span', 1),
col_span=c.get('col_span', 1),
content=c.get('content', '')
)
for c in rebuilt_cells
]
table_data = TableData(
rows=rebuilt.get('rows', 0),
cols=rebuilt.get('cols', 0),
cells=table_cells,
caption=elem_data.get('extracted_text')
)
logger.info(f"[CONVERTER] Table {elem_data.get('element_id')}: Using rebuilt_table directly ({len(rebuilt_cells)} cells)")
else:
# Fallback to HTML parsing for non-rebuilt tables
table_data = self._extract_table_data(elem_data)
content = table_data if table_data else elem_data.get('content', '')
# Preserve cell_boxes and embedded_images in metadata for PDF generation
# These are extracted by PP-StructureV3 and provide accurate cell positioning
if 'cell_boxes' in elem_data:
cell_boxes = elem_data['cell_boxes']
elem_data.setdefault('metadata', {})['cell_boxes_source'] = elem_data.get('cell_boxes_source', 'table_res_list')
# Validate cell_boxes coordinates if page dimensions are available
if page_width > 0 and page_height > 0:
validation = validate_cell_boxes(
cell_boxes=cell_boxes,
table_bbox=bbox_data,
page_width=page_width,
page_height=page_height
)
if not validation['valid']:
elem_data['metadata']['cell_boxes_validation'] = {
'valid': False,
'invalid_count': validation['invalid_count'],
'total_count': len(cell_boxes),
'needs_fallback': validation['needs_fallback']
}
# Use clamped boxes instead of invalid ones
elem_data['metadata']['cell_boxes'] = validation['clamped_boxes']
elem_data['metadata']['cell_boxes_original'] = cell_boxes
if validation['needs_fallback']:
logger.warning(
f"Table {elem_data.get('element_id')}: "
f"{validation['invalid_count']}/{len(cell_boxes)} cell_boxes invalid, "
f"fallback recommended"
)
else:
elem_data['metadata']['cell_boxes'] = cell_boxes
elem_data['metadata']['cell_boxes_validation'] = {'valid': True}
else:
# No page dimensions available, store as-is
elem_data['metadata']['cell_boxes'] = cell_boxes
if 'embedded_images' in elem_data:
elem_data.setdefault('metadata', {})['embedded_images'] = elem_data['embedded_images']
# Pass through rebuild information for tables that were rebuilt
# This tells the PDF renderer to use HTML content instead of cell_boxes
logger.info(f"[CONVERTER] Table {elem_data.get('element_id')}: checking for rebuild_stats, keys={list(elem_data.keys())}")
if 'rebuild_stats' in elem_data:
elem_data.setdefault('metadata', {})['rebuild_stats'] = elem_data['rebuild_stats']
elem_data['metadata']['was_rebuilt'] = True
logger.info(f"[CONVERTER] Table {elem_data.get('element_id')}: FOUND rebuild_stats, setting was_rebuilt=True")
if 'rebuilt_table' in elem_data:
elem_data.setdefault('metadata', {})['rebuilt_table'] = elem_data['rebuilt_table']
elif element_type in [
ElementType.IMAGE, ElementType.FIGURE, ElementType.CHART,
ElementType.DIAGRAM, ElementType.LOGO, ElementType.STAMP
]:
# For all visual elements, use metadata dict as content
# Priority: saved_path > img_path (PP-StructureV3 uses saved_path)
image_path = (
elem_data.get('saved_path') or
elem_data.get('img_path') or
''
)
content = {
'saved_path': image_path, # Preserve original path key
'path': image_path, # For backward compatibility
'width': elem_data.get('width', 0),
'height': elem_data.get('height', 0),
'format': elem_data.get('format', 'unknown')
}
if not image_path:
logger.warning(
f"Visual element {element_type.value} missing image path: "
f"saved_path={elem_data.get('saved_path')}, img_path={elem_data.get('img_path')}"
)
else:
content = elem_data.get('content', '')
# Create element
element = DocumentElement(
element_id=elem_data.get('element_id', f"elem_{self.element_counter}"),
type=element_type,
content=content,
bbox=bbox,
confidence=elem_data.get('confidence', 1.0),
metadata=elem_data.get('metadata', {})
)
# Add style info if available
if 'style' in elem_data:
element.style = self._extract_style_info(elem_data['style'])
self.element_counter += 1
return element
except Exception as e:
logger.warning(f"Failed to convert PP3 element: {e}")
return None
def _convert_text_region(
self,
text_region: Dict[str, Any]
) -> Optional[DocumentElement]:
"""Convert text region to DocumentElement."""
try:
# Extract bbox (handle both formats: [[x1,y1], [x2,y1], [x2,y2], [x1,y2]] or [x0, y0, x1, y1])
bbox_data = text_region.get('bbox', [0, 0, 0, 0])
if isinstance(bbox_data, list) and len(bbox_data) == 4:
if isinstance(bbox_data[0], list):
# 4-point format: [[x1,y1], [x2,y1], [x2,y2], [x1,y2]]
x0 = float(bbox_data[0][0])
y0 = float(bbox_data[0][1])
x1 = float(bbox_data[2][0])
y1 = float(bbox_data[2][1])
else:
# Simple format: [x0, y0, x1, y1]
x0 = float(bbox_data[0])
y0 = float(bbox_data[1])
x1 = float(bbox_data[2])
y1 = float(bbox_data[3])
else:
x0 = y0 = x1 = y1 = 0
bbox = BoundingBox(x0=x0, y0=y0, x1=x1, y1=y1)
element = DocumentElement(
element_id=f"text_{self.element_counter}",
type=ElementType.TEXT,
content=text_region.get('text', ''),
bbox=bbox,
confidence=text_region.get('confidence', 1.0),
metadata={'page': text_region.get('page', 1)}
)
self.element_counter += 1
return element
except Exception as e:
logger.warning(f"Failed to convert text region: {e}")
return None
def _convert_image_metadata(
self,
img_meta: Dict[str, Any]
) -> Optional[DocumentElement]:
"""Convert image metadata to DocumentElement."""
try:
# Extract bbox (handle both formats)
bbox_data = img_meta.get('bbox', [0, 0, 0, 0])
if isinstance(bbox_data, list) and len(bbox_data) == 4:
if isinstance(bbox_data[0], list):
# 4-point format: [[x1,y1], [x2,y1], [x2,y2], [x1,y2]]
x0 = float(bbox_data[0][0])
y0 = float(bbox_data[0][1])
x1 = float(bbox_data[2][0])
y1 = float(bbox_data[2][1])
else:
# Simple format: [x0, y0, x1, y1]
x0 = float(bbox_data[0])
y0 = float(bbox_data[1])
x1 = float(bbox_data[2])
y1 = float(bbox_data[3])
else:
x0 = y0 = x1 = y1 = 0
bbox = BoundingBox(x0=x0, y0=y0, x1=x1, y1=y1)
# Create image content dict
image_content = {
'path': img_meta.get('path', ''),
'width': img_meta.get('width', 0),
'height': img_meta.get('height', 0),
'format': img_meta.get('format', 'unknown')
}
element = DocumentElement(
element_id=f"img_{self.element_counter}",
type=ElementType.IMAGE,
content=image_content,
bbox=bbox,
metadata={'page': img_meta.get('page', 1)}
)
self.element_counter += 1
return element
except Exception as e:
logger.warning(f"Failed to convert image metadata: {e}")
return None
def _convert_table_data(
self,
table_dict: Dict[str, Any]
) -> Optional[DocumentElement]:
"""Convert table data to DocumentElement."""
try:
# Clean up empty columns before building TableData
table_dict = trim_empty_columns(table_dict)
# Extract bbox
bbox_data = table_dict.get('bbox', [0, 0, 0, 0])
bbox = BoundingBox(
x0=float(bbox_data[0]),
y0=float(bbox_data[1]),
x1=float(bbox_data[2]),
y1=float(bbox_data[3])
)
# Create table data
# Note: TableData uses 'cols' not 'columns', and doesn't have 'html' field
# HTML content is stored in metadata instead
raw_cells = table_dict.get('cells', [])
table_cells = []
# Convert raw cells to TableCell objects if needed
for cell_data in raw_cells:
if isinstance(cell_data, dict):
table_cells.append(TableCell(
row=cell_data.get('row', 0),
col=cell_data.get('col', 0),
row_span=cell_data.get('row_span', 1),
col_span=cell_data.get('col_span', 1),
content=cell_data.get('content', '')
))
table_data = TableData(
rows=table_dict.get('rows', 0),
cols=table_dict.get('columns', table_dict.get('cols', 0)),
cells=table_cells,
caption=table_dict.get('caption')
)
element = DocumentElement(
element_id=f"table_{self.element_counter}",
type=ElementType.TABLE,
content=table_data, # Use TableData object as content
bbox=bbox,
metadata={'page': table_dict.get('page', 1), 'extracted_text': table_dict.get('extracted_text', '')}
)
self.element_counter += 1
return element
except Exception as e:
logger.warning(f"Failed to convert table data: {e}")
return None
def _extract_table_data(self, elem_data: Dict) -> Optional[TableData]:
"""
Extract table data from element using BeautifulSoup for robust HTML parsing.
This method produces TableData objects with fully populated cells arrays,
matching the format produced by DirectExtractionEngine for consistency.
"""
try:
html = elem_data.get('html', '')
extracted_text = elem_data.get('extracted_text', '')
# Fallback: check content field for HTML table if html field is empty
if not html:
content = elem_data.get('content', '')
if isinstance(content, str) and '<table' in content.lower():
html = content
logger.debug("Using content field as HTML table source")
# Return None if no HTML table content
if not html or '<table' not in html.lower():
if extracted_text:
# Return minimal TableData with just caption if we have text
return TableData(rows=0, cols=0, cells=[], caption=extracted_text)
return None
# Parse HTML table using BeautifulSoup
try:
from bs4 import BeautifulSoup
soup = BeautifulSoup(html, 'html.parser')
table = soup.find('table')
if not table:
logger.warning("No <table> element found in HTML")
return self._fallback_table_data(html, extracted_text)
cells = []
headers = []
rows = table.find_all('tr')
num_rows = len(rows)
# First pass: calculate total columns by finding max column extent
# Track cells that span multiple rows: occupied[row][col] = True
occupied: Dict[int, Dict[int, bool]] = {r: {} for r in range(num_rows)}
# Parse all cells with proper rowspan/colspan handling
for row_idx, row in enumerate(rows):
row_cells = row.find_all(['td', 'th'])
col_idx = 0
for cell in row_cells:
# Skip columns that are occupied by rowspan from previous rows
while occupied[row_idx].get(col_idx, False):
col_idx += 1
cell_content = cell.get_text(strip=True)
rowspan = int(cell.get('rowspan', 1))
colspan = int(cell.get('colspan', 1))
cells.append(TableCell(
row=row_idx,
col=col_idx,
row_span=rowspan,
col_span=colspan,
content=cell_content
))
# Collect headers from <th> elements or first row
if cell.name == 'th' or row_idx == 0:
headers.append(cell_content)
# Mark cells as occupied for rowspan/colspan
for r in range(row_idx, min(row_idx + rowspan, num_rows)):
for c in range(col_idx, col_idx + colspan):
if r not in occupied:
occupied[r] = {}
occupied[r][c] = True
# Advance column index by colspan
col_idx += colspan
# Calculate actual column count from occupied cells
num_cols = 0
for r in range(num_rows):
if occupied[r]:
max_col_in_row = max(occupied[r].keys()) + 1
num_cols = max(num_cols, max_col_in_row)
logger.debug(
f"Parsed HTML table: {num_rows} rows, {num_cols} cols, {len(cells)} cells"
)
# Build table dict for cleanup
table_dict = {
'rows': num_rows,
'cols': num_cols,
'cells': [
{
'row': c.row,
'col': c.col,
'row_span': c.row_span,
'col_span': c.col_span,
'content': c.content
}
for c in cells
],
'headers': headers if headers else None,
'caption': extracted_text if extracted_text else None
}
# Clean up empty columns
table_dict = trim_empty_columns(table_dict)
# Convert cleaned cells back to TableCell objects
cleaned_cells = [
TableCell(
row=c['row'],
col=c['col'],
row_span=c.get('row_span', 1),
col_span=c.get('col_span', 1),
content=c.get('content', '')
)
for c in table_dict.get('cells', [])
]
return TableData(
rows=table_dict.get('rows', num_rows),
cols=table_dict.get('cols', num_cols),
cells=cleaned_cells,
headers=table_dict.get('headers'),
caption=table_dict.get('caption')
)
except ImportError:
logger.warning("BeautifulSoup not available, using fallback parsing")
return self._fallback_table_data(html, extracted_text)
except Exception as e:
logger.warning(f"Failed to extract table data: {e}")
return None
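    # A parsing sketch for _extract_table_data (hypothetical HTML; `converter`
    # is an OCRToUnifiedConverter instance): the rowspan on 'A' occupies (1, 0),
    # so 'C' lands at col 1 rather than col 0.
    #
    #     elem = {'html': "<table><tr><td rowspan='2'>A</td><td>B</td></tr>"
    #                     "<tr><td>C</td></tr></table>"}
    #     td = converter._extract_table_data(elem)
    #     # td.rows == 2, td.cols == 2
    #     # cells: 'A' at (0, 0) with row_span=2, 'B' at (0, 1), 'C' at (1, 1)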
def _fallback_table_data(self, html: str, extracted_text: str = '') -> Optional[TableData]:
"""
Fallback table parsing when BeautifulSoup is not available.
Returns basic TableData with row/col counts only (no cells).
"""
try:
rows = html.count('<tr')
cols = 0
if rows > 0:
first_row_end = html.find('</tr>')
if first_row_end > 0:
first_row = html[:first_row_end]
cols = first_row.count('<td') + first_row.count('<th')
if rows == 0 and cols == 0 and not extracted_text:
return None
return TableData(
rows=rows,
cols=cols,
cells=[], # Empty cells in fallback mode
caption=extracted_text if extracted_text else None
)
        except Exception:
return None
def _extract_style_info(self, style_data: Dict) -> Optional[StyleInfo]:
"""Extract style info from element."""
try:
return StyleInfo(
font_family=style_data.get('font_family'),
font_size=style_data.get('font_size'),
font_weight=style_data.get('font_weight'),
font_style=style_data.get('font_style'),
text_color=style_data.get('text_color'),
background_color=style_data.get('background_color'),
alignment=style_data.get('alignment')
)
        except Exception:
return None
def _calculate_reading_order(self, elements: List[DocumentElement]) -> List[int]:
"""Calculate reading order based on element positions."""
if not elements:
return []
# Create indexed elements with position
indexed_elements = []
for i, elem in enumerate(elements):
            # Use the top-left corner for sorting (y0 is top, x0 is left in BoundingBox)
            indexed_elements.append((
                i,
                elem.bbox.y0,  # y coordinate (top to bottom)
                elem.bbox.x0   # x coordinate (left to right)
))
# Sort by y first (top to bottom), then x (left to right)
indexed_elements.sort(key=lambda x: (x[1], x[2]))
# Return the sorted indices
return [idx for idx, _, _ in indexed_elements]
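    # Ordering sketch (hypothetical boxes; note the sort is strict on y0, with
    # no same-line tolerance):
    #
    #     a: y0=10, x0=10    b: y0=12, x0=200    c: y0=80, x0=10
    #     _calculate_reading_order([b, c, a])  ->  [2, 0, 1]   # a, then b, then c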
def _establish_relationships(self, doc: UnifiedDocument):
"""
Establish relationships between elements.
This includes:
- Linking captions to figures/tables
- Grouping list items
- Identifying headers and their content
"""
for page in doc.pages:
# Link captions to nearest figure/table
self._link_captions(page.elements)
# Group consecutive list items
self._group_list_items(page.elements)
# Link headers to content
self._link_headers(page.elements)
# Update metadata based on content
self._update_metadata(doc)
def _link_captions(self, elements: List[DocumentElement]):
"""Link caption elements to their associated figures/tables."""
captions = [e for e in elements if e.type in [ElementType.CAPTION, ElementType.TABLE_CAPTION]]
targets = [e for e in elements if e.type in [ElementType.FIGURE, ElementType.TABLE, ElementType.IMAGE]]
for caption in captions:
if not targets:
break
# Find nearest target above the caption
best_target = None
min_distance = float('inf')
for target in targets:
# Caption should be below the target (y1 is bottom in BoundingBox)
if target.bbox.y1 <= caption.bbox.y0:
distance = caption.bbox.y0 - target.bbox.y1
if distance < min_distance:
min_distance = distance
best_target = target
if best_target and min_distance < 50: # Within 50 pixels
caption.metadata['linked_to'] = best_target.element_id
best_target.metadata['caption_id'] = caption.element_id
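    # Linking sketch (hypothetical layout): a caption whose top edge starts
    # within 50px below a table's bottom edge gets linked.
    #
    #     table.bbox:   y1 = 300 (bottom)
    #     caption.bbox: y0 = 320 (top)    # gap = 20px < 50px
    #     # -> caption.metadata['linked_to'] = table.element_id
    #     #    table.metadata['caption_id'] = caption.element_id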
def _group_list_items(self, elements: List[DocumentElement]):
"""Group consecutive list items."""
list_items = [e for e in elements if e.type == ElementType.LIST_ITEM]
if not list_items:
return
        # Sort by position (top-left corner: y0 is top, x0 is left)
        list_items.sort(key=lambda e: (e.bbox.y0, e.bbox.x0))
# Group consecutive items
current_group = []
groups = []
for i, item in enumerate(list_items):
if i == 0:
current_group = [item]
else:
prev_item = list_items[i-1]
# Check if items are consecutive (similar x position, reasonable y gap)
x_aligned = abs(item.bbox.x0 - prev_item.bbox.x0) < 20
y_consecutive = (item.bbox.y0 - prev_item.bbox.y1) < 30
if x_aligned and y_consecutive:
current_group.append(item)
else:
if current_group:
groups.append(current_group)
current_group = [item]
if current_group:
groups.append(current_group)
# Mark groups in metadata
for group_idx, group in enumerate(groups):
group_id = f"list_group_{group_idx}"
for item_idx, item in enumerate(group):
item.metadata['list_group'] = group_id
item.metadata['list_index'] = item_idx
def _link_headers(self, elements: List[DocumentElement]):
"""Link headers to their content sections."""
headers = [e for e in elements if e.type in [ElementType.HEADER, ElementType.TITLE]]
for i, header in enumerate(headers):
# Find content between this header and the next
next_header_y = float('inf')
if i + 1 < len(headers):
next_header_y = headers[i + 1].bbox.y1
# Find all elements between headers (y0=top, y1=bottom)
content_elements = [
e for e in elements
if (e.bbox.y0 > header.bbox.y1 and
e.bbox.y0 < next_header_y and
e.type not in [ElementType.HEADER, ElementType.TITLE])
]
if content_elements:
header.metadata['content_elements'] = [e.element_id for e in content_elements]
for elem in content_elements:
elem.metadata['header_id'] = header.element_id
def _update_metadata(self, doc: UnifiedDocument):
"""Update document metadata based on extracted content."""
        # For now this is a no-op: DocumentMetadata has no fields for
        # content-derived summaries, so any such data would have to be stored
        # at the document level instead.
        pass
def _generate_document_id(self, file_path: Path) -> str:
"""Generate unique document ID."""
content = f"{file_path.name}_{datetime.now().isoformat()}"
return hashlib.md5(content.encode()).hexdigest()
def _detect_mime_type(self, file_path: Path) -> str:
"""Detect MIME type of file."""
try:
import magic
return magic.from_file(str(file_path), mime=True)
        except Exception:
# Fallback to extension-based detection
ext = file_path.suffix.lower()
mime_map = {
'.pdf': 'application/pdf',
'.png': 'image/png',
'.jpg': 'image/jpeg',
'.jpeg': 'image/jpeg'
}
return mime_map.get(ext, 'application/octet-stream')
def _count_elements(self, pages: List[Page]) -> int:
"""Count total elements across all pages."""
return sum(len(page.elements) for page in pages)
def _extract_from_direct_results(
self,
pages_data: List[Dict[str, Any]]
) -> List[Page]:
"""Extract pages from direct PP-StructureV3 results."""
pages = []
for page_idx, page_data in enumerate(pages_data):
elements = []
# Get page dimensions first
page_width = page_data.get('width', 0)
page_height = page_data.get('height', 0)
# Process each element in the page
if 'elements' in page_data:
for elem_data in page_data['elements']:
element = self._convert_pp3_element(
elem_data, page_idx,
page_width=page_width,
page_height=page_height
)
if element:
elements.append(element)
# Create page
page = Page(
page_number=page_idx + 1,
dimensions=Dimensions(
width=page_width,
height=page_height
),
elements=elements,
metadata={'reading_order': self._calculate_reading_order(elements)}
)
pages.append(page)
return pages
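# End-to-end usage sketch (hypothetical payload; the filename, dimensions and
# timing are placeholders, not a real PP-StructureV3 dump):
#
#     from pathlib import Path
#     converter = OCRToUnifiedConverter(enable_gap_filling=True)
#     doc = converter.convert(
#         ocr_results={'enhanced_results': [
#             {'width': 1240, 'height': 1754, 'elements': []},
#         ]},
#         file_path=Path('sample.pdf'),
#         processing_time=3.2,
#         lang='ch',
#     )
#     # doc.pages[0].dimensions.width == 1240
#     # per-page gap-filling stats end up in converter.gap_filling_stats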