"""
|
|
OCR to UnifiedDocument Converter
|
|
|
|
Converts PP-StructureV3 OCR results to UnifiedDocument format, preserving
|
|
all structure information and metadata.
|
|
"""
|
|
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Any, Union
|
|
from datetime import datetime
|
|
import hashlib
|
|
|
|
from app.models.unified_document import (
|
|
UnifiedDocument, DocumentElement, Page, DocumentMetadata,
|
|
BoundingBox, StyleInfo, TableData, ElementType,
|
|
ProcessingTrack, TableCell, Dimensions
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)


class OCRToUnifiedConverter:
    """
    Converter for transforming PP-StructureV3 OCR results to UnifiedDocument format.

    This converter handles:
    - PP-StructureV3 parsing_res_list results
    - Markdown fallback results
    - Multi-page document assembly
    - Metadata preservation
    - Structure relationship mapping
    """

    def __init__(self):
        """Initialize the converter."""
        self.element_counter = 0

    def convert(
        self,
        ocr_results: Dict[str, Any],
        file_path: Path,
        processing_time: float,
        lang: str = 'ch'
    ) -> UnifiedDocument:
        """
        Convert OCR results to UnifiedDocument.

        Args:
            ocr_results: Raw OCR results from PP-StructureV3
            file_path: Original file path
            processing_time: Time taken for OCR processing
            lang: Language used for OCR

        Returns:
            UnifiedDocument with all extracted information
        """
        try:
            # Create document metadata
            metadata = self._create_metadata(file_path, processing_time, lang)

            # Extract pages from OCR results
            pages = self._extract_pages(ocr_results)

            # Create document ID
            document_id = self._generate_document_id(file_path)

            # Create UnifiedDocument
            unified_doc = UnifiedDocument(
                document_id=document_id,
                metadata=metadata,
                pages=pages,
                processing_errors=ocr_results.get('errors', [])
            )

            # Post-process to establish relationships
            self._establish_relationships(unified_doc)

            logger.info(f"Successfully converted OCR results to UnifiedDocument: "
                        f"{len(pages)} pages, {self._count_elements(pages)} elements")

            return unified_doc

        except Exception as e:
            # logger.exception records the full traceback in the log
            logger.exception(f"Error converting OCR results: {e}")

            # Return minimal document with error
            return UnifiedDocument(
                document_id=self._generate_document_id(file_path),
                metadata=self._create_metadata(file_path, processing_time, lang),
                pages=[],
                processing_errors=[{
                    'error': str(e),
                    'type': 'conversion_error',
                    'timestamp': datetime.now().isoformat()
                }]
            )
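
    # Typical call (a sketch; `ocr_results` must match one of the shapes handled
    # by _extract_pages below):
    #
    #     doc = OCRToUnifiedConverter().convert(ocr_results, Path('scan.pdf'),
    #                                           processing_time=1.2, lang='ch')
    #     assert doc.document_id and isinstance(doc.pages, list)
    #
    # On any conversion failure, a minimal UnifiedDocument is still returned, with
    # empty pages and a 'conversion_error' entry in processing_errors.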

    def _create_metadata(
        self,
        file_path: Path,
        processing_time: float,
        lang: str
    ) -> DocumentMetadata:
        """Create document metadata."""
        return DocumentMetadata(
            filename=file_path.name,
            file_type=file_path.suffix,
            file_size=file_path.stat().st_size if file_path.exists() else 0,
            created_at=datetime.now(),
            processing_track=ProcessingTrack.OCR,
            processing_time=processing_time,
            language=lang
        )

    def _extract_pages(self, ocr_results: Dict[str, Any]) -> List[Page]:
        """
        Extract pages from OCR results.

        Handles both enhanced PP-StructureV3 results (with parsing_res_list)
        and traditional markdown results.
        """
        pages = []

        # Check if we have enhanced results from PPStructureEnhanced
        if 'enhanced_results' in ocr_results:
            pages = self._extract_from_enhanced_results(ocr_results['enhanced_results'])
        # Check for traditional layout_data structure
        elif 'layout_data' in ocr_results:
            pages = self._extract_from_layout_data(ocr_results['layout_data'])
        # Check for direct PP-StructureV3 results
        elif 'pages' in ocr_results:
            pages = self._extract_from_direct_results(ocr_results['pages'])
        else:
            logger.warning("No recognized OCR result structure found")

        return pages
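
    # The three input shapes recognized above, keyed as this module reads them
    # (a sketch of the expected structure; upstream producers may carry extra fields):
    #
    #   {'enhanced_results': [{'width': ..., 'height': ..., 'elements': [...],
    #                          'reading_order': [...]}, ...]}
    #   {'layout_data': {'page_width': ..., 'page_height': ...,
    #                    'text_regions': [...], 'images_metadata': [...], 'tables': [...]}}
    #   {'pages': [{'width': ..., 'height': ..., 'elements': [...]}, ...]}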

    def _extract_from_enhanced_results(
        self,
        enhanced_results: List[Dict[str, Any]]
    ) -> List[Page]:
        """Extract pages from enhanced PP-StructureV3 results."""
        pages = []

        for page_idx, page_result in enumerate(enhanced_results):
            elements = []

            # Process elements from parsing_res_list
            if 'elements' in page_result:
                for elem_data in page_result['elements']:
                    element = self._convert_pp3_element(elem_data, page_idx)
                    if element:
                        elements.append(element)

            # Create page
            page = Page(
                page_number=page_idx + 1,
                dimensions=Dimensions(
                    width=page_result.get('width', 0),
                    height=page_result.get('height', 0)
                ),
                elements=elements,
                metadata={'reading_order': page_result.get('reading_order', [])}
            )

            pages.append(page)
            logger.debug(f"Extracted page {page_idx + 1} with {len(elements)} elements")

        return pages

    def _extract_from_layout_data(
        self,
        layout_data: Dict[str, Any]
    ) -> List[Page]:
        """Extract pages from traditional layout_data structure."""
        pages = []

        # Get page dimensions (assuming uniform for all pages)
        page_width = layout_data.get('page_width', 0)
        page_height = layout_data.get('page_height', 0)

        # Group elements by page
        elements_by_page = {}

        # Process text regions
        for text_region in layout_data.get('text_regions', []):
            page_num = text_region.get('page', 1)
            if page_num not in elements_by_page:
                elements_by_page[page_num] = []

            element = self._convert_text_region(text_region)
            if element:
                elements_by_page[page_num].append(element)

        # Process images
        for img_meta in layout_data.get('images_metadata', []):
            page_num = img_meta.get('page', 1)
            if page_num not in elements_by_page:
                elements_by_page[page_num] = []

            element = self._convert_image_metadata(img_meta)
            if element:
                elements_by_page[page_num].append(element)

        # Process tables
        for table_data in layout_data.get('tables', []):
            page_num = table_data.get('page', 1)
            if page_num not in elements_by_page:
                elements_by_page[page_num] = []

            element = self._convert_table_data(table_data)
            if element:
                elements_by_page[page_num].append(element)

        # Create pages
        max_page = max(elements_by_page.keys()) if elements_by_page else 0
        for page_num in range(1, max_page + 1):
            elements = elements_by_page.get(page_num, [])

            # Determine reading order based on position
            reading_order = self._calculate_reading_order(elements)

            page = Page(
                page_number=page_num,
                dimensions=Dimensions(
                    width=page_width,
                    height=page_height
                ),
                elements=elements,
                metadata={'reading_order': reading_order}
            )

            pages.append(page)

        return pages

    def _convert_pp3_element(
        self,
        elem_data: Dict[str, Any],
        page_idx: int
    ) -> Optional[DocumentElement]:
        """Convert PP-StructureV3 element to DocumentElement."""
        try:
            # Extract bbox
            bbox_data = elem_data.get('bbox', [0, 0, 0, 0])
            bbox = BoundingBox(
                x0=float(bbox_data[0]),
                y0=float(bbox_data[1]),
                x1=float(bbox_data[2]),
                y1=float(bbox_data[3])
            )

            # Get element type, converting strings to ElementType members
            element_type = elem_data.get('type', ElementType.TEXT)
            if isinstance(element_type, str):
                element_type = (ElementType[element_type]
                                if element_type in ElementType.__members__
                                else ElementType.TEXT)

            # Prepare content based on element type
            if element_type == ElementType.TABLE:
                # For tables, use TableData as content
                table_data = self._extract_table_data(elem_data)
                content = table_data if table_data else elem_data.get('content', '')
            elif element_type in [ElementType.IMAGE, ElementType.FIGURE]:
                # For images, use a metadata dict as content
                content = {
                    'path': elem_data.get('img_path', ''),
                    'width': elem_data.get('width', 0),
                    'height': elem_data.get('height', 0),
                    'format': elem_data.get('format', 'unknown')
                }
            else:
                content = elem_data.get('content', '')

            # Create element
            element = DocumentElement(
                element_id=elem_data.get('element_id', f"elem_{self.element_counter}"),
                type=element_type,
                content=content,
                bbox=bbox,
                confidence=elem_data.get('confidence', 1.0),
                metadata=elem_data.get('metadata', {})
            )

            # Add style info if available
            if 'style' in elem_data:
                element.style = self._extract_style_info(elem_data['style'])

            self.element_counter += 1
            return element

        except Exception as e:
            logger.warning(f"Failed to convert PP3 element: {e}")
            return None

    def _convert_text_region(
        self,
        text_region: Dict[str, Any]
    ) -> Optional[DocumentElement]:
        """Convert text region to DocumentElement."""
        try:
            # Extract bbox (handle both formats: [[x1,y1], [x2,y1], [x2,y2], [x1,y2]] or [x0, y0, x1, y1])
            bbox_data = text_region.get('bbox', [0, 0, 0, 0])

            if isinstance(bbox_data, list) and len(bbox_data) == 4:
                if isinstance(bbox_data[0], list):
                    # 4-point format: [[x1,y1], [x2,y1], [x2,y2], [x1,y2]]
                    x0 = float(bbox_data[0][0])
                    y0 = float(bbox_data[0][1])
                    x1 = float(bbox_data[2][0])
                    y1 = float(bbox_data[2][1])
                else:
                    # Simple format: [x0, y0, x1, y1]
                    x0 = float(bbox_data[0])
                    y0 = float(bbox_data[1])
                    x1 = float(bbox_data[2])
                    y1 = float(bbox_data[3])
            else:
                x0 = y0 = x1 = y1 = 0

            bbox = BoundingBox(x0=x0, y0=y0, x1=x1, y1=y1)

            element = DocumentElement(
                element_id=f"text_{self.element_counter}",
                type=ElementType.TEXT,
                content=text_region.get('text', ''),
                bbox=bbox,
                confidence=text_region.get('confidence', 1.0),
                metadata={'page': text_region.get('page', 1)}
            )

            self.element_counter += 1
            return element

        except Exception as e:
            logger.warning(f"Failed to convert text region: {e}")
            return None

    def _convert_image_metadata(
        self,
        img_meta: Dict[str, Any]
    ) -> Optional[DocumentElement]:
        """Convert image metadata to DocumentElement."""
        try:
            # Extract bbox (handle both formats)
            bbox_data = img_meta.get('bbox', [0, 0, 0, 0])

            if isinstance(bbox_data, list) and len(bbox_data) == 4:
                if isinstance(bbox_data[0], list):
                    # 4-point format: [[x1,y1], [x2,y1], [x2,y2], [x1,y2]]
                    x0 = float(bbox_data[0][0])
                    y0 = float(bbox_data[0][1])
                    x1 = float(bbox_data[2][0])
                    y1 = float(bbox_data[2][1])
                else:
                    # Simple format: [x0, y0, x1, y1]
                    x0 = float(bbox_data[0])
                    y0 = float(bbox_data[1])
                    x1 = float(bbox_data[2])
                    y1 = float(bbox_data[3])
            else:
                x0 = y0 = x1 = y1 = 0

            bbox = BoundingBox(x0=x0, y0=y0, x1=x1, y1=y1)

            # Create image content dict
            image_content = {
                'path': img_meta.get('path', ''),
                'width': img_meta.get('width', 0),
                'height': img_meta.get('height', 0),
                'format': img_meta.get('format', 'unknown')
            }

            element = DocumentElement(
                element_id=f"img_{self.element_counter}",
                type=ElementType.IMAGE,
                content=image_content,
                bbox=bbox,
                metadata={'page': img_meta.get('page', 1)}
            )

            self.element_counter += 1
            return element

        except Exception as e:
            logger.warning(f"Failed to convert image metadata: {e}")
            return None
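
    # The bbox-normalisation logic above is duplicated in _convert_text_region and
    # _convert_image_metadata. A shared helper could fold both branches into one
    # place (a sketch, not currently wired in):
    #
    #     @staticmethod
    #     def _normalize_bbox(bbox_data) -> BoundingBox:
    #         if isinstance(bbox_data, list) and len(bbox_data) == 4:
    #             if isinstance(bbox_data[0], list):
    #                 # 4-point polygon: take opposite corners
    #                 return BoundingBox(x0=float(bbox_data[0][0]), y0=float(bbox_data[0][1]),
    #                                    x1=float(bbox_data[2][0]), y1=float(bbox_data[2][1]))
    #             return BoundingBox(x0=float(bbox_data[0]), y0=float(bbox_data[1]),
    #                                x1=float(bbox_data[2]), y1=float(bbox_data[3]))
    #         return BoundingBox(x0=0, y0=0, x1=0, y1=0)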

    def _convert_table_data(
        self,
        table_dict: Dict[str, Any]
    ) -> Optional[DocumentElement]:
        """Convert table data to DocumentElement."""
        try:
            # Extract bbox
            bbox_data = table_dict.get('bbox', [0, 0, 0, 0])
            bbox = BoundingBox(
                x0=float(bbox_data[0]),
                y0=float(bbox_data[1]),
                x1=float(bbox_data[2]),
                y1=float(bbox_data[3])
            )

            # Create table data
            # Note: TableData uses 'cols' not 'columns', and has no 'html' field;
            # HTML content is stored in metadata instead
            raw_cells = table_dict.get('cells', [])
            table_cells = []

            # Convert raw cells to TableCell objects if needed
            # (TableCell is imported at module level)
            for cell_data in raw_cells:
                if isinstance(cell_data, dict):
                    table_cells.append(TableCell(
                        row=cell_data.get('row', 0),
                        col=cell_data.get('col', 0),
                        row_span=cell_data.get('row_span', 1),
                        col_span=cell_data.get('col_span', 1),
                        content=cell_data.get('content', '')
                    ))

            table_data = TableData(
                rows=table_dict.get('rows', 0),
                cols=table_dict.get('columns', table_dict.get('cols', 0)),
                cells=table_cells,
                caption=table_dict.get('caption')
            )

            element = DocumentElement(
                element_id=f"table_{self.element_counter}",
                type=ElementType.TABLE,
                content=table_data,  # Use TableData object as content
                bbox=bbox,
                metadata={
                    'page': table_dict.get('page', 1),
                    'extracted_text': table_dict.get('extracted_text', '')
                }
            )

            self.element_counter += 1
            return element

        except Exception as e:
            logger.warning(f"Failed to convert table data: {e}")
            return None

    def _extract_table_data(self, elem_data: Dict) -> Optional[TableData]:
        """Extract table data from an element."""
        try:
            html = elem_data.get('html', '')
            extracted_text = elem_data.get('extracted_text', '')

            # Try to parse HTML to get row and column counts
            rows = 0
            cols = 0
            cells = []

            if html:
                # Simple HTML parsing (could be enhanced with BeautifulSoup)
                rows = html.count('<tr')
                if rows > 0:
                    # Estimate columns from the first row
                    first_row_end = html.find('</tr>')
                    if first_row_end > 0:
                        first_row = html[:first_row_end]
                        cols = first_row.count('<td') + first_row.count('<th')

            # Note: TableData uses 'cols' not 'columns';
            # HTML content can be stored as caption or in element metadata
            return TableData(
                rows=rows,
                cols=cols,
                cells=cells,
                caption=extracted_text if extracted_text else None
            )
        except Exception as e:
            logger.warning(f"Failed to extract table data: {e}")
            return None
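
    # A BeautifulSoup variant of the counting above (a sketch; assumes bs4 is
    # available, which this module does not currently require):
    #
    #     from bs4 import BeautifulSoup
    #     soup = BeautifulSoup(html, 'html.parser')
    #     tr_tags = soup.find_all('tr')
    #     rows = len(tr_tags)
    #     cols = len(tr_tags[0].find_all(['td', 'th'])) if tr_tags else 0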

    def _extract_style_info(self, style_data: Dict) -> Optional[StyleInfo]:
        """Extract style info from an element."""
        try:
            return StyleInfo(
                font_family=style_data.get('font_family'),
                font_size=style_data.get('font_size'),
                font_weight=style_data.get('font_weight'),
                font_style=style_data.get('font_style'),
                text_color=style_data.get('text_color'),
                background_color=style_data.get('background_color'),
                alignment=style_data.get('alignment')
            )
        except Exception as e:
            logger.warning(f"Failed to extract style info: {e}")
            return None

    def _calculate_reading_order(self, elements: List[DocumentElement]) -> List[int]:
        """Calculate reading order based on element positions."""
        if not elements:
            return []

        # Create indexed elements with position, using the top-left corner for sorting
        indexed_elements = []
        for i, elem in enumerate(elements):
            indexed_elements.append((
                i,
                elem.bbox.y0,  # top edge (top to bottom)
                elem.bbox.x0   # left edge (left to right)
            ))

        # Sort by y first (top to bottom), then x (left to right)
        indexed_elements.sort(key=lambda x: (x[1], x[2]))

        # Return the sorted indices
        return [idx for idx, _, _ in indexed_elements]
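
    # Worked example: elements with top-left corners (y0, x0) of
    # (10, 300), (10, 20), (200, 20) sort to reading order [1, 0, 2]:
    # the top row left-to-right first, then the element further down the page.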

    def _establish_relationships(self, doc: UnifiedDocument):
        """
        Establish relationships between elements.

        This includes:
        - Linking captions to figures/tables
        - Grouping list items
        - Identifying headers and their content
        """
        for page in doc.pages:
            # Link captions to nearest figure/table
            self._link_captions(page.elements)

            # Group consecutive list items
            self._group_list_items(page.elements)

            # Link headers to content
            self._link_headers(page.elements)

        # Update metadata based on content
        self._update_metadata(doc)

    def _link_captions(self, elements: List[DocumentElement]):
        """Link caption elements to their associated figures/tables."""
        captions = [e for e in elements if e.type in [ElementType.CAPTION, ElementType.TABLE_CAPTION]]
        targets = [e for e in elements if e.type in [ElementType.FIGURE, ElementType.TABLE, ElementType.IMAGE]]

        for caption in captions:
            if not targets:
                break

            # Find the nearest target above the caption
            best_target = None
            min_distance = float('inf')

            for target in targets:
                # Caption should be below the target (target bottom above caption top)
                if target.bbox.y1 <= caption.bbox.y0:
                    distance = caption.bbox.y0 - target.bbox.y1
                    if distance < min_distance:
                        min_distance = distance
                        best_target = target

            if best_target and min_distance < 50:  # Within 50 pixels
                caption.metadata['linked_to'] = best_target.element_id
                best_target.metadata['caption_id'] = caption.element_id

    def _group_list_items(self, elements: List[DocumentElement]):
        """Group consecutive list items."""
        list_items = [e for e in elements if e.type == ElementType.LIST_ITEM]

        if not list_items:
            return

        # Sort by position (top-left corner)
        list_items.sort(key=lambda e: (e.bbox.y0, e.bbox.x0))

        # Group consecutive items
        current_group = []
        groups = []

        for i, item in enumerate(list_items):
            if i == 0:
                current_group = [item]
            else:
                prev_item = list_items[i - 1]
                # Check if items are consecutive (aligned left edges, reasonable y gap)
                x_aligned = abs(item.bbox.x0 - prev_item.bbox.x0) < 20
                y_consecutive = (item.bbox.y0 - prev_item.bbox.y1) < 30

                if x_aligned and y_consecutive:
                    current_group.append(item)
                else:
                    if current_group:
                        groups.append(current_group)
                    current_group = [item]

        if current_group:
            groups.append(current_group)

        # Mark groups in metadata
        for group_idx, group in enumerate(groups):
            group_id = f"list_group_{group_idx}"
            for item_idx, item in enumerate(group):
                item.metadata['list_group'] = group_id
                item.metadata['list_index'] = item_idx

    def _link_headers(self, elements: List[DocumentElement]):
        """Link headers to their content sections."""
        headers = [e for e in elements if e.type in [ElementType.HEADER, ElementType.TITLE]]

        for i, header in enumerate(headers):
            # Find content between this header and the next
            next_header_y = float('inf')
            if i + 1 < len(headers):
                next_header_y = headers[i + 1].bbox.y0

            # Find all elements between the headers
            content_elements = [
                e for e in elements
                if (e.bbox.y0 > header.bbox.y1 and
                    e.bbox.y0 < next_header_y and
                    e.type not in [ElementType.HEADER, ElementType.TITLE])
            ]

            if content_elements:
                header.metadata['content_elements'] = [e.element_id for e in content_elements]
                for elem in content_elements:
                    elem.metadata['header_id'] = header.element_id

    def _update_metadata(self, doc: UnifiedDocument):
        """Update document metadata based on extracted content."""
        # Currently a no-op: DocumentMetadata does not define fields for
        # content-derived summaries, so such data would have to be stored at the
        # document level (or alongside processing_errors) instead.
        pass

    def _generate_document_id(self, file_path: Path) -> str:
        """Generate a unique document ID."""
        content = f"{file_path.name}_{datetime.now().isoformat()}"
        return hashlib.md5(content.encode()).hexdigest()

    def _detect_mime_type(self, file_path: Path) -> str:
        """Detect MIME type of a file."""
        try:
            import magic
            return magic.from_file(str(file_path), mime=True)
        except Exception:
            # Fall back to extension-based detection
            ext = file_path.suffix.lower()
            mime_map = {
                '.pdf': 'application/pdf',
                '.png': 'image/png',
                '.jpg': 'image/jpeg',
                '.jpeg': 'image/jpeg'
            }
            return mime_map.get(ext, 'application/octet-stream')

    def _count_elements(self, pages: List[Page]) -> int:
        """Count total elements across all pages."""
        return sum(len(page.elements) for page in pages)

    def _extract_from_direct_results(
        self,
        pages_data: List[Dict[str, Any]]
    ) -> List[Page]:
        """Extract pages from direct PP-StructureV3 results."""
        pages = []

        for page_idx, page_data in enumerate(pages_data):
            elements = []

            # Process each element in the page
            if 'elements' in page_data:
                for elem_data in page_data['elements']:
                    element = self._convert_pp3_element(elem_data, page_idx)
                    if element:
                        elements.append(element)

            # Create page
            page = Page(
                page_number=page_idx + 1,
                dimensions=Dimensions(
                    width=page_data.get('width', 0),
                    height=page_data.get('height', 0)
                ),
                elements=elements,
                metadata={'reading_order': self._calculate_reading_order(elements)}
            )

            pages.append(page)

        return pages
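

if __name__ == "__main__":
    # Minimal smoke-test sketch. The 'pages' input shape below is an assumption
    # matching _extract_from_direct_results, and 'TEXT' assumes the ElementType
    # member name this module uses; real inputs come from the PP-StructureV3
    # pipeline.
    converter = OCRToUnifiedConverter()
    demo_results = {
        'pages': [{
            'width': 595,
            'height': 842,
            'elements': [
                {'type': 'TEXT', 'content': 'Hello world', 'bbox': [50, 40, 300, 60]},
            ],
        }]
    }
    doc = converter.convert(
        ocr_results=demo_results,
        file_path=Path('sample.pdf'),
        processing_time=0.0,
        lang='ch',
    )
    print(f"{len(doc.pages)} page(s), {sum(len(p.elements) for p in doc.pages)} element(s)")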