Files
OCR/backend/app/services/ocr_to_unified_converter.py
egg fa9b542b06 fix: improve OCR track multi-line text rendering and HTML table detection
Multi-line text rendering (pdf_generator_service.py):
- Calculate font size by dividing bbox height by number of lines
- Start Y coordinate from bbox TOP instead of bottom
- Use non_empty_lines for proper line positioning

HTML table detection:
- pp_structure_enhanced.py: Detect HTML tables in 'text' type content
  and reclassify them to TABLE when a `<table` tag is found
- pdf_generator_service.py: Content-based reclassification from TEXT
  to TABLE during UnifiedDocument parsing
- ocr_to_unified_converter.py: Fallback to check 'content' field for
  HTML tables when 'html' field is empty

Known issue: OCR processing still has quality issues that need further
investigation and fixes.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-26 16:09:31 +08:00

792 lines
29 KiB
Python

"""
OCR to UnifiedDocument Converter
Converts PP-StructureV3 OCR results to UnifiedDocument format, preserving
all structure information and metadata.
"""
import logging
from pathlib import Path
from typing import Dict, List, Optional, Any, Union
from datetime import datetime
import hashlib
from app.models.unified_document import (
UnifiedDocument, DocumentElement, Page, DocumentMetadata,
BoundingBox, StyleInfo, TableData, ElementType,
ProcessingTrack, TableCell, Dimensions
)
logger = logging.getLogger(__name__)
class OCRToUnifiedConverter:
    """
    Converter for transforming PP-StructureV3 OCR results to UnifiedDocument format.
    This converter handles:
    - PP-StructureV3 parsing_res_list results
    - Markdown fallback results
    - Multi-page document assembly
    - Metadata preservation
    - Structure relationship mapping
    """
    def __init__(self):
        """Initialize the converter."""
        # Monotonic counter used to mint unique element IDs
        # (elem_N / text_N / img_N / table_N) across one conversion run.
        self.element_counter: int = 0
def convert(
    self,
    ocr_results: Dict[str, Any],
    file_path: Path,
    processing_time: float,
    lang: str = 'ch'
) -> UnifiedDocument:
    """
    Convert raw OCR output into a UnifiedDocument.

    Args:
        ocr_results: Raw OCR results from PP-StructureV3.
        file_path: Original file path.
        processing_time: Time taken for OCR processing.
        lang: Language code used for OCR.

    Returns:
        UnifiedDocument with all extracted information; on failure, an
        empty document whose processing_errors records the exception.
    """
    try:
        metadata = self._create_metadata(file_path, processing_time, lang)
        pages = self._extract_pages(ocr_results)
        document = UnifiedDocument(
            document_id=self._generate_document_id(file_path),
            metadata=metadata,
            pages=pages,
            processing_errors=ocr_results.get('errors', [])
        )
        # Wire up caption/figure links, list groups and header sections.
        self._establish_relationships(document)
        logger.info(f"Successfully converted OCR results to UnifiedDocument: "
                    f"{len(pages)} pages, {self._count_elements(pages)} elements")
        return document
    except Exception as e:
        logger.error(f"Error converting OCR results: {e}")
        import traceback
        logger.error(f"Traceback: {traceback.format_exc()}")
        # Degrade gracefully: an empty document that still records the error.
        return UnifiedDocument(
            document_id=self._generate_document_id(file_path),
            metadata=self._create_metadata(file_path, processing_time, lang),
            pages=[],
            processing_errors=[{
                'error': str(e),
                'type': 'conversion_error',
                'timestamp': datetime.now().isoformat()
            }]
        )
def _create_metadata(
    self,
    file_path: Path,
    processing_time: float,
    lang: str
) -> DocumentMetadata:
    """Build DocumentMetadata for the OCR processing track."""
    # stat() would raise on a missing file, so guard with exists().
    size = file_path.stat().st_size if file_path.exists() else 0
    return DocumentMetadata(
        filename=file_path.name,
        file_type=file_path.suffix,
        file_size=size,
        created_at=datetime.now(),
        processing_track=ProcessingTrack.OCR,
        processing_time=processing_time,
        language=lang
    )
def _extract_pages(self, ocr_results: Dict[str, Any]) -> List[Page]:
    """
    Extract pages from OCR results.

    Dispatches on the shape of ocr_results: enhanced PP-StructureV3
    results, traditional OCR with top-level text_regions, a layout_data
    structure, or direct per-page results. Returns [] when nothing is
    recognized.
    """
    if 'enhanced_results' in ocr_results:
        return self._extract_from_enhanced_results(ocr_results['enhanced_results'])
    # Traditional OCR (process_file_traditional) keeps text_regions at top level.
    if 'text_regions' in ocr_results:
        return self._extract_from_traditional_ocr(ocr_results)
    if 'layout_data' in ocr_results:
        return self._extract_from_layout_data(ocr_results['layout_data'])
    if 'pages' in ocr_results:
        return self._extract_from_direct_results(ocr_results['pages'])
    logger.warning("No recognized OCR result structure found")
    return []
def _extract_from_enhanced_results(
    self,
    enhanced_results: List[Dict[str, Any]]
) -> List[Page]:
    """Build pages from enhanced PP-StructureV3 results (one entry per page)."""
    pages = []
    for page_idx, page_result in enumerate(enhanced_results):
        # Convert every raw element; drop those that fail conversion.
        converted = (
            self._convert_pp3_element(raw, page_idx)
            for raw in page_result.get('elements', [])
        )
        elements = [elem for elem in converted if elem]
        pages.append(Page(
            page_number=page_idx + 1,
            dimensions=Dimensions(
                width=page_result.get('width', 0),
                height=page_result.get('height', 0)
            ),
            elements=elements,
            metadata={'reading_order': page_result.get('reading_order', [])}
        ))
        logger.debug(f"Extracted page {page_idx + 1} with {len(elements)} elements")
    return pages
def _extract_from_layout_data(
    self,
    layout_data: Dict[str, Any]
) -> List[Page]:
    """
    Build pages from a traditional layout_data structure.

    Text regions, image metadata and tables are converted and bucketed by
    their 'page' number; page dimensions are assumed uniform.
    """
    page_width = layout_data.get('page_width', 0)
    page_height = layout_data.get('page_height', 0)
    elements_by_page = {}
    # Each source key pairs with its converter; processed in this order so
    # per-page element order matches text -> images -> tables.
    sources = (
        ('text_regions', self._convert_text_region),
        ('images_metadata', self._convert_image_metadata),
        ('tables', self._convert_table_data),
    )
    for key, converter in sources:
        for raw in layout_data.get(key, []):
            # Register the page even if conversion fails, so the page is
            # still emitted (possibly empty) below.
            bucket = elements_by_page.setdefault(raw.get('page', 1), [])
            element = converter(raw)
            if element:
                bucket.append(element)
    last_page = max(elements_by_page) if elements_by_page else 0
    pages = []
    for page_num in range(1, last_page + 1):
        elements = elements_by_page.get(page_num, [])
        pages.append(Page(
            page_number=page_num,
            dimensions=Dimensions(
                width=page_width,
                height=page_height
            ),
            elements=elements,
            metadata={'reading_order': self._calculate_reading_order(elements)}
        ))
    return pages
def _extract_from_traditional_ocr(self, ocr_results: Dict[str, Any]) -> List[Page]:
    """
    Build pages from traditional OCR results (process_file_traditional).

    Here text_regions and images_metadata sit at the top level of
    ocr_results rather than nested inside layout_data; tables, when
    present, still come from layout_data.
    """
    ocr_dimensions = ocr_results.get('ocr_dimensions', [])
    total_pages = ocr_results.get('total_pages', 1)
    elements_by_page = {}

    def collect(raw, converter):
        # Register the page key even when conversion fails, so empty pages
        # are still emitted below.
        bucket = elements_by_page.setdefault(raw.get('page', 1), [])
        element = converter(raw)
        if element:
            bucket.append(element)

    for region in ocr_results.get('text_regions', []):
        collect(region, self._convert_text_region)
    for img_meta in ocr_results.get('images_metadata', []):
        collect(img_meta, self._convert_image_metadata)
    layout = ocr_results.get('layout_data')
    if isinstance(layout, dict):
        for table_data in layout.get('tables', []):
            collect(table_data, self._convert_table_data)

    def dims_for(page_num):
        # Single-page format: {'width': W, 'height': H}
        if isinstance(ocr_dimensions, dict):
            return ocr_dimensions.get('width', 0), ocr_dimensions.get('height', 0)
        # Multi-page format: [{'page': 1, 'width': W, 'height': H}, ...]
        if isinstance(ocr_dimensions, list):
            match = next(
                (d for d in ocr_dimensions
                 if isinstance(d, dict) and d.get('page') == page_num),
                None
            )
            if match:
                return match.get('width', 0), match.get('height', 0)
        # Unknown format or page not listed: fall back to zero dimensions.
        return 0, 0

    last_page = max(elements_by_page) if elements_by_page else total_pages
    pages = []
    for page_num in range(1, last_page + 1):
        elements = elements_by_page.get(page_num, [])
        width, height = dims_for(page_num)
        pages.append(Page(
            page_number=page_num,
            dimensions=Dimensions(
                width=width,
                height=height
            ),
            elements=elements,
            metadata={'reading_order': self._calculate_reading_order(elements)}
        ))
    return pages
def _convert_pp3_element(
    self,
    elem_data: Dict[str, Any],
    page_idx: int
) -> Optional[DocumentElement]:
    """
    Convert a single PP-StructureV3 element dict to a DocumentElement.

    Returns None (and logs a warning) when conversion fails for any reason.
    """
    try:
        raw_bbox = elem_data.get('bbox', [0, 0, 0, 0])
        bbox = BoundingBox(
            x0=float(raw_bbox[0]),
            y0=float(raw_bbox[1]),
            x1=float(raw_bbox[2]),
            y1=float(raw_bbox[3])
        )
        elem_type = elem_data.get('type', ElementType.TEXT)
        if isinstance(elem_type, str):
            # Map a string name onto the enum; unknown names degrade to TEXT.
            elem_type = (ElementType[elem_type]
                         if elem_type in ElementType.__members__
                         else ElementType.TEXT)
        # Content shape depends on the element type.
        if elem_type == ElementType.TABLE:
            parsed_table = self._extract_table_data(elem_data)
            content = parsed_table if parsed_table else elem_data.get('content', '')
        elif elem_type in (ElementType.IMAGE, ElementType.FIGURE):
            content = {
                'path': elem_data.get('img_path', ''),
                'width': elem_data.get('width', 0),
                'height': elem_data.get('height', 0),
                'format': elem_data.get('format', 'unknown')
            }
        else:
            content = elem_data.get('content', '')
        element = DocumentElement(
            element_id=elem_data.get('element_id', f"elem_{self.element_counter}"),
            type=elem_type,
            content=content,
            bbox=bbox,
            confidence=elem_data.get('confidence', 1.0),
            metadata=elem_data.get('metadata', {})
        )
        if 'style' in elem_data:
            element.style = self._extract_style_info(elem_data['style'])
        self.element_counter += 1
        return element
    except Exception as e:
        logger.warning(f"Failed to convert PP3 element: {e}")
        return None
def _convert_text_region(
    self,
    text_region: Dict[str, Any]
) -> Optional[DocumentElement]:
    """
    Convert a raw OCR text region to a TEXT DocumentElement.

    The bbox may be a 4-point polygon [[x1,y1],[x2,y1],[x2,y2],[x1,y2]]
    or a flat [x0, y0, x1, y1]; anything else maps to a zero box.
    """
    try:
        raw = text_region.get('bbox', [0, 0, 0, 0])
        x0 = y0 = x1 = y1 = 0
        if isinstance(raw, list) and len(raw) == 4:
            if isinstance(raw[0], list):
                # Polygon: take opposing corners (index 0 and 2).
                x0, y0 = float(raw[0][0]), float(raw[0][1])
                x1, y1 = float(raw[2][0]), float(raw[2][1])
            else:
                x0, y0, x1, y1 = (float(v) for v in raw)
        element = DocumentElement(
            element_id=f"text_{self.element_counter}",
            type=ElementType.TEXT,
            content=text_region.get('text', ''),
            bbox=BoundingBox(x0=x0, y0=y0, x1=x1, y1=y1),
            confidence=text_region.get('confidence', 1.0),
            metadata={'page': text_region.get('page', 1)}
        )
        self.element_counter += 1
        return element
    except Exception as e:
        logger.warning(f"Failed to convert text region: {e}")
        return None
def _convert_image_metadata(
    self,
    img_meta: Dict[str, Any]
) -> Optional[DocumentElement]:
    """
    Convert image metadata to an IMAGE DocumentElement.

    Accepts the same two bbox formats as text regions (polygon or flat);
    content is a dict of path/width/height/format.
    """
    try:
        raw = img_meta.get('bbox', [0, 0, 0, 0])
        x0 = y0 = x1 = y1 = 0
        if isinstance(raw, list) and len(raw) == 4:
            if isinstance(raw[0], list):
                # Polygon: take opposing corners (index 0 and 2).
                x0, y0 = float(raw[0][0]), float(raw[0][1])
                x1, y1 = float(raw[2][0]), float(raw[2][1])
            else:
                x0, y0, x1, y1 = (float(v) for v in raw)
        element = DocumentElement(
            element_id=f"img_{self.element_counter}",
            type=ElementType.IMAGE,
            content={
                'path': img_meta.get('path', ''),
                'width': img_meta.get('width', 0),
                'height': img_meta.get('height', 0),
                'format': img_meta.get('format', 'unknown')
            },
            bbox=BoundingBox(x0=x0, y0=y0, x1=x1, y1=y1),
            metadata={'page': img_meta.get('page', 1)}
        )
        self.element_counter += 1
        return element
    except Exception as e:
        logger.warning(f"Failed to convert image metadata: {e}")
        return None
def _convert_table_data(
    self,
    table_dict: Dict[str, Any]
) -> Optional[DocumentElement]:
    """
    Convert a raw table dict to a TABLE DocumentElement.

    Args:
        table_dict: Raw table entry with 'bbox', 'rows', 'columns'/'cols',
            'cells', optional 'caption' and 'extracted_text'.

    Returns:
        DocumentElement whose content is a TableData, or None on failure.
    """
    try:
        bbox_data = table_dict.get('bbox', [0, 0, 0, 0])
        bbox = BoundingBox(
            x0=float(bbox_data[0]),
            y0=float(bbox_data[1]),
            x1=float(bbox_data[2]),
            y1=float(bbox_data[3])
        )
        # TableData uses 'cols' (not 'columns') and has no 'html' field;
        # HTML content, when present, belongs in element metadata.
        # TableCell is already imported at module level — the previous
        # per-iteration local import was redundant and has been removed.
        # Non-dict cell entries are skipped, matching prior behavior.
        table_cells = [
            TableCell(
                row=cell.get('row', 0),
                col=cell.get('col', 0),
                row_span=cell.get('row_span', 1),
                col_span=cell.get('col_span', 1),
                content=cell.get('content', '')
            )
            for cell in table_dict.get('cells', [])
            if isinstance(cell, dict)
        ]
        table_data = TableData(
            rows=table_dict.get('rows', 0),
            cols=table_dict.get('columns', table_dict.get('cols', 0)),
            cells=table_cells,
            caption=table_dict.get('caption')
        )
        element = DocumentElement(
            element_id=f"table_{self.element_counter}",
            type=ElementType.TABLE,
            content=table_data,  # TableData object as content
            bbox=bbox,
            metadata={'page': table_dict.get('page', 1), 'extracted_text': table_dict.get('extracted_text', '')}
        )
        self.element_counter += 1
        return element
    except Exception as e:
        logger.warning(f"Failed to convert table data: {e}")
        return None
def _extract_table_data(self, elem_data: Dict) -> Optional[TableData]:
    """
    Build a TableData from a PP-StructureV3 table element.

    Row/column counts are estimated by naive string scanning of the HTML
    ('<tr' occurrences; '<td'/'<th' within the first row). A real parser
    (e.g. BeautifulSoup) would be more robust, but is not required here.

    Returns:
        TableData with estimated rows/cols, or None when the element holds
        no usable table information.
    """
    try:
        html = elem_data.get('html', '')
        extracted_text = elem_data.get('extracted_text', '')
        # Fallback: some producers put the HTML table in 'content' when
        # the 'html' field is empty.
        if not html:
            content = elem_data.get('content', '')
            if isinstance(content, str) and '<table' in content.lower():
                html = content
                logger.debug("Using content field as HTML table source")
        rows = 0
        cols = 0
        cells = []
        if html:
            rows = html.count('<tr')
            if rows > 0:
                # Estimate column count from the first row only.
                first_row_end = html.find('</tr>')
                if first_row_end > 0:
                    first_row = html[:first_row_end]
                    cols = first_row.count('<td') + first_row.count('<th')
        # Nothing usable at all -> signal "no table data".
        if rows == 0 and cols == 0 and not extracted_text:
            return None
        # Note: TableData uses 'cols' (not 'columns'); extracted text
        # doubles as the caption.
        return TableData(
            rows=rows,
            cols=cols,
            cells=cells,
            caption=extracted_text if extracted_text else None
        )
    except Exception as e:  # was a bare 'except:' that hid real bugs
        logger.warning(f"Failed to extract table data: {e}")
        return None
def _extract_style_info(self, style_data: Dict) -> Optional[StyleInfo]:
    """
    Map a raw style dict onto a StyleInfo model.

    Returns:
        StyleInfo populated from whatever keys are present, or None when
        construction fails (e.g. style_data is not dict-like or values
        fail model validation).
    """
    try:
        return StyleInfo(
            font_family=style_data.get('font_family'),
            font_size=style_data.get('font_size'),
            font_weight=style_data.get('font_weight'),
            font_style=style_data.get('font_style'),
            text_color=style_data.get('text_color'),
            background_color=style_data.get('background_color'),
            alignment=style_data.get('alignment')
        )
    except Exception as e:  # was a bare 'except:' — also caught SystemExit/KeyboardInterrupt
        logger.warning(f"Failed to extract style info: {e}")
        return None
def _calculate_reading_order(self, elements: List[DocumentElement]) -> List[int]:
    """
    Return element indices sorted top-to-bottom, then left-to-right.

    NOTE(review): the sort keys are bbox.y1/bbox.x1; given how bboxes are
    constructed in this module (x1, y1 = second corner), this appears to
    sort on the bottom-right corner rather than the top-left that the
    original comment claimed — confirm against the BoundingBox model.
    """
    if not elements:
        return []
    # Stable sort preserves the original ordering of ties, exactly as the
    # previous tuple-based implementation did.
    return sorted(
        range(len(elements)),
        key=lambda i: (elements[i].bbox.y1, elements[i].bbox.x1)
    )
def _establish_relationships(self, doc: UnifiedDocument):
    """
    Establish relationships between elements on every page:
    captions -> figures/tables, consecutive list items -> groups,
    headers -> their content sections. Finally refresh document metadata.
    """
    for page in doc.pages:
        elements = page.elements
        self._link_captions(elements)
        self._group_list_items(elements)
        self._link_headers(elements)
    self._update_metadata(doc)
def _link_captions(self, elements: List[DocumentElement]):
    """
    Link each caption to the nearest figure/table/image directly above it
    (within 50 pixels), recording the link in both elements' metadata.
    """
    caption_types = (ElementType.CAPTION, ElementType.TABLE_CAPTION)
    target_types = (ElementType.FIGURE, ElementType.TABLE, ElementType.IMAGE)
    captions = [e for e in elements if e.type in caption_types]
    targets = [e for e in elements if e.type in target_types]
    if not targets:
        return
    for caption in captions:
        # Candidates whose bottom edge sits at or above the caption's top.
        above = [t for t in targets if t.bbox.y2 <= caption.bbox.y1]
        if not above:
            continue
        # min() keeps the first candidate on ties, like the original scan.
        nearest = min(above, key=lambda t: caption.bbox.y1 - t.bbox.y2)
        if caption.bbox.y1 - nearest.bbox.y2 < 50:  # within 50 pixels
            caption.metadata['linked_to'] = nearest.element_id
            nearest.metadata['caption_id'] = caption.element_id
def _group_list_items(self, elements: List[DocumentElement]):
    """
    Group consecutive LIST_ITEM elements (similar x position, small y gap)
    and tag each item's metadata with its group id and index.
    """
    items = [e for e in elements if e.type == ElementType.LIST_ITEM]
    if not items:
        return
    items.sort(key=lambda e: (e.bbox.y1, e.bbox.x1))
    groups = []
    current = []
    prev = None
    for item in items:
        if prev is not None:
            x_aligned = abs(item.bbox.x1 - prev.bbox.x1) < 20
            y_consecutive = (item.bbox.y1 - prev.bbox.y2) < 30
            if not (x_aligned and y_consecutive):
                # Gap detected: close the running group and start afresh.
                groups.append(current)
                current = []
        current.append(item)
        prev = item
    if current:
        groups.append(current)
    for group_idx, group in enumerate(groups):
        group_id = f"list_group_{group_idx}"
        for item_idx, member in enumerate(group):
            member.metadata['list_group'] = group_id
            member.metadata['list_index'] = item_idx
def _link_headers(self, elements: List[DocumentElement]):
    """
    Link each HEADER/TITLE to the non-header elements that lie between it
    and the next header, recording the section in both directions.
    """
    header_types = (ElementType.HEADER, ElementType.TITLE)
    headers = [e for e in elements if e.type in header_types]
    for idx, header in enumerate(headers):
        # Section extends to the next header, or to infinity for the last one.
        upper_bound = (headers[idx + 1].bbox.y1
                       if idx + 1 < len(headers)
                       else float('inf'))
        section = [
            e for e in elements
            if header.bbox.y2 < e.bbox.y1 < upper_bound
            and e.type not in header_types
        ]
        if section:
            header.metadata['content_elements'] = [e.element_id for e in section]
            for member in section:
                member.metadata['header_id'] = header.element_id
def _update_metadata(self, doc: UnifiedDocument):
    """
    Update document metadata based on extracted content.

    Currently a no-op: DocumentMetadata has no fields for derived summary
    statistics, so nothing is written back yet. Kept as an extension point
    for storing summary data at the document level or in processing_errors.
    """
    pass
def _generate_document_id(self, file_path: Path) -> str:
"""Generate unique document ID."""
content = f"{file_path.name}_{datetime.now().isoformat()}"
return hashlib.md5(content.encode()).hexdigest()
def _detect_mime_type(self, file_path: Path) -> str:
"""Detect MIME type of file."""
try:
import magic
return magic.from_file(str(file_path), mime=True)
except:
# Fallback to extension-based detection
ext = file_path.suffix.lower()
mime_map = {
'.pdf': 'application/pdf',
'.png': 'image/png',
'.jpg': 'image/jpeg',
'.jpeg': 'image/jpeg'
}
return mime_map.get(ext, 'application/octet-stream')
def _count_elements(self, pages: List[Page]) -> int:
    """Return the total element count summed over every page."""
    return sum(len(current_page.elements) for current_page in pages)
def _extract_from_direct_results(
    self,
    pages_data: List[Dict[str, Any]]
) -> List[Page]:
    """Build pages from direct PP-StructureV3 per-page results."""
    pages = []
    for idx, page_data in enumerate(pages_data):
        # Convert each raw element; drop failed conversions.
        produced = (
            self._convert_pp3_element(raw, idx)
            for raw in page_data.get('elements', [])
        )
        elements = [elem for elem in produced if elem]
        pages.append(Page(
            page_number=idx + 1,
            dimensions=Dimensions(
                width=page_data.get('width', 0),
                height=page_data.get('height', 0)
            ),
            elements=elements,
            metadata={'reading_order': self._calculate_reading_order(elements)}
        ))
    return pages