""" Unified Document Model for Dual-track Processing This module defines the common data structure used by both OCR and direct extraction tracks to ensure consistent output format regardless of processing method. """ from dataclasses import dataclass, field from typing import List, Dict, Optional, Union, Literal, Any from datetime import datetime from enum import Enum class ElementType(str, Enum): """Document element types supporting all 23 PP-StructureV3 types plus custom ones""" # Text elements TEXT = "text" TITLE = "title" HEADER = "header" FOOTER = "footer" REFERENCE = "reference" EQUATION = "equation" FOOTNOTE = "footnote" CAPTION = "caption" # List elements LIST = "list" LIST_ITEM = "list_item" # Table elements TABLE = "table" TABLE_CELL = "table_cell" TABLE_CAPTION = "table_caption" # Visual elements IMAGE = "image" FIGURE = "figure" CHART = "chart" DIAGRAM = "diagram" # Structural elements SECTION = "section" PARAGRAPH = "paragraph" PAGE_NUMBER = "page_number" WATERMARK = "watermark" HEADER_GROUP = "header_group" BODY = "body" # Special elements CODE = "code" FORMULA = "formula" SIGNATURE = "signature" STAMP = "stamp" LOGO = "logo" BARCODE = "barcode" QR_CODE = "qr_code" class ProcessingTrack(str, Enum): """Processing track used for the document""" OCR = "ocr" # PaddleOCR PP-StructureV3 track DIRECT = "direct" # PyMuPDF direct extraction track HYBRID = "hybrid" # Mixed processing (future) @dataclass class BoundingBox: """Bounding box coordinates for document elements""" x0: float # Left coordinate y0: float # Top coordinate x1: float # Right coordinate y1: float # Bottom coordinate @property def width(self) -> float: return self.x1 - self.x0 @property def height(self) -> float: return self.y1 - self.y0 @property def center_x(self) -> float: return (self.x0 + self.x1) / 2 @property def center_y(self) -> float: return (self.y0 + self.y1) / 2 def to_dict(self) -> Dict[str, float]: return { "x0": self.x0, "y0": self.y0, "x1": self.x1, "y1": self.y1, "width": self.width, "height": self.height } def overlaps(self, other: 'BoundingBox', tolerance: float = 0) -> bool: """Check if this bbox overlaps with another""" return not ( self.x1 + tolerance < other.x0 or self.x0 - tolerance > other.x1 or self.y1 + tolerance < other.y0 or self.y0 - tolerance > other.y1 ) def contains(self, other: 'BoundingBox', tolerance: float = 0) -> bool: """Check if this bbox contains another""" return ( self.x0 - tolerance <= other.x0 and self.y0 - tolerance <= other.y0 and self.x1 + tolerance >= other.x1 and self.y1 + tolerance >= other.y1 ) @dataclass class StyleInfo: """Style information for text elements""" font_name: Optional[str] = None font_size: Optional[float] = None font_weight: Optional[str] = None # normal, bold font_style: Optional[str] = None # normal, italic text_color: Optional[int] = None # RGB as integer bg_color: Optional[int] = None # Background color alignment: Optional[str] = None # left, center, right, justify @property def is_bold(self) -> bool: return self.font_weight == "bold" @property def is_italic(self) -> bool: return self.font_style == "italic" def get_rgb_color(self) -> Optional[tuple]: """Convert integer color to RGB tuple""" if self.text_color is None: return None r = (self.text_color >> 16) & 0xFF g = (self.text_color >> 8) & 0xFF b = self.text_color & 0xFF return (r, g, b) def to_dict(self) -> Dict[str, Any]: result = {} if self.font_name: result["font_name"] = self.font_name if self.font_size: result["font_size"] = self.font_size if self.font_weight: result["font_weight"] = self.font_weight if self.font_style: result["font_style"] = self.font_style if self.text_color is not None: result["text_color"] = self.text_color result["text_color_rgb"] = self.get_rgb_color() if self.bg_color is not None: result["bg_color"] = self.bg_color if self.alignment: result["alignment"] = self.alignment return result @dataclass class TableCell: """Table cell information""" row: int col: int row_span: int = 1 col_span: int = 1 content: str = "" bbox: Optional[BoundingBox] = None style: Optional[StyleInfo] = None def to_dict(self) -> Dict[str, Any]: return { "row": self.row, "col": self.col, "row_span": self.row_span, "col_span": self.col_span, "content": self.content, "bbox": self.bbox.to_dict() if self.bbox else None, "style": self.style.to_dict() if self.style else None } @dataclass class TableData: """Structured table data""" rows: int cols: int cells: List[TableCell] = field(default_factory=list) headers: Optional[List[str]] = None caption: Optional[str] = None def to_dict(self) -> Dict[str, Any]: return { "rows": self.rows, "cols": self.cols, "cells": [cell.to_dict() for cell in self.cells], "headers": self.headers, "caption": self.caption } def to_html(self) -> str: """Convert table to HTML representation""" html = [""] if self.caption: html.append(f"") # Group cells by row and column for quick lookup cell_map = {} for cell in self.cells: cell_map[(cell.row, cell.col)] = cell # Track which cells are covered by row/col spans covered = set() for cell in self.cells: if cell.row_span > 1 or cell.col_span > 1: for r in range(cell.row, cell.row + cell.row_span): for c in range(cell.col, cell.col + cell.col_span): if (r, c) != (cell.row, cell.col): covered.add((r, c)) # Generate HTML with proper column filling for row_idx in range(self.rows): html.append("") for col_idx in range(self.cols): # Skip cells covered by row/col spans if (row_idx, col_idx) in covered: continue cell = cell_map.get((row_idx, col_idx)) tag = "th" if row_idx == 0 and self.headers else "td" if cell: span_attrs = [] if cell.row_span > 1: span_attrs.append(f'rowspan="{cell.row_span}"') if cell.col_span > 1: span_attrs.append(f'colspan="{cell.col_span}"') span_str = " ".join(span_attrs) content = cell.content if cell.content else "" html.append(f'<{tag} {span_str}>{content}') else: # Fill in empty cell for missing positions html.append(f'<{tag}>') html.append("") html.append("
{self.caption}
") return "\n".join(html) @dataclass class DocumentElement: """Individual document element (text, image, table, etc.)""" element_id: str type: ElementType content: Union[str, TableData, bytes, Dict[str, Any]] bbox: BoundingBox confidence: Optional[float] = None # OCR confidence (0-1) style: Optional[StyleInfo] = None metadata: Dict[str, Any] = field(default_factory=dict) children: List['DocumentElement'] = field(default_factory=list) @property def is_text(self) -> bool: return self.type in [ ElementType.TEXT, ElementType.TITLE, ElementType.HEADER, ElementType.FOOTER, ElementType.CAPTION, ElementType.PARAGRAPH ] @property def is_visual(self) -> bool: return self.type in [ ElementType.IMAGE, ElementType.FIGURE, ElementType.CHART, ElementType.DIAGRAM, ElementType.LOGO ] @property def is_table(self) -> bool: return self.type in [ElementType.TABLE, ElementType.TABLE_CELL] def get_text(self) -> str: """Extract text content from element""" if isinstance(self.content, str): return self.content elif isinstance(self.content, TableData): # Extract text from table cells texts = [] for cell in self.content.cells: if cell.content: texts.append(cell.content) return " ".join(texts) elif isinstance(self.content, dict) and "text" in self.content: return self.content["text"] return "" def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for JSON serialization""" result = { "element_id": self.element_id, "type": self.type.value, "bbox": self.bbox.to_dict(), } # Handle different content types if isinstance(self.content, str): result["content"] = self.content elif isinstance(self.content, TableData): result["content"] = self.content.to_dict() result["content_type"] = "table" elif isinstance(self.content, bytes): result["content_type"] = "binary" result["content_length"] = len(self.content) elif isinstance(self.content, dict): result["content"] = self.content if self.confidence is not None: result["confidence"] = self.confidence if self.style: result["style"] = self.style.to_dict() if self.metadata: result["metadata"] = self.metadata if self.children: result["children"] = [child.to_dict() for child in self.children] return result @dataclass class Dimensions: """Page or image dimensions""" width: float height: float dpi: Optional[int] = None def to_dict(self) -> Dict[str, Any]: result = {"width": self.width, "height": self.height} if self.dpi: result["dpi"] = self.dpi return result @dataclass class Page: """Single page in a document""" page_number: int # 1-based page number elements: List[DocumentElement] dimensions: Dimensions metadata: Dict[str, Any] = field(default_factory=dict) def get_reading_order(self) -> List[DocumentElement]: """Get elements in reading order (top to bottom, left to right)""" return sorted( self.elements, key=lambda e: (e.bbox.y0, e.bbox.x0) ) def get_elements_by_type(self, element_type: ElementType) -> List[DocumentElement]: """Get all elements of a specific type""" return [e for e in self.elements if e.type == element_type] def get_text_elements(self) -> List[DocumentElement]: """Get all text-containing elements""" return [e for e in self.elements if e.is_text] def get_tables(self) -> List[DocumentElement]: """Get all table elements""" return [e for e in self.elements if e.type == ElementType.TABLE] def get_images(self) -> List[DocumentElement]: """Get all image elements""" return [e for e in self.elements if e.is_visual] def extract_text(self, separator: str = "\n") -> str: """Extract all text from the page in reading order""" texts = [] for element in self.get_reading_order(): text = element.get_text() if text: texts.append(text) return separator.join(texts) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for JSON serialization""" return { "page_number": self.page_number, "elements": [e.to_dict() for e in self.elements], "dimensions": self.dimensions.to_dict(), "metadata": self.metadata, "statistics": { "total_elements": len(self.elements), "text_elements": len(self.get_text_elements()), "tables": len(self.get_tables()), "images": len(self.get_images()) } } @dataclass class DocumentMetadata: """Document-level metadata""" filename: str file_type: str file_size: int created_at: datetime processing_track: ProcessingTrack processing_time: float # seconds language: Optional[str] = None title: Optional[str] = None author: Optional[str] = None subject: Optional[str] = None keywords: Optional[List[str]] = None producer: Optional[str] = None creator: Optional[str] = None creation_date: Optional[datetime] = None modification_date: Optional[datetime] = None original_filename: Optional[str] = None # Original filename before conversion (e.g., Office → PDF) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for JSON serialization""" result = { "filename": self.filename, "file_type": self.file_type, "file_size": self.file_size, "created_at": self.created_at.isoformat(), "processing_track": self.processing_track.value, "processing_time": self.processing_time, } # Add optional fields if present optional_fields = [ "language", "title", "author", "subject", "keywords", "producer", "creator", "original_filename" ] for field in optional_fields: value = getattr(self, field) if value is not None: result[field] = value if self.creation_date: result["creation_date"] = self.creation_date.isoformat() if self.modification_date: result["modification_date"] = self.modification_date.isoformat() return result @dataclass class UnifiedDocument: """ Unified document representation for both OCR and direct extraction tracks. This is the primary output format that ensures consistency across different processing methods and enables seamless downstream processing. """ document_id: str metadata: DocumentMetadata pages: List[Page] processing_errors: List[Dict[str, Any]] = field(default_factory=list) @property def page_count(self) -> int: return len(self.pages) @property def total_elements(self) -> int: return sum(len(page.elements) for page in self.pages) def get_page(self, page_number: int) -> Optional[Page]: """Get page by number (1-based)""" for page in self.pages: if page.page_number == page_number: return page return None def extract_all_text(self, page_separator: str = "\n\n") -> str: """Extract all text from the document""" texts = [] for page in self.pages: page_text = page.extract_text() if page_text: texts.append(page_text) return page_separator.join(texts) def get_all_tables(self) -> List[DocumentElement]: """Get all tables from all pages""" tables = [] for page in self.pages: tables.extend(page.get_tables()) return tables def get_all_images(self) -> List[DocumentElement]: """Get all images from all pages""" images = [] for page in self.pages: images.extend(page.get_images()) return images def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for JSON serialization""" return { "document_id": self.document_id, "metadata": self.metadata.to_dict(), "pages": [page.to_dict() for page in self.pages], "statistics": { "page_count": self.page_count, "total_elements": self.total_elements, "total_tables": len(self.get_all_tables()), "total_images": len(self.get_all_images()), }, "processing_errors": self.processing_errors } def to_legacy_format(self) -> Dict[str, Any]: """ Convert to legacy format for backward compatibility. This ensures existing API clients continue to work while we transition to the new unified format. """ # Extract text regions in legacy format text_regions = [] layout_data = [] images_metadata = [] for page in self.pages: page_num = page.page_number for element in page.elements: if element.is_text: # Legacy text region format text_regions.append({ "page": page_num, "text": element.get_text(), "confidence": element.confidence or 1.0, "bbox": { "x_min": element.bbox.x0, "y_min": element.bbox.y0, "x_max": element.bbox.x1, "y_max": element.bbox.y1 } }) # Legacy layout data layout_item = { "element_id": element.element_id, "type": element.type.value, "page": page_num - 1, # Legacy uses 0-based "bbox": [element.bbox.x0, element.bbox.y0, element.bbox.x1, element.bbox.y1] } if element.is_table and isinstance(element.content, TableData): layout_item["content"] = element.content.to_html() elif element.is_text: layout_item["content"] = element.get_text() layout_data.append(layout_item) # Legacy image metadata if element.is_visual: images_metadata.append({ "element_id": element.element_id, "type": "image", "page": page_num - 1, # Legacy uses 0-based "bbox": [element.bbox.x0, element.bbox.y0, element.bbox.x1, element.bbox.y1] }) # Calculate average confidence confidences = [r["confidence"] for r in text_regions if r.get("confidence")] avg_confidence = sum(confidences) / len(confidences) if confidences else 0.0 return { "status": "success", "filename": self.metadata.filename, "text_regions": text_regions, "total_text_regions": len(text_regions), "average_confidence": avg_confidence, "processing_time": self.metadata.processing_time, "language": self.metadata.language or "ch", "layout_data": { "elements": layout_data, "total_elements": len(layout_data) }, "images_metadata": images_metadata, "ocr_dimensions": { "width": self.pages[0].dimensions.width if self.pages else 0, "height": self.pages[0].dimensions.height if self.pages else 0 }, # New fields that won't break existing clients "_unified_format": True, "_processing_track": self.metadata.processing_track.value } class UnifiedDocumentConverter: """Converter utilities for UnifiedDocument format""" @staticmethod def from_ocr_result(ocr_result: Dict[str, Any], document_id: str, metadata: DocumentMetadata) -> UnifiedDocument: """ Convert PaddleOCR result to UnifiedDocument format. This handles the conversion from PP-StructureV3 output to our unified format. """ pages = [] # Handle single page or multi-page results if "pages" in ocr_result: page_results = ocr_result["pages"] else: page_results = [ocr_result] for page_idx, page_data in enumerate(page_results): page_num = page_idx + 1 elements = [] # Convert text regions for idx, text_region in enumerate(page_data.get("text_regions", [])): bbox_data = text_region.get("bbox", {}) element = DocumentElement( element_id=f"text_{page_num}_{idx}", type=ElementType.TEXT, content=text_region.get("text", ""), bbox=BoundingBox( x0=bbox_data.get("x_min", 0), y0=bbox_data.get("y_min", 0), x1=bbox_data.get("x_max", 0), y1=bbox_data.get("y_max", 0) ), confidence=text_region.get("confidence") ) elements.append(element) # Convert layout elements if available if "layout_data" in page_data and page_data["layout_data"]: for layout_elem in page_data["layout_data"].get("elements", []): # Map layout type to ElementType layout_type = layout_elem.get("type", "text") element_type = ElementType.TEXT # Default if "table" in layout_type.lower(): element_type = ElementType.TABLE elif "image" in layout_type.lower() or "figure" in layout_type.lower(): element_type = ElementType.IMAGE elif "title" in layout_type.lower(): element_type = ElementType.TITLE elif "list" in layout_type.lower(): element_type = ElementType.LIST # Create element bbox_list = layout_elem.get("bbox", [0, 0, 0, 0]) element = DocumentElement( element_id=layout_elem.get("element_id", f"layout_{page_num}_{len(elements)}"), type=element_type, content=layout_elem.get("content", ""), bbox=BoundingBox( x0=bbox_list[0] if len(bbox_list) > 0 else 0, y0=bbox_list[1] if len(bbox_list) > 1 else 0, x1=bbox_list[2] if len(bbox_list) > 2 else 0, y1=bbox_list[3] if len(bbox_list) > 3 else 0 ) ) elements.append(element) # Get page dimensions ocr_dims = page_data.get("ocr_dimensions", {}) dimensions = Dimensions( width=ocr_dims.get("width", 0), height=ocr_dims.get("height", 0) ) pages.append(Page( page_number=page_num, elements=elements, dimensions=dimensions )) return UnifiedDocument( document_id=document_id, metadata=metadata, pages=pages ) @staticmethod def from_direct_extraction(extraction_result: Dict[str, Any], document_id: str, metadata: DocumentMetadata) -> UnifiedDocument: """ Convert PyMuPDF extraction result to UnifiedDocument format. This will be implemented when we create the DirectExtractionEngine. """ # TODO: Implement when DirectExtractionEngine is created pages = [] return UnifiedDocument( document_id=document_id, metadata=metadata, pages=pages )