""" OCR to UnifiedDocument Converter Converts PP-StructureV3 OCR results to UnifiedDocument format, preserving all structure information and metadata. """ import logging from pathlib import Path from typing import Dict, List, Optional, Any, Union from datetime import datetime import hashlib from app.models.unified_document import ( UnifiedDocument, DocumentElement, Page, DocumentMetadata, BoundingBox, StyleInfo, TableData, ElementType, ProcessingTrack, TableCell, Dimensions ) logger = logging.getLogger(__name__) class OCRToUnifiedConverter: """ Converter for transforming PP-StructureV3 OCR results to UnifiedDocument format. This converter handles: - PP-StructureV3 parsing_res_list results - Markdown fallback results - Multi-page document assembly - Metadata preservation - Structure relationship mapping """ def __init__(self): """Initialize the converter.""" self.element_counter = 0 def convert( self, ocr_results: Dict[str, Any], file_path: Path, processing_time: float, lang: str = 'ch' ) -> UnifiedDocument: """ Convert OCR results to UnifiedDocument. Args: ocr_results: Raw OCR results from PP-StructureV3 file_path: Original file path processing_time: Time taken for OCR processing lang: Language used for OCR Returns: UnifiedDocument with all extracted information """ try: # Create document metadata metadata = self._create_metadata(file_path, processing_time, lang) # Extract pages from OCR results pages = self._extract_pages(ocr_results) # Create document ID document_id = self._generate_document_id(file_path) # Create UnifiedDocument unified_doc = UnifiedDocument( document_id=document_id, metadata=metadata, pages=pages, processing_errors=ocr_results.get('errors', []) ) # Post-process to establish relationships self._establish_relationships(unified_doc) logger.info(f"Successfully converted OCR results to UnifiedDocument: " f"{len(pages)} pages, {self._count_elements(pages)} elements") return unified_doc except Exception as e: logger.error(f"Error converting OCR results: {e}") import traceback traceback.print_exc() # Return minimal document with error return UnifiedDocument( document_id=self._generate_document_id(file_path), metadata=self._create_metadata(file_path, processing_time, lang), pages=[], processing_errors=[{ 'error': str(e), 'type': 'conversion_error', 'timestamp': datetime.now().isoformat() }] ) def _create_metadata( self, file_path: Path, processing_time: float, lang: str ) -> DocumentMetadata: """Create document metadata.""" return DocumentMetadata( filename=file_path.name, file_type=file_path.suffix, file_size=file_path.stat().st_size if file_path.exists() else 0, created_at=datetime.now(), processing_track=ProcessingTrack.OCR, processing_time=processing_time, language=lang ) def _extract_pages(self, ocr_results: Dict[str, Any]) -> List[Page]: """ Extract pages from OCR results. Handles both enhanced PP-StructureV3 results (with parsing_res_list) and traditional markdown results. """ pages = [] # Check if we have enhanced results from PPStructureEnhanced if 'enhanced_results' in ocr_results: pages = self._extract_from_enhanced_results(ocr_results['enhanced_results']) # Check for traditional layout_data structure elif 'layout_data' in ocr_results: pages = self._extract_from_layout_data(ocr_results['layout_data']) # Check for direct PP-StructureV3 results elif 'pages' in ocr_results: pages = self._extract_from_direct_results(ocr_results['pages']) else: logger.warning("No recognized OCR result structure found") return pages def _extract_from_enhanced_results( self, enhanced_results: List[Dict[str, Any]] ) -> List[Page]: """Extract pages from enhanced PP-StructureV3 results.""" pages = [] for page_idx, page_result in enumerate(enhanced_results): elements = [] # Process elements from parsing_res_list if 'elements' in page_result: for elem_data in page_result['elements']: element = self._convert_pp3_element(elem_data, page_idx) if element: elements.append(element) # Create page page = Page( page_number=page_idx + 1, dimensions=Dimensions( width=page_result.get('width', 0), height=page_result.get('height', 0) ), elements=elements, metadata={'reading_order': page_result.get('reading_order', [])} ) pages.append(page) logger.debug(f"Extracted page {page_idx + 1} with {len(elements)} elements") return pages def _extract_from_layout_data( self, layout_data: Dict[str, Any] ) -> List[Page]: """Extract pages from traditional layout_data structure.""" pages = [] # Get page dimensions (assuming uniform for all pages) page_width = layout_data.get('page_width', 0) page_height = layout_data.get('page_height', 0) # Group elements by page elements_by_page = {} # Process text regions for text_region in layout_data.get('text_regions', []): page_num = text_region.get('page', 1) if page_num not in elements_by_page: elements_by_page[page_num] = [] element = self._convert_text_region(text_region) if element: elements_by_page[page_num].append(element) # Process images for img_meta in layout_data.get('images_metadata', []): page_num = img_meta.get('page', 1) if page_num not in elements_by_page: elements_by_page[page_num] = [] element = self._convert_image_metadata(img_meta) if element: elements_by_page[page_num].append(element) # Process tables for table_data in layout_data.get('tables', []): page_num = table_data.get('page', 1) if page_num not in elements_by_page: elements_by_page[page_num] = [] element = self._convert_table_data(table_data) if element: elements_by_page[page_num].append(element) # Create pages max_page = max(elements_by_page.keys()) if elements_by_page else 0 for page_num in range(1, max_page + 1): elements = elements_by_page.get(page_num, []) # Determine reading order based on position reading_order = self._calculate_reading_order(elements) page = Page( page_number=page_num, dimensions=Dimensions( width=page_width, height=page_height ), elements=elements, metadata={'reading_order': reading_order} ) pages.append(page) return pages def _convert_pp3_element( self, elem_data: Dict[str, Any], page_idx: int ) -> Optional[DocumentElement]: """Convert PP-StructureV3 element to DocumentElement.""" try: # Extract bbox bbox_data = elem_data.get('bbox', [0, 0, 0, 0]) bbox = BoundingBox( x0=float(bbox_data[0]), y0=float(bbox_data[1]), x1=float(bbox_data[2]), y1=float(bbox_data[3]) ) # Get element type element_type = elem_data.get('type', ElementType.TEXT) if isinstance(element_type, str): # Convert string to ElementType if needed element_type = ElementType[element_type] if element_type in ElementType.__members__ else ElementType.TEXT # Prepare content based on element type if element_type == ElementType.TABLE: # For tables, use TableData as content table_data = self._extract_table_data(elem_data) content = table_data if table_data else elem_data.get('content', '') elif element_type in [ElementType.IMAGE, ElementType.FIGURE]: # For images, use metadata dict as content content = { 'path': elem_data.get('img_path', ''), 'width': elem_data.get('width', 0), 'height': elem_data.get('height', 0), 'format': elem_data.get('format', 'unknown') } else: content = elem_data.get('content', '') # Create element element = DocumentElement( element_id=elem_data.get('element_id', f"elem_{self.element_counter}"), type=element_type, content=content, bbox=bbox, confidence=elem_data.get('confidence', 1.0), metadata=elem_data.get('metadata', {}) ) # Add style info if available if 'style' in elem_data: element.style = self._extract_style_info(elem_data['style']) self.element_counter += 1 return element except Exception as e: logger.warning(f"Failed to convert PP3 element: {e}") return None def _convert_text_region( self, text_region: Dict[str, Any] ) -> Optional[DocumentElement]: """Convert text region to DocumentElement.""" try: # Extract bbox (handle both formats: [[x1,y1], [x2,y1], [x2,y2], [x1,y2]] or [x0, y0, x1, y1]) bbox_data = text_region.get('bbox', [0, 0, 0, 0]) if isinstance(bbox_data, list) and len(bbox_data) == 4: if isinstance(bbox_data[0], list): # 4-point format: [[x1,y1], [x2,y1], [x2,y2], [x1,y2]] x0 = float(bbox_data[0][0]) y0 = float(bbox_data[0][1]) x1 = float(bbox_data[2][0]) y1 = float(bbox_data[2][1]) else: # Simple format: [x0, y0, x1, y1] x0 = float(bbox_data[0]) y0 = float(bbox_data[1]) x1 = float(bbox_data[2]) y1 = float(bbox_data[3]) else: x0 = y0 = x1 = y1 = 0 bbox = BoundingBox(x0=x0, y0=y0, x1=x1, y1=y1) element = DocumentElement( element_id=f"text_{self.element_counter}", type=ElementType.TEXT, content=text_region.get('text', ''), bbox=bbox, confidence=text_region.get('confidence', 1.0), metadata={'page': text_region.get('page', 1)} ) self.element_counter += 1 return element except Exception as e: logger.warning(f"Failed to convert text region: {e}") return None def _convert_image_metadata( self, img_meta: Dict[str, Any] ) -> Optional[DocumentElement]: """Convert image metadata to DocumentElement.""" try: # Extract bbox (handle both formats) bbox_data = img_meta.get('bbox', [0, 0, 0, 0]) if isinstance(bbox_data, list) and len(bbox_data) == 4: if isinstance(bbox_data[0], list): # 4-point format: [[x1,y1], [x2,y1], [x2,y2], [x1,y2]] x0 = float(bbox_data[0][0]) y0 = float(bbox_data[0][1]) x1 = float(bbox_data[2][0]) y1 = float(bbox_data[2][1]) else: # Simple format: [x0, y0, x1, y1] x0 = float(bbox_data[0]) y0 = float(bbox_data[1]) x1 = float(bbox_data[2]) y1 = float(bbox_data[3]) else: x0 = y0 = x1 = y1 = 0 bbox = BoundingBox(x0=x0, y0=y0, x1=x1, y1=y1) # Create image content dict image_content = { 'path': img_meta.get('path', ''), 'width': img_meta.get('width', 0), 'height': img_meta.get('height', 0), 'format': img_meta.get('format', 'unknown') } element = DocumentElement( element_id=f"img_{self.element_counter}", type=ElementType.IMAGE, content=image_content, bbox=bbox, metadata={'page': img_meta.get('page', 1)} ) self.element_counter += 1 return element except Exception as e: logger.warning(f"Failed to convert image metadata: {e}") return None def _convert_table_data( self, table_dict: Dict[str, Any] ) -> Optional[DocumentElement]: """Convert table data to DocumentElement.""" try: # Extract bbox bbox_data = table_dict.get('bbox', [0, 0, 0, 0]) bbox = BoundingBox( x0=float(bbox_data[0]), y0=float(bbox_data[1]), x1=float(bbox_data[2]), y1=float(bbox_data[3]) ) # Create table data # Note: TableData uses 'cols' not 'columns', and doesn't have 'html' field # HTML content is stored in metadata instead raw_cells = table_dict.get('cells', []) table_cells = [] # Convert raw cells to TableCell objects if needed for cell_data in raw_cells: if isinstance(cell_data, dict): from app.models.unified_document import TableCell table_cells.append(TableCell( row=cell_data.get('row', 0), col=cell_data.get('col', 0), row_span=cell_data.get('row_span', 1), col_span=cell_data.get('col_span', 1), content=cell_data.get('content', '') )) table_data = TableData( rows=table_dict.get('rows', 0), cols=table_dict.get('columns', table_dict.get('cols', 0)), cells=table_cells, caption=table_dict.get('caption') ) element = DocumentElement( element_id=f"table_{self.element_counter}", type=ElementType.TABLE, content=table_data, # Use TableData object as content bbox=bbox, metadata={'page': table_dict.get('page', 1), 'extracted_text': table_dict.get('extracted_text', '')} ) self.element_counter += 1 return element except Exception as e: logger.warning(f"Failed to convert table data: {e}") return None def _extract_table_data(self, elem_data: Dict) -> Optional[TableData]: """Extract table data from element.""" try: html = elem_data.get('html', '') extracted_text = elem_data.get('extracted_text', '') # Try to parse HTML to get rows and columns rows = 0 cols = 0 cells = [] if html: # Simple HTML parsing (could be enhanced with BeautifulSoup) rows = html.count('