diff --git a/backend/app/models/unified_document.py b/backend/app/models/unified_document.py new file mode 100644 index 0000000..529fee2 --- /dev/null +++ b/backend/app/models/unified_document.py @@ -0,0 +1,694 @@ +""" +Unified Document Model for Dual-track Processing + +This module defines the common data structure used by both OCR and direct extraction tracks +to ensure consistent output format regardless of processing method. +""" + +from dataclasses import dataclass, field +from typing import List, Dict, Optional, Union, Literal, Any +from datetime import datetime +from enum import Enum + + +class ElementType(str, Enum): + """Document element types supporting all 23 PP-StructureV3 types plus custom ones""" + # Text elements + TEXT = "text" + TITLE = "title" + HEADER = "header" + FOOTER = "footer" + REFERENCE = "reference" + EQUATION = "equation" + FOOTNOTE = "footnote" + CAPTION = "caption" + + # List elements + LIST = "list" + LIST_ITEM = "list_item" + + # Table elements + TABLE = "table" + TABLE_CELL = "table_cell" + TABLE_CAPTION = "table_caption" + + # Visual elements + IMAGE = "image" + FIGURE = "figure" + CHART = "chart" + DIAGRAM = "diagram" + + # Structural elements + SECTION = "section" + PARAGRAPH = "paragraph" + PAGE_NUMBER = "page_number" + WATERMARK = "watermark" + HEADER_GROUP = "header_group" + BODY = "body" + + # Special elements + CODE = "code" + FORMULA = "formula" + SIGNATURE = "signature" + STAMP = "stamp" + LOGO = "logo" + BARCODE = "barcode" + QR_CODE = "qr_code" + + +class ProcessingTrack(str, Enum): + """Processing track used for the document""" + OCR = "ocr" # PaddleOCR PP-StructureV3 track + DIRECT = "direct" # PyMuPDF direct extraction track + HYBRID = "hybrid" # Mixed processing (future) + + +@dataclass +class BoundingBox: + """Bounding box coordinates for document elements""" + x0: float # Left coordinate + y0: float # Top coordinate + x1: float # Right coordinate + y1: float # Bottom coordinate + + @property + def width(self) -> float: + return self.x1 - self.x0 + + @property + def height(self) -> float: + return self.y1 - self.y0 + + @property + def center_x(self) -> float: + return (self.x0 + self.x1) / 2 + + @property + def center_y(self) -> float: + return (self.y0 + self.y1) / 2 + + def to_dict(self) -> Dict[str, float]: + return { + "x0": self.x0, + "y0": self.y0, + "x1": self.x1, + "y1": self.y1, + "width": self.width, + "height": self.height + } + + def overlaps(self, other: 'BoundingBox', tolerance: float = 0) -> bool: + """Check if this bbox overlaps with another""" + return not ( + self.x1 + tolerance < other.x0 or + self.x0 - tolerance > other.x1 or + self.y1 + tolerance < other.y0 or + self.y0 - tolerance > other.y1 + ) + + def contains(self, other: 'BoundingBox', tolerance: float = 0) -> bool: + """Check if this bbox contains another""" + return ( + self.x0 - tolerance <= other.x0 and + self.y0 - tolerance <= other.y0 and + self.x1 + tolerance >= other.x1 and + self.y1 + tolerance >= other.y1 + ) + + +@dataclass +class StyleInfo: + """Style information for text elements""" + font_name: Optional[str] = None + font_size: Optional[float] = None + font_weight: Optional[str] = None # normal, bold + font_style: Optional[str] = None # normal, italic + text_color: Optional[int] = None # RGB as integer + bg_color: Optional[int] = None # Background color + alignment: Optional[str] = None # left, center, right, justify + + @property + def is_bold(self) -> bool: + return self.font_weight == "bold" + + @property + def is_italic(self) -> bool: + return 
self.font_style == "italic" + + def get_rgb_color(self) -> Optional[tuple]: + """Convert integer color to RGB tuple""" + if self.text_color is None: + return None + r = (self.text_color >> 16) & 0xFF + g = (self.text_color >> 8) & 0xFF + b = self.text_color & 0xFF + return (r, g, b) + + def to_dict(self) -> Dict[str, Any]: + result = {} + if self.font_name: + result["font_name"] = self.font_name + if self.font_size: + result["font_size"] = self.font_size + if self.font_weight: + result["font_weight"] = self.font_weight + if self.font_style: + result["font_style"] = self.font_style + if self.text_color is not None: + result["text_color"] = self.text_color + result["text_color_rgb"] = self.get_rgb_color() + if self.bg_color is not None: + result["bg_color"] = self.bg_color + if self.alignment: + result["alignment"] = self.alignment + return result + + +@dataclass +class TableCell: + """Table cell information""" + row: int + col: int + row_span: int = 1 + col_span: int = 1 + content: str = "" + bbox: Optional[BoundingBox] = None + style: Optional[StyleInfo] = None + + def to_dict(self) -> Dict[str, Any]: + return { + "row": self.row, + "col": self.col, + "row_span": self.row_span, + "col_span": self.col_span, + "content": self.content, + "bbox": self.bbox.to_dict() if self.bbox else None, + "style": self.style.to_dict() if self.style else None + } + + +@dataclass +class TableData: + """Structured table data""" + rows: int + cols: int + cells: List[TableCell] = field(default_factory=list) + headers: Optional[List[str]] = None + caption: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + return { + "rows": self.rows, + "cols": self.cols, + "cells": [cell.to_dict() for cell in self.cells], + "headers": self.headers, + "caption": self.caption + }
+ + def to_html(self) -> str: + """Convert table to HTML representation""" + html = ["<table>"] + + if self.caption: + html.append(f"<caption>{self.caption}</caption>") + + # Group cells by row + rows_data = {} + for cell in self.cells: + if cell.row not in rows_data: + rows_data[cell.row] = [] + rows_data[cell.row].append(cell) + + # Generate HTML row by row + for row_idx in range(self.rows): + html.append("<tr>") + if row_idx in rows_data: + for cell in sorted(rows_data[row_idx], key=lambda c: c.col): + span_attrs = [] + if cell.row_span > 1: + span_attrs.append(f'rowspan="{cell.row_span}"') + if cell.col_span > 1: + span_attrs.append(f'colspan="{cell.col_span}"') + span_str = " " + " ".join(span_attrs) if span_attrs else "" + tag = "th" if row_idx == 0 and self.headers else "td" + html.append(f'<{tag}{span_str}>{cell.content}</{tag}>') + html.append("</tr>") + + html.append("</table>") + return "\n".join(html)
+ + +@dataclass +class DocumentElement: + """Individual document element (text, image, table, etc.)""" + element_id: str + type: ElementType + content: Union[str, TableData, bytes, Dict[str, Any]] + bbox: BoundingBox + confidence: Optional[float] = None # OCR confidence (0-1) + style: Optional[StyleInfo] = None + metadata: Dict[str, Any] = field(default_factory=dict) + children: List['DocumentElement'] = field(default_factory=list) + + @property + def is_text(self) -> bool: + return self.type in [ + ElementType.TEXT, ElementType.TITLE, ElementType.HEADER, + ElementType.FOOTER, ElementType.CAPTION, ElementType.PARAGRAPH + ] + + @property + def is_visual(self) -> bool: + return self.type in [ + ElementType.IMAGE, ElementType.FIGURE, ElementType.CHART, + ElementType.DIAGRAM, ElementType.LOGO + ] + + @property + def is_table(self) -> bool: + return self.type in [ElementType.TABLE, ElementType.TABLE_CELL] + + def get_text(self) -> str: + """Extract text content from element""" + if isinstance(self.content, str): + return self.content + elif isinstance(self.content, TableData): + # Extract text from table cells + texts = [] + for cell in self.content.cells: + if cell.content: + texts.append(cell.content) + return " ".join(texts) + elif isinstance(self.content, dict) and "text" in self.content: + return self.content["text"] + return "" + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for JSON serialization""" + result = { + "element_id": self.element_id, + "type": self.type.value, + "bbox": self.bbox.to_dict(), + } + + # Handle different content types + if isinstance(self.content, str): + result["content"] = self.content + elif isinstance(self.content, TableData): + result["content"] = self.content.to_dict() + result["content_type"] = "table" + elif isinstance(self.content, bytes): + result["content_type"] = "binary" + result["content_length"] = len(self.content) + elif isinstance(self.content, dict): + result["content"] = self.content + + if self.confidence is not None: + result["confidence"] = self.confidence + + if self.style: + result["style"] = self.style.to_dict() + + if self.metadata: + result["metadata"] = self.metadata + + if self.children: + result["children"] = [child.to_dict() for child in self.children] + + return result + + +@dataclass +class Dimensions: + """Page or image dimensions""" + width: float + height: float + dpi: Optional[int] = None + + def to_dict(self) -> Dict[str, Any]: + result = {"width": self.width, "height": self.height} + if self.dpi: + result["dpi"] = self.dpi + return result + + +@dataclass +class Page: + """Single page in a document""" + page_number: int # 1-based page number + elements: List[DocumentElement] + dimensions: Dimensions + metadata: Dict[str, Any] = field(default_factory=dict) + + def get_reading_order(self) -> List[DocumentElement]: + """Get elements in reading order (top to bottom, left to right)""" + return sorted( + self.elements, + key=lambda e: (e.bbox.y0, e.bbox.x0) + ) + + def get_elements_by_type(self, element_type: ElementType) -> List[DocumentElement]: + """Get all elements of a specific type""" + return [e for e in self.elements if e.type == element_type] + + def get_text_elements(self) -> List[DocumentElement]: + """Get all text-containing elements""" + return [e for e in self.elements if e.is_text] + + def get_tables(self) -> List[DocumentElement]: + """Get all table elements""" + return [e for e in self.elements if e.type == ElementType.TABLE] + + def get_images(self) -> 
List[DocumentElement]: + """Get all image elements""" + return [e for e in self.elements if e.is_visual] + + def extract_text(self, separator: str = "\n") -> str: + """Extract all text from the page in reading order""" + texts = [] + for element in self.get_reading_order(): + text = element.get_text() + if text: + texts.append(text) + return separator.join(texts) + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for JSON serialization""" + return { + "page_number": self.page_number, + "elements": [e.to_dict() for e in self.elements], + "dimensions": self.dimensions.to_dict(), + "metadata": self.metadata, + "statistics": { + "total_elements": len(self.elements), + "text_elements": len(self.get_text_elements()), + "tables": len(self.get_tables()), + "images": len(self.get_images()) + } + } + + +@dataclass +class DocumentMetadata: + """Document-level metadata""" + filename: str + file_type: str + file_size: int + created_at: datetime + processing_track: ProcessingTrack + processing_time: float # seconds + language: Optional[str] = None + title: Optional[str] = None + author: Optional[str] = None + subject: Optional[str] = None + keywords: Optional[List[str]] = None + producer: Optional[str] = None + creator: Optional[str] = None + creation_date: Optional[datetime] = None + modification_date: Optional[datetime] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for JSON serialization""" + result = { + "filename": self.filename, + "file_type": self.file_type, + "file_size": self.file_size, + "created_at": self.created_at.isoformat(), + "processing_track": self.processing_track.value, + "processing_time": self.processing_time, + } + + # Add optional fields if present + optional_fields = [ + "language", "title", "author", "subject", + "keywords", "producer", "creator" + ] + for field in optional_fields: + value = getattr(self, field) + if value is not None: + result[field] = value + + if self.creation_date: + result["creation_date"] = self.creation_date.isoformat() + if self.modification_date: + result["modification_date"] = self.modification_date.isoformat() + + return result + + +@dataclass +class UnifiedDocument: + """ + Unified document representation for both OCR and direct extraction tracks. + + This is the primary output format that ensures consistency across different + processing methods and enables seamless downstream processing. 
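+ + Illustrative navigation (a sketch; assumes doc is an already-built + UnifiedDocument, e.g. from UnifiedDocumentConverter.from_ocr_result or + DirectExtractionEngine.extract): + + >>> page = doc.get_page(1) # 1-based page lookup + >>> text = doc.extract_all_text() # reading-order text from every page + >>> tables = doc.get_all_tables() # table DocumentElements across pages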
+ """ + document_id: str + metadata: DocumentMetadata + pages: List[Page] + processing_errors: List[Dict[str, Any]] = field(default_factory=list) + + @property + def page_count(self) -> int: + return len(self.pages) + + @property + def total_elements(self) -> int: + return sum(len(page.elements) for page in self.pages) + + def get_page(self, page_number: int) -> Optional[Page]: + """Get page by number (1-based)""" + for page in self.pages: + if page.page_number == page_number: + return page + return None + + def extract_all_text(self, page_separator: str = "\n\n") -> str: + """Extract all text from the document""" + texts = [] + for page in self.pages: + page_text = page.extract_text() + if page_text: + texts.append(page_text) + return page_separator.join(texts) + + def get_all_tables(self) -> List[DocumentElement]: + """Get all tables from all pages""" + tables = [] + for page in self.pages: + tables.extend(page.get_tables()) + return tables + + def get_all_images(self) -> List[DocumentElement]: + """Get all images from all pages""" + images = [] + for page in self.pages: + images.extend(page.get_images()) + return images + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for JSON serialization""" + return { + "document_id": self.document_id, + "metadata": self.metadata.to_dict(), + "pages": [page.to_dict() for page in self.pages], + "statistics": { + "page_count": self.page_count, + "total_elements": self.total_elements, + "total_tables": len(self.get_all_tables()), + "total_images": len(self.get_all_images()), + }, + "processing_errors": self.processing_errors + } + + def to_legacy_format(self) -> Dict[str, Any]: + """ + Convert to legacy format for backward compatibility. + + This ensures existing API clients continue to work while we transition + to the new unified format. 
+ """ + # Extract text regions in legacy format + text_regions = [] + layout_data = [] + images_metadata = [] + + for page in self.pages: + page_num = page.page_number + + for element in page.elements: + if element.is_text: + # Legacy text region format + text_regions.append({ + "page": page_num, + "text": element.get_text(), + "confidence": element.confidence or 1.0, + "bbox": { + "x_min": element.bbox.x0, + "y_min": element.bbox.y0, + "x_max": element.bbox.x1, + "y_max": element.bbox.y1 + } + }) + + # Legacy layout data + layout_item = { + "element_id": element.element_id, + "type": element.type.value, + "page": page_num - 1, # Legacy uses 0-based + "bbox": [element.bbox.x0, element.bbox.y0, + element.bbox.x1, element.bbox.y1] + } + + if element.is_table and isinstance(element.content, TableData): + layout_item["content"] = element.content.to_html() + elif element.is_text: + layout_item["content"] = element.get_text() + + layout_data.append(layout_item) + + # Legacy image metadata + if element.is_visual: + images_metadata.append({ + "element_id": element.element_id, + "type": "image", + "page": page_num - 1, # Legacy uses 0-based + "bbox": [element.bbox.x0, element.bbox.y0, + element.bbox.x1, element.bbox.y1] + }) + + # Calculate average confidence + confidences = [r["confidence"] for r in text_regions if r.get("confidence")] + avg_confidence = sum(confidences) / len(confidences) if confidences else 0.0 + + return { + "status": "success", + "filename": self.metadata.filename, + "text_regions": text_regions, + "total_text_regions": len(text_regions), + "average_confidence": avg_confidence, + "processing_time": self.metadata.processing_time, + "language": self.metadata.language or "ch", + "layout_data": { + "elements": layout_data, + "total_elements": len(layout_data) + }, + "images_metadata": images_metadata, + "ocr_dimensions": { + "width": self.pages[0].dimensions.width if self.pages else 0, + "height": self.pages[0].dimensions.height if self.pages else 0 + }, + # New fields that won't break existing clients + "_unified_format": True, + "_processing_track": self.metadata.processing_track.value + } + + +class UnifiedDocumentConverter: + """Converter utilities for UnifiedDocument format""" + + @staticmethod + def from_ocr_result(ocr_result: Dict[str, Any], + document_id: str, + metadata: DocumentMetadata) -> UnifiedDocument: + """ + Convert PaddleOCR result to UnifiedDocument format. + + This handles the conversion from PP-StructureV3 output to our unified format. 
+ """ + pages = [] + + # Handle single page or multi-page results + if "pages" in ocr_result: + page_results = ocr_result["pages"] + else: + page_results = [ocr_result] + + for page_idx, page_data in enumerate(page_results): + page_num = page_idx + 1 + elements = [] + + # Convert text regions + for idx, text_region in enumerate(page_data.get("text_regions", [])): + bbox_data = text_region.get("bbox", {}) + element = DocumentElement( + element_id=f"text_{page_num}_{idx}", + type=ElementType.TEXT, + content=text_region.get("text", ""), + bbox=BoundingBox( + x0=bbox_data.get("x_min", 0), + y0=bbox_data.get("y_min", 0), + x1=bbox_data.get("x_max", 0), + y1=bbox_data.get("y_max", 0) + ), + confidence=text_region.get("confidence") + ) + elements.append(element) + + # Convert layout elements if available + if "layout_data" in page_data and page_data["layout_data"]: + for layout_elem in page_data["layout_data"].get("elements", []): + # Map layout type to ElementType + layout_type = layout_elem.get("type", "text") + element_type = ElementType.TEXT # Default + + if "table" in layout_type.lower(): + element_type = ElementType.TABLE + elif "image" in layout_type.lower() or "figure" in layout_type.lower(): + element_type = ElementType.IMAGE + elif "title" in layout_type.lower(): + element_type = ElementType.TITLE + elif "list" in layout_type.lower(): + element_type = ElementType.LIST + + # Create element + bbox_list = layout_elem.get("bbox", [0, 0, 0, 0]) + element = DocumentElement( + element_id=layout_elem.get("element_id", f"layout_{page_num}_{len(elements)}"), + type=element_type, + content=layout_elem.get("content", ""), + bbox=BoundingBox( + x0=bbox_list[0] if len(bbox_list) > 0 else 0, + y0=bbox_list[1] if len(bbox_list) > 1 else 0, + x1=bbox_list[2] if len(bbox_list) > 2 else 0, + y1=bbox_list[3] if len(bbox_list) > 3 else 0 + ) + ) + elements.append(element) + + # Get page dimensions + ocr_dims = page_data.get("ocr_dimensions", {}) + dimensions = Dimensions( + width=ocr_dims.get("width", 0), + height=ocr_dims.get("height", 0) + ) + + pages.append(Page( + page_number=page_num, + elements=elements, + dimensions=dimensions + )) + + return UnifiedDocument( + document_id=document_id, + metadata=metadata, + pages=pages + ) + + @staticmethod + def from_direct_extraction(extraction_result: Dict[str, Any], + document_id: str, + metadata: DocumentMetadata) -> UnifiedDocument: + """ + Convert PyMuPDF extraction result to UnifiedDocument format. + + This will be implemented when we create the DirectExtractionEngine. + """ + # TODO: Implement when DirectExtractionEngine is created + pages = [] + return UnifiedDocument( + document_id=document_id, + metadata=metadata, + pages=pages + ) \ No newline at end of file diff --git a/backend/app/services/direct_extraction_engine.py b/backend/app/services/direct_extraction_engine.py new file mode 100644 index 0000000..36eebbb --- /dev/null +++ b/backend/app/services/direct_extraction_engine.py @@ -0,0 +1,633 @@ +""" +Direct Extraction Engine using PyMuPDF + +Handles direct text and structure extraction from editable PDFs without OCR. +This provides much faster processing and perfect accuracy for documents with +extractable text. 
+""" + +import os +import logging +import fitz # PyMuPDF +import uuid +from pathlib import Path +from typing import Dict, List, Optional, Tuple, Any, Union +from datetime import datetime +import re + +from ..models.unified_document import ( + UnifiedDocument, DocumentElement, Page, DocumentMetadata, + BoundingBox, StyleInfo, TableData, TableCell, Dimensions, + ElementType, ProcessingTrack +) + +logger = logging.getLogger(__name__) + + +class DirectExtractionEngine: + """ + Engine for direct text extraction from editable PDFs using PyMuPDF. + + This engine provides: + - Fast text extraction with exact positioning + - Font and style information preservation + - Table structure detection + - Image extraction with coordinates + - Hyperlink and annotation extraction + """ + + def __init__(self, + enable_table_detection: bool = True, + enable_image_extraction: bool = True, + min_table_rows: int = 2, + min_table_cols: int = 2): + """ + Initialize the extraction engine. + + Args: + enable_table_detection: Whether to detect and extract tables + enable_image_extraction: Whether to extract images + min_table_rows: Minimum rows for table detection + min_table_cols: Minimum columns for table detection + """ + self.enable_table_detection = enable_table_detection + self.enable_image_extraction = enable_image_extraction + self.min_table_rows = min_table_rows + self.min_table_cols = min_table_cols + + def extract(self, + file_path: Path, + output_dir: Optional[Path] = None) -> UnifiedDocument: + """ + Extract content from PDF file to UnifiedDocument format. + + Args: + file_path: Path to PDF file + output_dir: Optional directory to save extracted images + + Returns: + UnifiedDocument with extracted content + """ + start_time = datetime.now() + document_id = str(uuid.uuid4()) + + try: + doc = fitz.open(str(file_path)) + + # Extract document metadata + metadata = self._extract_metadata(file_path, doc, start_time) + + # Extract pages + pages = [] + for page_num in range(len(doc)): + logger.info(f"Extracting page {page_num + 1}/{len(doc)}") + page = self._extract_page( + doc[page_num], + page_num + 1, + document_id, + output_dir + ) + pages.append(page) + + doc.close() + + # Calculate processing time + processing_time = (datetime.now() - start_time).total_seconds() + metadata.processing_time = processing_time + + logger.info(f"Direct extraction completed in {processing_time:.2f}s") + + return UnifiedDocument( + document_id=document_id, + metadata=metadata, + pages=pages + ) + + except Exception as e: + logger.error(f"Error during direct extraction: {e}") + # Return partial result with error information + processing_time = (datetime.now() - start_time).total_seconds() + + if 'metadata' not in locals(): + metadata = DocumentMetadata( + filename=file_path.name, + file_type="pdf", + file_size=file_path.stat().st_size if file_path.exists() else 0, + created_at=datetime.now(), + processing_track=ProcessingTrack.DIRECT, + processing_time=processing_time + ) + + return UnifiedDocument( + document_id=document_id, + metadata=metadata, + pages=pages if 'pages' in locals() else [], + processing_errors=[{ + "error": str(e), + "type": type(e).__name__ + }] + ) + + def _extract_metadata(self, + file_path: Path, + doc: fitz.Document, + start_time: datetime) -> DocumentMetadata: + """Extract document metadata""" + pdf_metadata = doc.metadata + + return DocumentMetadata( + filename=file_path.name, + file_type="pdf", + file_size=file_path.stat().st_size, + created_at=start_time, + processing_track=ProcessingTrack.DIRECT, + 
processing_time=0.0,  # Will be updated later + title=pdf_metadata.get("title"), + author=pdf_metadata.get("author"), + subject=pdf_metadata.get("subject"), + keywords=pdf_metadata.get("keywords", "").split(",") if pdf_metadata.get("keywords") else None, + producer=pdf_metadata.get("producer"), + creator=pdf_metadata.get("creator"), + creation_date=self._parse_pdf_date(pdf_metadata.get("creationDate")), + modification_date=self._parse_pdf_date(pdf_metadata.get("modDate")) + ) + + def _parse_pdf_date(self, date_str: Optional[str]) -> Optional[datetime]: + """Parse PDF date string to datetime""" + if not date_str: + return None + + try: + # PDF date format: D:YYYYMMDDHHmmSSOHH'mm + # Example: D:20240101120000+09'00 + if date_str.startswith("D:"): + date_str = date_str[2:] + + # Extract just the date/time part (first 14 characters) + if len(date_str) >= 14: + date_part = date_str[:14] + return datetime.strptime(date_part, "%Y%m%d%H%M%S") + except (ValueError, TypeError): + # Malformed or non-standard date string; treat as missing + pass + + return None + + def _extract_page(self, + page: fitz.Page, + page_num: int, + document_id: str, + output_dir: Optional[Path]) -> Page: + """Extract content from a single page""" + elements = [] + element_counter = 0 + + # Get page dimensions + rect = page.rect + dimensions = Dimensions( + width=rect.width, + height=rect.height, + dpi=72 # PDF standard DPI + ) + + # Extract text blocks with formatting + text_dict = page.get_text("dict") + for block_idx, block in enumerate(text_dict.get("blocks", [])): + if block.get("type") == 0: # Text block + element = self._process_text_block( + block, page_num, element_counter + ) + if element: + elements.append(element) + element_counter += 1 + + # Extract tables (if enabled) + if self.enable_table_detection: + try: + # Try native table detection (PyMuPDF 1.23.0+) + tables = page.find_tables() + for table_idx, table in enumerate(tables): + element = self._process_native_table( + table, page_num, element_counter + ) + if element: + elements.append(element) + element_counter += 1 + except AttributeError: + # Fallback to positional table detection + logger.debug("Native table detection not available, using positional detection") + table_elements = self._detect_tables_by_position(page, page_num, element_counter) + elements.extend(table_elements) + element_counter += len(table_elements) + + # Extract images (if enabled) + if self.enable_image_extraction: + image_elements = self._extract_images( + page, page_num, document_id, element_counter, output_dir + ) + elements.extend(image_elements) + element_counter += len(image_elements) + + # Extract hyperlinks + links = page.get_links() + for link_idx, link in enumerate(links): + # Create link annotation element if it has URI + if link.get("uri"): + from_rect = link.get("from") + if from_rect: + element = DocumentElement( + element_id=f"link_{page_num}_{element_counter}", + type=ElementType.REFERENCE, + content={"uri": link["uri"], "type": "hyperlink"}, + bbox=BoundingBox( + x0=from_rect.x0, + y0=from_rect.y0, + x1=from_rect.x1, + y1=from_rect.y1 + ), + metadata={"link_type": "external" if link["uri"].startswith("http") else "internal"} + ) + elements.append(element) + element_counter += 1 + + # Extract vector graphics (as metadata) + drawings = page.get_drawings() + if drawings: + logger.debug(f"Page {page_num} contains {len(drawings)} vector drawing commands") + + return Page( + page_number=page_num, + elements=elements, + dimensions=dimensions, + metadata={ + "has_drawings": len(drawings) > 0, + "drawing_count": len(drawings), + "link_count": len(links) + } + ) + + def 
_process_text_block(self, block: Dict, page_num: int, counter: int) -> Optional[DocumentElement]: + """Process a text block into a DocumentElement""" + # Calculate block bounding box + bbox_data = block.get("bbox", [0, 0, 0, 0]) + bbox = BoundingBox( + x0=bbox_data[0], + y0=bbox_data[1], + x1=bbox_data[2], + y1=bbox_data[3] + ) + + # Extract text content + text_parts = [] + styles = [] + + for line in block.get("lines", []): + for span in line.get("spans", []): + text = span.get("text", "") + if text: + text_parts.append(text) + + # Extract style information + style = StyleInfo( + font_name=span.get("font"), + font_size=span.get("size"), + font_weight="bold" if span.get("flags", 0) & 2**4 else "normal", + font_style="italic" if span.get("flags", 0) & 2**1 else "normal", + text_color=span.get("color") + ) + styles.append(style) + + if not text_parts: + return None + + full_text = "".join(text_parts) + + # Determine element type based on content and style + element_type = self._infer_element_type(full_text, styles) + + # Use the first span's style as representative for the block + if styles: + block_style = styles[0] # Could be improved with style merging + else: + block_style = None + + return DocumentElement( + element_id=f"text_{page_num}_{counter}", + type=element_type, + content=full_text, + bbox=bbox, + style=block_style, + confidence=1.0 # Direct extraction has perfect confidence + ) + + def _infer_element_type(self, text: str, styles: List[StyleInfo]) -> ElementType: + """Infer element type based on text content and styling""" + text_lower = text.lower().strip() + + # Check for common patterns + if len(text_lower) < 100 and styles: + # Short text with large font might be title/header + avg_size = sum(s.font_size or 12 for s in styles) / len(styles) + if avg_size > 16: + return ElementType.TITLE + elif avg_size > 14: + return ElementType.HEADER + + # Check for list patterns + if re.match(r'^[\d•·▪▫◦‣⁃]\s', text_lower): + return ElementType.LIST_ITEM + + # Check for page numbers + if re.match(r'^page\s+\d+|^\d+\s*$|^-\s*\d+\s*-$', text_lower): + return ElementType.PAGE_NUMBER + + # Check for footnote patterns, e.g. "[1] ..." or "1) ..." + if re.match(r'^\[\d+\]|^\d+\)', text_lower): + return ElementType.FOOTNOTE + + # Default to paragraph for longer text, text for shorter + return ElementType.PARAGRAPH if len(text) > 150 else ElementType.TEXT + + def _process_native_table(self, table, page_num: int, counter: int) -> Optional[DocumentElement]: + """Process a natively detected table""" + try: + # Extract table data + data = table.extract() + if not data or len(data) < self.min_table_rows: + return None + + # Get table bounding box + bbox_data = table.bbox + bbox = BoundingBox( + x0=bbox_data[0], + y0=bbox_data[1], + x1=bbox_data[2], + y1=bbox_data[3] + ) + + # Create table cells + cells = [] + for row_idx, row in enumerate(data): + for col_idx, cell_text in enumerate(row): + if cell_text: + cells.append(TableCell( + row=row_idx, + col=col_idx, + content=str(cell_text) + )) + + # Create table data + table_data = TableData( + rows=len(data), + cols=max(len(row) for row in data) if data else 0, + cells=cells, + headers=data[0] if data else None # Assume first row is header + ) + + return DocumentElement( + element_id=f"table_{page_num}_{counter}", + type=ElementType.TABLE, + content=table_data, + bbox=bbox, + confidence=1.0 + ) + + except Exception as e: + logger.error(f"Error processing native table: {e}") + return None + + def _detect_tables_by_position(self, page: fitz.Page, page_num: int, counter: int) -> 
List[DocumentElement]: + """Detect tables by analyzing text positioning""" + tables = [] + + # Get all words with positions + words = page.get_text("words") # Returns (x0, y0, x1, y1, "word", block_no, line_no, word_no) + + if not words: + return tables + + # Group words by approximate row (y-coordinate) + rows = {} + for word in words: + y = round(word[1] / 5) * 5 # Round to nearest 5 points + if y not in rows: + rows[y] = [] + rows[y].append({ + 'x0': word[0], + 'y0': word[1], + 'x1': word[2], + 'y1': word[3], + 'text': word[4], + 'block': word[5] if len(word) > 5 else 0 + }) + + # Sort rows by y-coordinate + sorted_rows = sorted(rows.items(), key=lambda x: x[0]) + + # Find potential tables (consecutive rows with multiple columns) + current_table_rows = [] + tables_found = [] + + for y, words_in_row in sorted_rows: + words_in_row.sort(key=lambda w: w['x0']) + + if len(words_in_row) >= self.min_table_cols: + # Check if this could be a table row + x_positions = [w['x0'] for w in words_in_row] + + # Check for somewhat regular spacing + if self._has_regular_spacing(x_positions): + current_table_rows.append((y, words_in_row)) + else: + # End current table if exists + if len(current_table_rows) >= self.min_table_rows: + tables_found.append(current_table_rows) + current_table_rows = [] + else: + # End current table if exists + if len(current_table_rows) >= self.min_table_rows: + tables_found.append(current_table_rows) + current_table_rows = [] + + # Don't forget the last table + if len(current_table_rows) >= self.min_table_rows: + tables_found.append(current_table_rows) + + # Convert detected tables to DocumentElements + for table_idx, table_rows in enumerate(tables_found): + if not table_rows: + continue + + # Calculate table bounding box + all_words = [] + for _, words in table_rows: + all_words.extend(words) + + min_x = min(w['x0'] for w in all_words) + min_y = min(w['y0'] for w in all_words) + max_x = max(w['x1'] for w in all_words) + max_y = max(w['y1'] for w in all_words) + + bbox = BoundingBox(x0=min_x, y0=min_y, x1=max_x, y1=max_y) + + # Create table cells + cells = [] + for row_idx, (y, words) in enumerate(table_rows): + # Group words into columns + columns = self._group_into_columns(words, table_rows) + for col_idx, col_text in enumerate(columns): + if col_text: + cells.append(TableCell( + row=row_idx, + col=col_idx, + content=col_text + )) + + # Create table data + table_data = TableData( + rows=len(table_rows), + cols=max(len(self._group_into_columns(words, table_rows)) + for _, words in table_rows), + cells=cells + ) + + element = DocumentElement( + element_id=f"table_{page_num}_{counter + table_idx}", + type=ElementType.TABLE, + content=table_data, + bbox=bbox, + confidence=0.8, # Lower confidence for positional detection + metadata={"detection_method": "positional"} + ) + tables.append(element) + + return tables + + def _has_regular_spacing(self, x_positions: List[float], tolerance: float = 0.3) -> bool: + """Check if x positions have somewhat regular spacing""" + if len(x_positions) < 3: + return False + + spacings = [x_positions[i+1] - x_positions[i] for i in range(len(x_positions)-1)] + avg_spacing = sum(spacings) / len(spacings) + + # Check if spacings are within tolerance of average + for spacing in spacings: + if abs(spacing - avg_spacing) > avg_spacing * tolerance: + return False + + return True + + def _group_into_columns(self, words: List[Dict], all_rows: List) -> List[str]: + """Group words into columns based on x-position""" + if not words: + return [] + + # Find 
common column positions across all rows + all_x_positions = [] + for _, row_words in all_rows: + all_x_positions.extend([w['x0'] for w in row_words]) + + # Cluster x-positions to find columns + column_positions = self._cluster_positions(all_x_positions) + + # Assign words to columns + columns = [""] * len(column_positions) + for word in words: + # Find closest column + closest_col = 0 + min_dist = float('inf') + for col_idx, col_x in enumerate(column_positions): + dist = abs(word['x0'] - col_x) + if dist < min_dist: + min_dist = dist + closest_col = col_idx + + if columns[closest_col]: + columns[closest_col] += " " + word['text'] + else: + columns[closest_col] = word['text'] + + return columns + + def _cluster_positions(self, positions: List[float], threshold: float = 20) -> List[float]: + """Cluster positions to find common columns""" + if not positions: + return [] + + sorted_pos = sorted(positions) + clusters = [[sorted_pos[0]]] + + for pos in sorted_pos[1:]: + # Check if position belongs to current cluster + if pos - clusters[-1][-1] < threshold: + clusters[-1].append(pos) + else: + clusters.append([pos]) + + # Return average position of each cluster + return [sum(cluster) / len(cluster) for cluster in clusters] + + def _extract_images(self, + page: fitz.Page, + page_num: int, + document_id: str, + counter: int, + output_dir: Optional[Path]) -> List[DocumentElement]: + """Extract images from page""" + elements = [] + image_list = page.get_images() + + for img_idx, img in enumerate(image_list): + try: + xref = img[0] + + # Get image position(s) + img_rects = page.get_image_rects(xref) + if not img_rects: + continue + + rect = img_rects[0] # Use first occurrence + bbox = BoundingBox( + x0=rect.x0, + y0=rect.y0, + x1=rect.x1, + y1=rect.y1 + ) + + # Extract image data + pix = fitz.Pixmap(page.parent, xref) + image_data = { + "width": pix.width, + "height": pix.height, + "colorspace": pix.colorspace.name if pix.colorspace else "unknown", + "xref": xref + } + + # Save image if output directory provided + if output_dir: + output_dir.mkdir(parents=True, exist_ok=True) + image_filename = f"{document_id}_p{page_num}_img{img_idx}.png" + image_path = output_dir / image_filename + pix.save(str(image_path)) + image_data["saved_path"] = str(image_path) + logger.debug(f"Saved image to {image_path}") + + element = DocumentElement( + element_id=f"image_{page_num}_{counter + img_idx}", + type=ElementType.IMAGE, + content=image_data, + bbox=bbox, + confidence=1.0, + metadata={ + "image_index": img_idx, + "xref": xref + } + ) + elements.append(element) + + pix = None # Free memory + + except Exception as e: + logger.error(f"Error extracting image {img_idx}: {e}") + + return elements \ No newline at end of file diff --git a/backend/app/services/document_type_detector.py b/backend/app/services/document_type_detector.py new file mode 100644 index 0000000..cfeed1c --- /dev/null +++ b/backend/app/services/document_type_detector.py @@ -0,0 +1,397 @@ +""" +Document Type Detector Service + +Intelligently determines the optimal processing track for documents based on +file type, content analysis, and editability checks. 
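+ + Typical usage (a sketch; the file path is a placeholder): + + >>> from pathlib import Path + >>> detector = DocumentTypeDetector() + >>> rec = detector.detect(Path("contract.pdf")) + >>> rec.track, rec.confidence # e.g. ("direct", 0.95) + >>> rec.to_dict()["document_type"] # e.g. "pdf_editable"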
+""" + +import os +import logging +import magic +import fitz # PyMuPDF +from pathlib import Path +from typing import Dict, Optional, Tuple, List +from enum import Enum +import statistics + +logger = logging.getLogger(__name__) + + +class DocumentType(str, Enum): + """Document type classification""" + PDF_EDITABLE = "pdf_editable" # PDF with extractable text + PDF_SCANNED = "pdf_scanned" # PDF with images/scanned content + PDF_MIXED = "pdf_mixed" # PDF with both text and scanned pages + IMAGE = "image" # Image files (PNG, JPG, etc.) + OFFICE_WORD = "office_word" # Word documents + OFFICE_EXCEL = "office_excel" # Excel spreadsheets + OFFICE_POWERPOINT = "office_ppt" # PowerPoint presentations + TEXT = "text" # Plain text files + UNKNOWN = "unknown" # Unknown format + + +class ProcessingTrackRecommendation: + """Processing track recommendation with confidence""" + + def __init__(self, + track: str, + confidence: float, + reason: str, + document_type: DocumentType, + metadata: Optional[Dict] = None): + self.track = track # "ocr" or "direct" + self.confidence = confidence # 0.0 to 1.0 + self.reason = reason + self.document_type = document_type + self.metadata = metadata or {} + + def to_dict(self) -> Dict: + return { + "recommended_track": self.track, + "confidence": self.confidence, + "reason": self.reason, + "document_type": self.document_type.value, + "metadata": self.metadata + } + + +class DocumentTypeDetector: + """ + Service for detecting document types and recommending processing tracks. + + This service analyzes documents to determine: + 1. The document type (PDF, image, Office, etc.) + 2. Whether the document contains extractable text + 3. The recommended processing track (OCR vs Direct) + """ + + # MIME type mappings + IMAGE_MIMES = { + 'image/png', 'image/jpeg', 'image/jpg', 'image/gif', + 'image/bmp', 'image/tiff', 'image/webp' + } + + OFFICE_MIMES = { + 'application/vnd.openxmlformats-officedocument.wordprocessingml.document': DocumentType.OFFICE_WORD, + 'application/msword': DocumentType.OFFICE_WORD, + 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': DocumentType.OFFICE_EXCEL, + 'application/vnd.ms-excel': DocumentType.OFFICE_EXCEL, + 'application/vnd.openxmlformats-officedocument.presentationml.presentation': DocumentType.OFFICE_POWERPOINT, + 'application/vnd.ms-powerpoint': DocumentType.OFFICE_POWERPOINT, + } + + def __init__(self, + min_text_length: int = 100, + sample_pages: int = 3, + text_coverage_threshold: float = 0.9): + """ + Initialize the detector. + + Args: + min_text_length: Minimum text length to consider a page as having extractable text + sample_pages: Number of pages to sample for PDF analysis + text_coverage_threshold: Percentage of pages with text to classify as editable + """ + self.min_text_length = min_text_length + self.sample_pages = sample_pages + self.text_coverage_threshold = text_coverage_threshold + + def detect(self, file_path: Path) -> ProcessingTrackRecommendation: + """ + Detect document type and recommend processing track. 
+ + Args: + file_path: Path to the document file + + Returns: + ProcessingTrackRecommendation with track selection and metadata + """ + if not file_path.exists(): + logger.error(f"File not found: {file_path}") + return ProcessingTrackRecommendation( + track="ocr", + confidence=0.5, + reason="File not found, defaulting to OCR", + document_type=DocumentType.UNKNOWN + ) + + try: + # Detect MIME type + mime_type = magic.from_file(str(file_path), mime=True) + logger.info(f"Detected MIME type: {mime_type} for {file_path.name}") + + # Route based on file type + if mime_type == 'application/pdf': + return self._analyze_pdf(file_path) + elif mime_type in self.IMAGE_MIMES: + return self._analyze_image(file_path, mime_type) + elif mime_type in self.OFFICE_MIMES: + return self._analyze_office(file_path, mime_type) + elif mime_type.startswith('text/'): + return self._analyze_text(file_path, mime_type) + else: + logger.warning(f"Unknown MIME type: {mime_type}") + return ProcessingTrackRecommendation( + track="ocr", + confidence=0.5, + reason=f"Unknown file type ({mime_type}), defaulting to OCR", + document_type=DocumentType.UNKNOWN + ) + + except Exception as e: + logger.error(f"Error detecting document type: {e}") + return ProcessingTrackRecommendation( + track="ocr", + confidence=0.3, + reason=f"Error during detection: {str(e)}", + document_type=DocumentType.UNKNOWN + ) + + def _analyze_pdf(self, file_path: Path) -> ProcessingTrackRecommendation: + """ + Analyze PDF to determine if it's editable or scanned. + + Args: + file_path: Path to PDF file + + Returns: + Processing track recommendation + """ + try: + doc = fitz.open(str(file_path)) + total_pages = len(doc) + + # Sample pages for analysis + pages_to_check = min(self.sample_pages, total_pages) + text_pages = [] + page_details = [] + + for page_num in range(pages_to_check): + page = doc[page_num] + + # Extract text + text = page.get_text() + text_length = len(text.strip()) + + # Check for images + images = page.get_images() + image_count = len(images) + + # Calculate page area covered by images + page_rect = page.rect + page_area = page_rect.width * page_rect.height + image_area = 0 + + for img in images: + try: + # Get image rectangles + xref = img[0] + img_rects = page.get_image_rects(xref) + for rect in img_rects: + image_area += rect.width * rect.height + except: + pass + + image_coverage = image_area / page_area if page_area > 0 else 0 + + # Determine if page has meaningful text + has_text = text_length >= self.min_text_length + + text_pages.append(has_text) + page_details.append({ + "page": page_num + 1, + "text_length": text_length, + "has_text": has_text, + "image_count": image_count, + "image_coverage": image_coverage + }) + + logger.debug(f"Page {page_num + 1}: text_length={text_length}, " + f"images={image_count}, image_coverage={image_coverage:.2%}") + + doc.close() + + # Calculate text coverage + text_coverage = sum(text_pages) / len(text_pages) if text_pages else 0 + + # Determine document type and track + metadata = { + "total_pages": total_pages, + "sampled_pages": pages_to_check, + "text_coverage": text_coverage, + "page_details": page_details + } + + if text_coverage >= self.text_coverage_threshold: + # Mostly text-based PDF + return ProcessingTrackRecommendation( + track="direct", + confidence=0.95, + reason=f"PDF has extractable text on {text_coverage:.0%} of sampled pages", + document_type=DocumentType.PDF_EDITABLE, + metadata=metadata + ) + elif text_coverage <= 0.1: + # Mostly scanned/image PDF + return 
ProcessingTrackRecommendation( + track="ocr", + confidence=0.95, + reason=f"PDF appears to be scanned (only {text_coverage:.0%} pages have text)", + document_type=DocumentType.PDF_SCANNED, + metadata=metadata + ) + else: + # Mixed content + # For mixed PDFs, we could implement page-level track selection in the future + # For now, use OCR to ensure we don't miss scanned content + return ProcessingTrackRecommendation( + track="ocr", + confidence=0.7, + reason=f"PDF has mixed content ({text_coverage:.0%} text pages), using OCR for completeness", + document_type=DocumentType.PDF_MIXED, + metadata=metadata + ) + + except Exception as e: + logger.error(f"Error analyzing PDF: {e}") + return ProcessingTrackRecommendation( + track="ocr", + confidence=0.5, + reason=f"Error analyzing PDF: {str(e)}", + document_type=DocumentType.PDF_SCANNED + ) + + def _analyze_image(self, file_path: Path, mime_type: str) -> ProcessingTrackRecommendation: + """ + Analyze image file. + + Images always require OCR processing. + """ + file_size = file_path.stat().st_size + metadata = { + "mime_type": mime_type, + "file_size": file_size, + "file_extension": file_path.suffix + } + + return ProcessingTrackRecommendation( + track="ocr", + confidence=1.0, + reason="Image files require OCR processing", + document_type=DocumentType.IMAGE, + metadata=metadata + ) + + def _analyze_office(self, file_path: Path, mime_type: str) -> ProcessingTrackRecommendation: + """ + Analyze Office document. + + Currently routes all Office documents to OCR track. + Future enhancement: implement direct extraction for Office files. + """ + document_type = self.OFFICE_MIMES.get(mime_type, DocumentType.UNKNOWN) + file_size = file_path.stat().st_size + + metadata = { + "mime_type": mime_type, + "file_size": file_size, + "file_extension": file_path.suffix + } + + # TODO: In future, we could implement direct extraction for Office files + # using python-docx, openpyxl, python-pptx + return ProcessingTrackRecommendation( + track="ocr", + confidence=0.9, + reason="Office documents currently processed via OCR (direct extraction planned)", + document_type=document_type, + metadata=metadata + ) + + def _analyze_text(self, file_path: Path, mime_type: str) -> ProcessingTrackRecommendation: + """ + Analyze text file. + + Plain text files can be directly processed without OCR. + """ + file_size = file_path.stat().st_size + metadata = { + "mime_type": mime_type, + "file_size": file_size, + "file_extension": file_path.suffix + } + + return ProcessingTrackRecommendation( + track="direct", + confidence=1.0, + reason="Plain text files can be directly processed", + document_type=DocumentType.TEXT, + metadata=metadata + ) + + def analyze_batch(self, file_paths: List[Path]) -> Dict[str, ProcessingTrackRecommendation]: + """ + Analyze multiple files and return recommendations. 
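+ + Illustrative batch run (paths are placeholders): + + >>> recs = detector.analyze_batch([Path("a.pdf"), Path("b.png")]) + >>> detector.get_statistics(recs)["by_track"] # e.g. {"ocr": 1, "direct": 1}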
+ + Args: + file_paths: List of file paths to analyze + + Returns: + Dictionary mapping file paths to recommendations + """ + results = {} + + for file_path in file_paths: + try: + recommendation = self.detect(file_path) + results[str(file_path)] = recommendation + logger.info(f"Analyzed {file_path.name}: {recommendation.track} " + f"(confidence: {recommendation.confidence:.2f})") + except Exception as e: + logger.error(f"Error analyzing {file_path}: {e}") + results[str(file_path)] = ProcessingTrackRecommendation( + track="ocr", + confidence=0.3, + reason=f"Error during analysis: {str(e)}", + document_type=DocumentType.UNKNOWN + ) + + return results + + def get_statistics(self, recommendations: Dict[str, ProcessingTrackRecommendation]) -> Dict: + """ + Calculate statistics from batch analysis results. + + Args: + recommendations: Dictionary of file recommendations + + Returns: + Statistics dictionary + """ + if not recommendations: + return {"total": 0} + + tracks = [r.track for r in recommendations.values()] + confidences = [r.confidence for r in recommendations.values()] + doc_types = [r.document_type.value for r in recommendations.values()] + + stats = { + "total": len(recommendations), + "by_track": { + "ocr": tracks.count("ocr"), + "direct": tracks.count("direct") + }, + "by_document_type": {}, + "confidence": { + "mean": statistics.mean(confidences), + "median": statistics.median(confidences), + "min": min(confidences), + "max": max(confidences) + } + } + + # Count by document type + for doc_type in set(doc_types): + stats["by_document_type"][doc_type] = doc_types.count(doc_type) + + return stats \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 5c96ea8..c018589 100644 --- a/requirements.txt +++ b/requirements.txt @@ -25,6 +25,11 @@ reportlab>=4.0.0 # Layout-preserving PDF generation with precise coordinate con PyPDF2>=3.0.0 # Extract dimensions from source PDF files # Note: pandoc needs to be installed via brew (brew install pandoc) +# ===== Direct PDF Extraction (Dual-track Processing) ===== +PyMuPDF>=1.23.0 # Primary library for editable PDF text/structure extraction +pdfplumber>=0.10.0 # Fallback for table extraction and validation +python-magic-bin>=0.4.14 # Windows-compatible file type detection + # ===== Data Export ===== pandas>=2.1.0 openpyxl>=3.1.0 # Excel support
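End-to-end wiring sketch (illustrative only, not part of this diff): the detector picks a track, and editable PDFs skip OCR entirely. run_ocr_pipeline below is a placeholder for the existing PP-StructureV3 path, and the import paths assume the package layout introduced above.

    from pathlib import Path
    from backend.app.models.unified_document import UnifiedDocument
    from backend.app.services.direct_extraction_engine import DirectExtractionEngine
    from backend.app.services.document_type_detector import DocumentTypeDetector

    def process_document(file_path: Path) -> UnifiedDocument:
        # Route to the cheaper direct track when the detector recommends it
        rec = DocumentTypeDetector().detect(file_path)
        if rec.track == "direct":
            return DirectExtractionEngine().extract(file_path)
        return run_ocr_pipeline(file_path)  # placeholder: existing OCR track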