feat: implement core dual-track processing infrastructure

Added foundation for dual-track document processing:

1. UnifiedDocument Model (backend/app/models/unified_document.py)
   - Common output format for both OCR and direct extraction
   - Comprehensive element types (23+ types from PP-StructureV3)
   - BoundingBox, StyleInfo, TableData structures
   - Backward compatibility with legacy format

2. DocumentTypeDetector Service (backend/app/services/document_type_detector.py)
   - Intelligent document type detection using python-magic
   - PDF editability analysis using PyMuPDF
   - Processing track recommendation with confidence scores
   - Support for PDF, images, Office docs, and text files

3. DirectExtractionEngine Service (backend/app/services/direct_extraction_engine.py)
   - Fast extraction from editable PDFs using PyMuPDF
   - Preserves fonts, colors, and exact positioning
   - Native and positional table detection
   - Image extraction with coordinates
   - Hyperlink and metadata extraction

4. Dependencies
   - Added PyMuPDF>=1.23.0 for PDF extraction
   - Added pdfplumber>=0.10.0 as fallback
   - Added python-magic-bin>=0.4.14 for file detection

Next: Integrate with OCR service for complete dual-track processing

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
egg
2025-11-18 20:17:50 +08:00
parent cd3cbea49d
commit 2d50c128f7
4 changed files with 1729 additions and 0 deletions

View File

@@ -0,0 +1,694 @@
"""
Unified Document Model for Dual-track Processing
This module defines the common data structure used by both OCR and direct extraction tracks
to ensure consistent output format regardless of processing method.
"""
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Union, Literal, Any
from datetime import datetime
from enum import Enum
class ElementType(str, Enum):
    """Document element types supporting all 23 PP-StructureV3 types plus custom ones.

    Inherits from ``str`` so members compare equal to their plain string
    values and serialize directly in JSON output.
    """
    # Text elements
    TEXT = "text"
    TITLE = "title"
    HEADER = "header"
    FOOTER = "footer"
    REFERENCE = "reference"  # also used for hyperlink annotations
    EQUATION = "equation"
    FOOTNOTE = "footnote"
    CAPTION = "caption"
    # List elements
    LIST = "list"
    LIST_ITEM = "list_item"
    # Table elements
    TABLE = "table"
    TABLE_CELL = "table_cell"
    TABLE_CAPTION = "table_caption"
    # Visual elements
    IMAGE = "image"
    FIGURE = "figure"
    CHART = "chart"
    DIAGRAM = "diagram"
    # Structural elements
    SECTION = "section"
    PARAGRAPH = "paragraph"
    PAGE_NUMBER = "page_number"
    WATERMARK = "watermark"
    HEADER_GROUP = "header_group"
    BODY = "body"
    # Special elements
    CODE = "code"
    FORMULA = "formula"
    SIGNATURE = "signature"
    STAMP = "stamp"
    LOGO = "logo"
    BARCODE = "barcode"
    QR_CODE = "qr_code"
class ProcessingTrack(str, Enum):
    """Processing track used for the document.

    Recorded in DocumentMetadata so downstream consumers can tell which
    pipeline produced the content.
    """
    OCR = "ocr"        # PaddleOCR PP-StructureV3 track
    DIRECT = "direct"  # PyMuPDF direct extraction track
    HYBRID = "hybrid"  # Mixed processing (future)
@dataclass
class BoundingBox:
    """Axis-aligned rectangle giving an element's position on the page.

    (x0, y0) is the top-left corner and (x1, y1) the bottom-right corner.
    """
    x0: float  # left edge
    y0: float  # top edge
    x1: float  # right edge
    y1: float  # bottom edge

    @property
    def width(self) -> float:
        """Horizontal extent of the box."""
        return self.x1 - self.x0

    @property
    def height(self) -> float:
        """Vertical extent of the box."""
        return self.y1 - self.y0

    @property
    def center_x(self) -> float:
        """X coordinate of the box centre."""
        return 0.5 * (self.x0 + self.x1)

    @property
    def center_y(self) -> float:
        """Y coordinate of the box centre."""
        return 0.5 * (self.y0 + self.y1)

    def to_dict(self) -> Dict[str, float]:
        """Serialize corners plus the derived width/height for JSON output."""
        serialized = {"x0": self.x0, "y0": self.y0, "x1": self.x1, "y1": self.y1}
        serialized["width"] = self.width
        serialized["height"] = self.height
        return serialized

    def overlaps(self, other: 'BoundingBox', tolerance: float = 0) -> bool:
        """Return True when the two boxes intersect, within *tolerance*."""
        separated_horizontally = (self.x1 + tolerance < other.x0
                                  or other.x1 < self.x0 - tolerance)
        separated_vertically = (self.y1 + tolerance < other.y0
                                or other.y1 < self.y0 - tolerance)
        return not (separated_horizontally or separated_vertically)

    def contains(self, other: 'BoundingBox', tolerance: float = 0) -> bool:
        """Return True when *other* lies fully inside this box, within *tolerance*."""
        if other.x0 < self.x0 - tolerance or other.y0 < self.y0 - tolerance:
            return False
        return other.x1 <= self.x1 + tolerance and other.y1 <= self.y1 + tolerance
@dataclass
class StyleInfo:
    """Typographic attributes attached to a text element."""
    font_name: Optional[str] = None
    font_size: Optional[float] = None
    font_weight: Optional[str] = None  # "normal" or "bold"
    font_style: Optional[str] = None   # "normal" or "italic"
    text_color: Optional[int] = None   # packed 0xRRGGBB integer
    bg_color: Optional[int] = None     # background colour, same packing
    alignment: Optional[str] = None    # left, center, right, justify

    @property
    def is_bold(self) -> bool:
        """True when the font weight is explicitly bold."""
        return self.font_weight == "bold"

    @property
    def is_italic(self) -> bool:
        """True when the font style is explicitly italic."""
        return self.font_style == "italic"

    def get_rgb_color(self) -> Optional[tuple]:
        """Unpack the 0xRRGGBB text colour into an (r, g, b) tuple."""
        color = self.text_color
        if color is None:
            return None
        return ((color >> 16) & 0xFF, (color >> 8) & 0xFF, color & 0xFF)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize only the attributes that are actually set.

        Falsy names/sizes/weights count as unset; colours are kept whenever
        they are not None, since 0 (black) is a valid colour.
        """
        result: Dict[str, Any] = {}
        for key in ("font_name", "font_size", "font_weight", "font_style"):
            value = getattr(self, key)
            if value:
                result[key] = value
        if self.text_color is not None:
            result["text_color"] = self.text_color
            result["text_color_rgb"] = self.get_rgb_color()
        if self.bg_color is not None:
            result["bg_color"] = self.bg_color
        if self.alignment:
            result["alignment"] = self.alignment
        return result
@dataclass
class TableCell:
    """A single table cell addressed by its (row, col) grid position."""
    row: int
    col: int
    row_span: int = 1  # number of grid rows this cell spans
    col_span: int = 1  # number of grid columns this cell spans
    content: str = ""
    bbox: Optional[BoundingBox] = None
    style: Optional[StyleInfo] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the cell; nested bbox/style become dicts, or None when absent."""
        serialized = {
            "row": self.row,
            "col": self.col,
            "row_span": self.row_span,
            "col_span": self.col_span,
            "content": self.content,
        }
        serialized["bbox"] = None if self.bbox is None else self.bbox.to_dict()
        serialized["style"] = None if self.style is None else self.style.to_dict()
        return serialized
@dataclass
class TableData:
    """Structured table content: grid size plus a sparse list of cells.

    Fields:
        rows/cols: grid dimensions.
        cells: populated cells only (empty grid positions may be omitted).
        headers: optional header texts; when set, row 0 renders as <th>.
        caption: optional table caption.
    """
    rows: int
    cols: int
    cells: List[TableCell] = field(default_factory=list)
    headers: Optional[List[str]] = None
    caption: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize for JSON output; cells are serialized recursively."""
        return {
            "rows": self.rows,
            "cols": self.cols,
            "cells": [cell.to_dict() for cell in self.cells],
            "headers": self.headers,
            "caption": self.caption
        }

    def to_html(self) -> str:
        """Convert table to an HTML string.

        Fixes over the previous version: span attributes no longer leave a
        stray space inside bare tags (``<td >``), and cell/caption text is
        HTML-escaped because it comes from document extraction and may
        contain markup characters.
        """
        from html import escape  # local import: keeps module dependencies unchanged

        parts = ["<table>"]
        if self.caption:
            parts.append(f"<caption>{escape(self.caption)}</caption>")
        # Bucket cells by row index so rows can be emitted in order.
        rows_data = {}
        for cell in self.cells:
            rows_data.setdefault(cell.row, []).append(cell)
        for row_idx in range(self.rows):
            parts.append("<tr>")
            for cell in sorted(rows_data.get(row_idx, []), key=lambda c: c.col):
                attrs = ""
                if cell.row_span > 1:
                    attrs += f' rowspan="{cell.row_span}"'
                if cell.col_span > 1:
                    attrs += f' colspan="{cell.col_span}"'
                # The first row renders as header cells when header texts exist.
                tag = "th" if row_idx == 0 and self.headers else "td"
                parts.append(f"<{tag}{attrs}>{escape(cell.content)}</{tag}>")
            parts.append("</tr>")
        parts.append("</table>")
        return "\n".join(parts)
@dataclass
class DocumentElement:
    """One extracted element (text block, table, image, ...) on a page."""
    element_id: str
    type: ElementType
    content: Union[str, TableData, bytes, Dict[str, Any]]
    bbox: BoundingBox
    confidence: Optional[float] = None  # OCR confidence (0-1)
    style: Optional[StyleInfo] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
    children: List['DocumentElement'] = field(default_factory=list)

    @property
    def is_text(self) -> bool:
        """Whether this element carries primarily textual content."""
        return self.type in (
            ElementType.TEXT, ElementType.TITLE, ElementType.HEADER,
            ElementType.FOOTER, ElementType.CAPTION, ElementType.PARAGRAPH,
        )

    @property
    def is_visual(self) -> bool:
        """Whether this element is a picture-like object."""
        return self.type in (
            ElementType.IMAGE, ElementType.FIGURE, ElementType.CHART,
            ElementType.DIAGRAM, ElementType.LOGO,
        )

    @property
    def is_table(self) -> bool:
        """Whether this element is a table or a table cell."""
        return self.type in (ElementType.TABLE, ElementType.TABLE_CELL)

    def get_text(self) -> str:
        """Best-effort plain-text view of the element's content."""
        content = self.content
        if isinstance(content, str):
            return content
        if isinstance(content, TableData):
            # Flatten non-empty cell texts into one space-separated string.
            return " ".join(cell.content for cell in content.cells if cell.content)
        if isinstance(content, dict) and "text" in content:
            return content["text"]
        return ""

    def to_dict(self) -> Dict[str, Any]:
        """Serialize for JSON output; binary payloads are summarized, not embedded."""
        result: Dict[str, Any] = {
            "element_id": self.element_id,
            "type": self.type.value,
            "bbox": self.bbox.to_dict(),
        }
        content = self.content
        if isinstance(content, str):
            result["content"] = content
        elif isinstance(content, TableData):
            result["content"] = content.to_dict()
            result["content_type"] = "table"
        elif isinstance(content, bytes):
            # Raw bytes are not JSON-serializable; record only the size.
            result["content_type"] = "binary"
            result["content_length"] = len(content)
        elif isinstance(content, dict):
            result["content"] = content
        if self.confidence is not None:
            result["confidence"] = self.confidence
        if self.style is not None:
            result["style"] = self.style.to_dict()
        if self.metadata:
            result["metadata"] = self.metadata
        if self.children:
            result["children"] = [child.to_dict() for child in self.children]
        return result
@dataclass
class Dimensions:
    """Width/height of a page or image, optionally with its resolution."""
    width: float
    height: float
    dpi: Optional[int] = None  # dots per inch; None when unknown

    def to_dict(self) -> Dict[str, Any]:
        """Serialize; dpi is included only when set and non-zero."""
        serialized: Dict[str, Any] = {"width": self.width, "height": self.height}
        if self.dpi:
            serialized["dpi"] = self.dpi
        return serialized
@dataclass
class Page:
    """A single document page and the elements laid out on it."""
    page_number: int  # 1-based index within the document
    elements: List[DocumentElement]
    dimensions: Dimensions
    metadata: Dict[str, Any] = field(default_factory=dict)

    def get_reading_order(self) -> List[DocumentElement]:
        """Elements sorted top-to-bottom, then left-to-right, by bbox origin."""
        return sorted(self.elements, key=lambda el: (el.bbox.y0, el.bbox.x0))

    def get_elements_by_type(self, element_type: ElementType) -> List[DocumentElement]:
        """All elements whose type equals *element_type*."""
        return [el for el in self.elements if el.type == element_type]

    def get_text_elements(self) -> List[DocumentElement]:
        """All text-bearing elements (see DocumentElement.is_text)."""
        return [el for el in self.elements if el.is_text]

    def get_tables(self) -> List[DocumentElement]:
        """All TABLE elements on the page."""
        return [el for el in self.elements if el.type == ElementType.TABLE]

    def get_images(self) -> List[DocumentElement]:
        """All picture-like elements (see DocumentElement.is_visual)."""
        return [el for el in self.elements if el.is_visual]

    def extract_text(self, separator: str = "\n") -> str:
        """Join all non-empty element texts in reading order."""
        pieces = (el.get_text() for el in self.get_reading_order())
        return separator.join(piece for piece in pieces if piece)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the page, including simple per-type element statistics."""
        return {
            "page_number": self.page_number,
            "elements": [el.to_dict() for el in self.elements],
            "dimensions": self.dimensions.to_dict(),
            "metadata": self.metadata,
            "statistics": {
                "total_elements": len(self.elements),
                "text_elements": len(self.get_text_elements()),
                "tables": len(self.get_tables()),
                "images": len(self.get_images())
            }
        }
@dataclass
class DocumentMetadata:
    """Document-level metadata shared by both processing tracks.

    The required fields describe the input file and the processing run;
    the optional fields mirror common PDF metadata keys when available.
    """
    filename: str
    file_type: str
    file_size: int  # bytes
    created_at: datetime  # when processing started
    processing_track: ProcessingTrack
    processing_time: float  # seconds
    language: Optional[str] = None
    title: Optional[str] = None
    author: Optional[str] = None
    subject: Optional[str] = None
    keywords: Optional[List[str]] = None
    producer: Optional[str] = None
    creator: Optional[str] = None
    creation_date: Optional[datetime] = None
    modification_date: Optional[datetime] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization.

        Required fields are always emitted; optional fields only when set.
        Datetimes are rendered as ISO-8601 strings.
        """
        result = {
            "filename": self.filename,
            "file_type": self.file_type,
            "file_size": self.file_size,
            "created_at": self.created_at.isoformat(),
            "processing_track": self.processing_track.value,
            "processing_time": self.processing_time,
        }
        # Loop variable renamed from `field`: it shadowed dataclasses.field
        # imported at module level.
        optional_fields = [
            "language", "title", "author", "subject",
            "keywords", "producer", "creator"
        ]
        for name in optional_fields:
            value = getattr(self, name)
            if value is not None:
                result[name] = value
        if self.creation_date:
            result["creation_date"] = self.creation_date.isoformat()
        if self.modification_date:
            result["modification_date"] = self.modification_date.isoformat()
        return result
@dataclass
class UnifiedDocument:
    """
    Unified document representation for both OCR and direct extraction tracks.
    This is the primary output format that ensures consistency across different
    processing methods and enables seamless downstream processing.
    """
    document_id: str
    metadata: DocumentMetadata
    pages: List[Page]
    # Non-fatal errors collected during processing, each a dict with
    # "error" (message) and "type" (exception class name) keys.
    processing_errors: List[Dict[str, Any]] = field(default_factory=list)

    @property
    def page_count(self) -> int:
        """Number of pages in the document."""
        return len(self.pages)

    @property
    def total_elements(self) -> int:
        """Total element count across all pages."""
        return sum(len(page.elements) for page in self.pages)

    def get_page(self, page_number: int) -> Optional[Page]:
        """Get page by number (1-based); returns None when no such page exists."""
        for page in self.pages:
            if page.page_number == page_number:
                return page
        return None

    def extract_all_text(self, page_separator: str = "\n\n") -> str:
        """Extract all text from the document; pages with no text are skipped."""
        texts = []
        for page in self.pages:
            page_text = page.extract_text()
            if page_text:
                texts.append(page_text)
        return page_separator.join(texts)

    def get_all_tables(self) -> List[DocumentElement]:
        """Get all tables from all pages, in page order."""
        tables = []
        for page in self.pages:
            tables.extend(page.get_tables())
        return tables

    def get_all_images(self) -> List[DocumentElement]:
        """Get all images from all pages, in page order."""
        images = []
        for page in self.pages:
            images.extend(page.get_images())
        return images

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return {
            "document_id": self.document_id,
            "metadata": self.metadata.to_dict(),
            "pages": [page.to_dict() for page in self.pages],
            "statistics": {
                "page_count": self.page_count,
                "total_elements": self.total_elements,
                "total_tables": len(self.get_all_tables()),
                "total_images": len(self.get_all_images()),
            },
            "processing_errors": self.processing_errors
        }

    def to_legacy_format(self) -> Dict[str, Any]:
        """
        Convert to legacy format for backward compatibility.
        This ensures existing API clients continue to work while we transition
        to the new unified format.

        Note: text regions use 1-based page numbers, while layout and image
        entries use 0-based page numbers, matching the legacy API.
        """
        # Extract text regions in legacy format
        text_regions = []
        layout_data = []
        images_metadata = []
        for page in self.pages:
            page_num = page.page_number
            for element in page.elements:
                if element.is_text:
                    # Legacy text region format
                    text_regions.append({
                        "page": page_num,
                        "text": element.get_text(),
                        # Direct-extraction elements without a confidence
                        # default to 1.0 in the legacy view.
                        "confidence": element.confidence or 1.0,
                        "bbox": {
                            "x_min": element.bbox.x0,
                            "y_min": element.bbox.y0,
                            "x_max": element.bbox.x1,
                            "y_max": element.bbox.y1
                        }
                    })
                # Legacy layout data (emitted for EVERY element, not just text)
                layout_item = {
                    "element_id": element.element_id,
                    "type": element.type.value,
                    "page": page_num - 1,  # Legacy uses 0-based
                    "bbox": [element.bbox.x0, element.bbox.y0,
                             element.bbox.x1, element.bbox.y1]
                }
                if element.is_table and isinstance(element.content, TableData):
                    layout_item["content"] = element.content.to_html()
                elif element.is_text:
                    layout_item["content"] = element.get_text()
                layout_data.append(layout_item)
                # Legacy image metadata
                if element.is_visual:
                    images_metadata.append({
                        "element_id": element.element_id,
                        "type": "image",
                        "page": page_num - 1,  # Legacy uses 0-based
                        "bbox": [element.bbox.x0, element.bbox.y0,
                                 element.bbox.x1, element.bbox.y1]
                    })
        # Calculate average confidence; regions with a falsy (0) confidence
        # are excluded from the average by the truthiness filter.
        confidences = [r["confidence"] for r in text_regions if r.get("confidence")]
        avg_confidence = sum(confidences) / len(confidences) if confidences else 0.0
        return {
            "status": "success",
            "filename": self.metadata.filename,
            "text_regions": text_regions,
            "total_text_regions": len(text_regions),
            "average_confidence": avg_confidence,
            "processing_time": self.metadata.processing_time,
            "language": self.metadata.language or "ch",
            "layout_data": {
                "elements": layout_data,
                "total_elements": len(layout_data)
            },
            "images_metadata": images_metadata,
            # Legacy clients expect the first page's dimensions here.
            "ocr_dimensions": {
                "width": self.pages[0].dimensions.width if self.pages else 0,
                "height": self.pages[0].dimensions.height if self.pages else 0
            },
            # New fields that won't break existing clients
            "_unified_format": True,
            "_processing_track": self.metadata.processing_track.value
        }
class UnifiedDocumentConverter:
    """Converter utilities for UnifiedDocument format"""

    @staticmethod
    def from_ocr_result(ocr_result: Dict[str, Any],
                        document_id: str,
                        metadata: DocumentMetadata) -> UnifiedDocument:
        """
        Convert PaddleOCR result to UnifiedDocument format.
        This handles the conversion from PP-StructureV3 output to our unified format.

        Args:
            ocr_result: OCR service output; either a single-page result dict
                or a dict with a "pages" list of per-page result dicts.
            document_id: Identifier assigned to the resulting document.
            metadata: Pre-built document metadata to attach.

        Returns:
            UnifiedDocument with one Page per OCR page result.
        """
        pages = []
        # Handle single page or multi-page results
        if "pages" in ocr_result:
            page_results = ocr_result["pages"]
        else:
            page_results = [ocr_result]
        for page_idx, page_data in enumerate(page_results):
            page_num = page_idx + 1
            elements = []
            # Convert text regions
            for idx, text_region in enumerate(page_data.get("text_regions", [])):
                bbox_data = text_region.get("bbox", {})
                element = DocumentElement(
                    element_id=f"text_{page_num}_{idx}",
                    type=ElementType.TEXT,
                    content=text_region.get("text", ""),
                    bbox=BoundingBox(
                        x0=bbox_data.get("x_min", 0),
                        y0=bbox_data.get("y_min", 0),
                        x1=bbox_data.get("x_max", 0),
                        y1=bbox_data.get("y_max", 0)
                    ),
                    confidence=text_region.get("confidence")
                )
                elements.append(element)
            # Convert layout elements if available
            if "layout_data" in page_data and page_data["layout_data"]:
                for layout_elem in page_data["layout_data"].get("elements", []):
                    # Map layout type to ElementType by substring matching;
                    # unrecognized types fall back to TEXT.
                    layout_type = layout_elem.get("type", "text")
                    element_type = ElementType.TEXT  # Default
                    if "table" in layout_type.lower():
                        element_type = ElementType.TABLE
                    elif "image" in layout_type.lower() or "figure" in layout_type.lower():
                        element_type = ElementType.IMAGE
                    elif "title" in layout_type.lower():
                        element_type = ElementType.TITLE
                    elif "list" in layout_type.lower():
                        element_type = ElementType.LIST
                    # Create element
                    # bbox may have fewer than 4 entries; missing coordinates
                    # default to 0.
                    bbox_list = layout_elem.get("bbox", [0, 0, 0, 0])
                    element = DocumentElement(
                        element_id=layout_elem.get("element_id", f"layout_{page_num}_{len(elements)}"),
                        type=element_type,
                        content=layout_elem.get("content", ""),
                        bbox=BoundingBox(
                            x0=bbox_list[0] if len(bbox_list) > 0 else 0,
                            y0=bbox_list[1] if len(bbox_list) > 1 else 0,
                            x1=bbox_list[2] if len(bbox_list) > 2 else 0,
                            y1=bbox_list[3] if len(bbox_list) > 3 else 0
                        )
                    )
                    elements.append(element)
            # Get page dimensions
            ocr_dims = page_data.get("ocr_dimensions", {})
            dimensions = Dimensions(
                width=ocr_dims.get("width", 0),
                height=ocr_dims.get("height", 0)
            )
            pages.append(Page(
                page_number=page_num,
                elements=elements,
                dimensions=dimensions
            ))
        return UnifiedDocument(
            document_id=document_id,
            metadata=metadata,
            pages=pages
        )

    @staticmethod
    def from_direct_extraction(extraction_result: Dict[str, Any],
                               document_id: str,
                               metadata: DocumentMetadata) -> UnifiedDocument:
        """
        Convert PyMuPDF extraction result to UnifiedDocument format.
        This will be implemented when we create the DirectExtractionEngine.
        """
        # TODO: Implement when DirectExtractionEngine is created
        pages = []
        return UnifiedDocument(
            document_id=document_id,
            metadata=metadata,
            pages=pages
        )

View File

@@ -0,0 +1,633 @@
"""
Direct Extraction Engine using PyMuPDF
Handles direct text and structure extraction from editable PDFs without OCR.
This provides much faster processing and perfect accuracy for documents with
extractable text.
"""
import os
import logging
import fitz # PyMuPDF
import uuid
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any, Union
from datetime import datetime
import re
from ..models.unified_document import (
UnifiedDocument, DocumentElement, Page, DocumentMetadata,
BoundingBox, StyleInfo, TableData, TableCell, Dimensions,
ElementType, ProcessingTrack
)
logger = logging.getLogger(__name__)
class DirectExtractionEngine:
"""
Engine for direct text extraction from editable PDFs using PyMuPDF.
This engine provides:
- Fast text extraction with exact positioning
- Font and style information preservation
- Table structure detection
- Image extraction with coordinates
- Hyperlink and annotation extraction
"""
    def __init__(self,
                 enable_table_detection: bool = True,
                 enable_image_extraction: bool = True,
                 min_table_rows: int = 2,
                 min_table_cols: int = 2):
        """
        Initialize the extraction engine.

        Args:
            enable_table_detection: Whether to detect and extract tables
            enable_image_extraction: Whether to extract images
            min_table_rows: Minimum rows for table detection
            min_table_cols: Minimum columns for table detection
        """
        self.enable_table_detection = enable_table_detection
        self.enable_image_extraction = enable_image_extraction
        # Detected tables smaller than these thresholds are discarded as noise.
        self.min_table_rows = min_table_rows
        self.min_table_cols = min_table_cols
    def extract(self,
                file_path: Path,
                output_dir: Optional[Path] = None) -> UnifiedDocument:
        """
        Extract content from PDF file to UnifiedDocument format.

        Never raises: on failure it returns a UnifiedDocument carrying
        whatever pages were extracted before the error, with the error
        recorded in ``processing_errors``.

        Args:
            file_path: Path to PDF file
            output_dir: Optional directory to save extracted images
        Returns:
            UnifiedDocument with extracted content
        """
        start_time = datetime.now()
        document_id = str(uuid.uuid4())
        try:
            doc = fitz.open(str(file_path))
            # Extract document metadata
            metadata = self._extract_metadata(file_path, doc, start_time)
            # Extract pages
            pages = []
            for page_num in range(len(doc)):
                logger.info(f"Extracting page {page_num + 1}/{len(doc)}")
                page = self._extract_page(
                    doc[page_num],
                    page_num + 1,
                    document_id,
                    output_dir
                )
                pages.append(page)
            doc.close()
            # Calculate processing time
            processing_time = (datetime.now() - start_time).total_seconds()
            metadata.processing_time = processing_time
            logger.info(f"Direct extraction completed in {processing_time:.2f}s")
            return UnifiedDocument(
                document_id=document_id,
                metadata=metadata,
                pages=pages
            )
        except Exception as e:
            logger.error(f"Error during direct extraction: {e}")
            # Return partial result with error information
            processing_time = (datetime.now() - start_time).total_seconds()
            # 'metadata' does not exist if fitz.open() or metadata extraction
            # itself failed; build a minimal fallback record in that case.
            if 'metadata' not in locals():
                metadata = DocumentMetadata(
                    filename=file_path.name,
                    file_type="pdf",
                    file_size=file_path.stat().st_size if file_path.exists() else 0,
                    created_at=datetime.now(),
                    processing_track=ProcessingTrack.DIRECT,
                    processing_time=processing_time
                )
            # Keep any pages that were extracted before the failure.
            return UnifiedDocument(
                document_id=document_id,
                metadata=metadata,
                pages=pages if 'pages' in locals() else [],
                processing_errors=[{
                    "error": str(e),
                    "type": type(e).__name__
                }]
            )
def _extract_metadata(self,
file_path: Path,
doc: fitz.Document,
start_time: datetime) -> DocumentMetadata:
"""Extract document metadata"""
pdf_metadata = doc.metadata
return DocumentMetadata(
filename=file_path.name,
file_type="pdf",
file_size=file_path.stat().st_size,
created_at=start_time,
processing_track=ProcessingTrack.DIRECT,
processing_time=0.0, # Will be updated later
title=pdf_metadata.get("title"),
author=pdf_metadata.get("author"),
subject=pdf_metadata.get("subject"),
keywords=pdf_metadata.get("keywords", "").split(",") if pdf_metadata.get("keywords") else None,
producer=pdf_metadata.get("producer"),
creator=pdf_metadata.get("creator"),
creation_date=self._parse_pdf_date(pdf_metadata.get("creationDate")),
modification_date=self._parse_pdf_date(pdf_metadata.get("modDate"))
)
def _parse_pdf_date(self, date_str: str) -> Optional[datetime]:
"""Parse PDF date string to datetime"""
if not date_str:
return None
try:
# PDF date format: D:YYYYMMDDHHmmSSOHH'mm
# Example: D:20240101120000+09'00
if date_str.startswith("D:"):
date_str = date_str[2:]
# Extract just the date/time part (first 14 characters)
if len(date_str) >= 14:
date_part = date_str[:14]
return datetime.strptime(date_part, "%Y%m%d%H%M%S")
except:
pass
return None
    def _extract_page(self,
                      page: fitz.Page,
                      page_num: int,
                      document_id: str,
                      output_dir: Optional[Path]) -> Page:
        """Extract content from a single page.

        Collects text blocks, tables, images and hyperlinks into
        DocumentElements; vector drawings are only counted into the page
        metadata, not converted to elements.

        Args:
            page: PyMuPDF page object.
            page_num: 1-based page number.
            document_id: Owning document's id.
            output_dir: Optional directory for extracted images.
        """
        elements = []
        element_counter = 0
        # Get page dimensions
        rect = page.rect
        dimensions = Dimensions(
            width=rect.width,
            height=rect.height,
            dpi=72  # PDF standard DPI
        )
        # Extract text blocks with formatting
        text_dict = page.get_text("dict")
        for block_idx, block in enumerate(text_dict.get("blocks", [])):
            if block.get("type") == 0:  # Text block (type 1 is an image block)
                element = self._process_text_block(
                    block, page_num, element_counter
                )
                if element:
                    elements.append(element)
                    element_counter += 1
        # Extract tables (if enabled)
        if self.enable_table_detection:
            try:
                # Try native table detection (PyMuPDF 1.23.0+)
                tables = page.find_tables()
                for table_idx, table in enumerate(tables):
                    element = self._process_native_table(
                        table, page_num, element_counter
                    )
                    if element:
                        elements.append(element)
                        element_counter += 1
            except AttributeError:
                # Fallback to positional table detection on older PyMuPDF
                logger.debug("Native table detection not available, using positional detection")
                table_elements = self._detect_tables_by_position(page, page_num, element_counter)
                elements.extend(table_elements)
                element_counter += len(table_elements)
        # Extract images (if enabled)
        if self.enable_image_extraction:
            image_elements = self._extract_images(
                page, page_num, document_id, element_counter, output_dir
            )
            elements.extend(image_elements)
            element_counter += len(image_elements)
        # Extract hyperlinks
        links = page.get_links()
        for link_idx, link in enumerate(links):
            # Create link annotation element if it has URI; links without a
            # URI (e.g. internal GoTo actions without one) are skipped.
            if link.get("uri"):
                from_rect = link.get("from")
                if from_rect:
                    element = DocumentElement(
                        element_id=f"link_{page_num}_{element_counter}",
                        type=ElementType.REFERENCE,
                        content={"uri": link["uri"], "type": "hyperlink"},
                        bbox=BoundingBox(
                            x0=from_rect.x0,
                            y0=from_rect.y0,
                            x1=from_rect.x1,
                            y1=from_rect.y1
                        ),
                        metadata={"link_type": "external" if link["uri"].startswith("http") else "internal"}
                    )
                    elements.append(element)
                    element_counter += 1
        # Extract vector graphics (as metadata only)
        drawings = page.get_drawings()
        if drawings:
            logger.debug(f"Page {page_num} contains {len(drawings)} vector drawing commands")
        return Page(
            page_number=page_num,
            elements=elements,
            dimensions=dimensions,
            metadata={
                "has_drawings": len(drawings) > 0,
                "drawing_count": len(drawings),
                "link_count": len(links)
            }
        )
def _process_text_block(self, block: Dict, page_num: int, counter: int) -> Optional[DocumentElement]:
"""Process a text block into a DocumentElement"""
# Calculate block bounding box
bbox_data = block.get("bbox", [0, 0, 0, 0])
bbox = BoundingBox(
x0=bbox_data[0],
y0=bbox_data[1],
x1=bbox_data[2],
y1=bbox_data[3]
)
# Extract text content
text_parts = []
styles = []
for line in block.get("lines", []):
for span in line.get("spans", []):
text = span.get("text", "")
if text:
text_parts.append(text)
# Extract style information
style = StyleInfo(
font_name=span.get("font"),
font_size=span.get("size"),
font_weight="bold" if span.get("flags", 0) & 2**4 else "normal",
font_style="italic" if span.get("flags", 0) & 2**1 else "normal",
text_color=span.get("color")
)
styles.append(style)
if not text_parts:
return None
full_text = "".join(text_parts)
# Determine element type based on content and style
element_type = self._infer_element_type(full_text, styles)
# Use the most common style for the block
if styles:
block_style = styles[0] # Could be improved with style merging
else:
block_style = None
return DocumentElement(
element_id=f"text_{page_num}_{counter}",
type=element_type,
content=full_text,
bbox=bbox,
style=block_style,
confidence=1.0 # Direct extraction has perfect confidence
)
def _infer_element_type(self, text: str, styles: List[StyleInfo]) -> ElementType:
"""Infer element type based on text content and styling"""
text_lower = text.lower().strip()
# Check for common patterns
if len(text_lower) < 100 and styles:
# Short text with large font might be title/header
avg_size = sum(s.font_size or 12 for s in styles) / len(styles)
if avg_size > 16:
return ElementType.TITLE
elif avg_size > 14:
return ElementType.HEADER
# Check for list patterns
if re.match(r'^[\d•·▪▫◦‣]\s', text_lower):
return ElementType.LIST_ITEM
# Check for page numbers
if re.match(r'^page\s+\d+|^\d+\s*$|^-\s*\d+\s*-$', text_lower):
return ElementType.PAGE_NUMBER
# Check for footnote patterns
if re.match(r'^[\[\d+\]]|^\d+\)', text_lower):
return ElementType.FOOTNOTE
# Default to paragraph for longer text, text for shorter
return ElementType.PARAGRAPH if len(text) > 150 else ElementType.TEXT
def _process_native_table(self, table, page_num: int, counter: int) -> Optional[DocumentElement]:
"""Process a natively detected table"""
try:
# Extract table data
data = table.extract()
if not data or len(data) < self.min_table_rows:
return None
# Get table bounding box
bbox_data = table.bbox
bbox = BoundingBox(
x0=bbox_data[0],
y0=bbox_data[1],
x1=bbox_data[2],
y1=bbox_data[3]
)
# Create table cells
cells = []
for row_idx, row in enumerate(data):
for col_idx, cell_text in enumerate(row):
if cell_text:
cells.append(TableCell(
row=row_idx,
col=col_idx,
content=str(cell_text) if cell_text else ""
))
# Create table data
table_data = TableData(
rows=len(data),
cols=max(len(row) for row in data) if data else 0,
cells=cells,
headers=data[0] if data else None # Assume first row is header
)
return DocumentElement(
element_id=f"table_{page_num}_{counter}",
type=ElementType.TABLE,
content=table_data,
bbox=bbox,
confidence=1.0
)
except Exception as e:
logger.error(f"Error processing native table: {e}")
return None
    def _detect_tables_by_position(self, page: fitz.Page, page_num: int, counter: int) -> List[DocumentElement]:
        """Detect tables by analyzing text positioning.

        Fallback for PyMuPDF versions without ``Page.find_tables``: words are
        bucketed into rows by rounded y-coordinate, consecutive multi-column
        rows with roughly regular spacing are grouped into candidate tables,
        and each candidate becomes a TABLE element with confidence 0.8.
        """
        tables = []
        # Get all words with positions
        words = page.get_text("words")  # Returns (x0, y0, x1, y1, "word", block_no, line_no, word_no)
        if not words:
            return tables
        # Group words by approximate row (y-coordinate)
        rows = {}
        for word in words:
            y = round(word[1] / 5) * 5  # Round to nearest 5 points to absorb jitter
            if y not in rows:
                rows[y] = []
            rows[y].append({
                'x0': word[0],
                'y0': word[1],
                'x1': word[2],
                'y1': word[3],
                'text': word[4],
                'block': word[5] if len(word) > 5 else 0
            })
        # Sort rows by y-coordinate
        sorted_rows = sorted(rows.items(), key=lambda x: x[0])
        # Find potential tables (consecutive rows with multiple columns)
        current_table_rows = []
        tables_found = []
        for y, words_in_row in sorted_rows:
            words_in_row.sort(key=lambda w: w['x0'])
            if len(words_in_row) >= self.min_table_cols:
                # Check if this could be a table row
                x_positions = [w['x0'] for w in words_in_row]
                # Check for somewhat regular spacing
                if self._has_regular_spacing(x_positions):
                    current_table_rows.append((y, words_in_row))
                else:
                    # Irregular row terminates the current candidate table
                    if len(current_table_rows) >= self.min_table_rows:
                        tables_found.append(current_table_rows)
                    current_table_rows = []
            else:
                # Too few columns terminates the current candidate table
                if len(current_table_rows) >= self.min_table_rows:
                    tables_found.append(current_table_rows)
                current_table_rows = []
        # Don't forget the last table
        if len(current_table_rows) >= self.min_table_rows:
            tables_found.append(current_table_rows)
        # Convert detected tables to DocumentElements
        for table_idx, table_rows in enumerate(tables_found):
            if not table_rows:
                continue
            # Calculate table bounding box as the hull of all member words
            all_words = []
            for _, words in table_rows:
                all_words.extend(words)
            min_x = min(w['x0'] for w in all_words)
            min_y = min(w['y0'] for w in all_words)
            max_x = max(w['x1'] for w in all_words)
            max_y = max(w['y1'] for w in all_words)
            bbox = BoundingBox(x0=min_x, y0=min_y, x1=max_x, y1=max_y)
            # Create table cells
            cells = []
            for row_idx, (y, words) in enumerate(table_rows):
                # Group words into columns shared across the whole table
                columns = self._group_into_columns(words, table_rows)
                for col_idx, col_text in enumerate(columns):
                    if col_text:
                        cells.append(TableCell(
                            row=row_idx,
                            col=col_idx,
                            content=col_text
                        ))
            # Create table data
            table_data = TableData(
                rows=len(table_rows),
                cols=max(len(self._group_into_columns(words, table_rows))
                         for _, words in table_rows),
                cells=cells
            )
            element = DocumentElement(
                element_id=f"table_{page_num}_{counter + table_idx}",
                type=ElementType.TABLE,
                content=table_data,
                bbox=bbox,
                confidence=0.8,  # Lower confidence for positional detection
                metadata={"detection_method": "positional"}
            )
            tables.append(element)
        return tables
def _has_regular_spacing(self, x_positions: List[float], tolerance: float = 0.3) -> bool:
"""Check if x positions have somewhat regular spacing"""
if len(x_positions) < 3:
return False
spacings = [x_positions[i+1] - x_positions[i] for i in range(len(x_positions)-1)]
avg_spacing = sum(spacings) / len(spacings)
# Check if spacings are within tolerance of average
for spacing in spacings:
if abs(spacing - avg_spacing) > avg_spacing * tolerance:
return False
return True
def _group_into_columns(self, words: List[Dict], all_rows: List) -> List[str]:
    """Assign one row's words to table columns shared across all rows.

    Column anchor positions are obtained by clustering the left edges of
    every word in the whole table (via ``_cluster_positions``); each word in
    ``words`` is then appended to the text bucket of its nearest anchor.

    Args:
        words: Word dicts (with 'x0' and 'text') for a single row.
        all_rows: All (y, words) row tuples of the candidate table.

    Returns:
        One string per detected column; columns with no word are "".
    """
    if not words:
        return []

    # The left edge of every word in the table defines the candidate columns.
    table_left_edges = [w['x0'] for _, row_words in all_rows for w in row_words]
    anchors = self._cluster_positions(table_left_edges)

    # Bucket each word under its closest anchor, then join each bucket.
    buckets: List[List[str]] = [[] for _ in anchors]
    for word in words:
        nearest = 0
        best_gap = float('inf')
        for idx, anchor in enumerate(anchors):
            gap = abs(word['x0'] - anchor)
            if gap < best_gap:
                best_gap = gap
                nearest = idx
        buckets[nearest].append(word['text'])
    return [" ".join(parts) for parts in buckets]
def _cluster_positions(self, positions: List[float], threshold: float = 20) -> List[float]:
"""Cluster positions to find common columns"""
if not positions:
return []
sorted_pos = sorted(positions)
clusters = [[sorted_pos[0]]]
for pos in sorted_pos[1:]:
# Check if position belongs to current cluster
if pos - clusters[-1][-1] < threshold:
clusters[-1].append(pos)
else:
clusters.append([pos])
# Return average position of each cluster
return [sum(cluster) / len(cluster) for cluster in clusters]
def _extract_images(self,
                    page: fitz.Page,
                    page_num: int,
                    document_id: str,
                    counter: int,
                    output_dir: Optional[Path]) -> List[DocumentElement]:
    """Extract embedded images from a page as IMAGE elements.

    Args:
        page: PyMuPDF page to scan.
        page_num: Page number used in element ids and saved filenames.
        document_id: Owning document id, used to namespace saved image files.
        counter: Running element counter offset, keeps element ids unique.
        output_dir: When given, each image is also saved there as a PNG.

    Returns:
        One DocumentElement per placed image. Images without a placement
        rectangle on the page are skipped; per-image extraction errors are
        logged and the remaining images are still processed.
    """
    elements = []
    for img_idx, img in enumerate(page.get_images()):
        pix = None
        try:
            xref = img[0]
            # An image may be placed multiple times on the page; use the
            # first placement for the bounding box. Unplaced images (e.g.
            # referenced but never drawn) are skipped.
            img_rects = page.get_image_rects(xref)
            if not img_rects:
                continue
            rect = img_rects[0]
            bbox = BoundingBox(
                x0=rect.x0,
                y0=rect.y0,
                x1=rect.x1,
                y1=rect.y1
            )
            pix = fitz.Pixmap(page.parent, xref)
            image_data = {
                "width": pix.width,
                "height": pix.height,
                "colorspace": pix.colorspace.name if pix.colorspace else "unknown",
                "xref": xref
            }
            if output_dir:
                output_dir.mkdir(parents=True, exist_ok=True)
                image_filename = f"{document_id}_p{page_num}_img{img_idx}.png"
                image_path = output_dir / image_filename
                # PNG output only supports gray/RGB (+alpha); convert CMYK
                # and other 4+-channel colorspaces to RGB before saving,
                # otherwise Pixmap.save raises for such images.
                if pix.colorspace and pix.n - pix.alpha >= 4:
                    pix = fitz.Pixmap(fitz.csRGB, pix)
                pix.save(str(image_path))
                image_data["saved_path"] = str(image_path)
                logger.debug(f"Saved image to {image_path}")
            element = DocumentElement(
                element_id=f"image_{page_num}_{counter + img_idx}",
                type=ElementType.IMAGE,
                content=image_data,
                bbox=bbox,
                confidence=1.0,
                metadata={
                    "image_index": img_idx,
                    "xref": xref
                }
            )
            elements.append(element)
        except Exception as e:
            logger.error(f"Error extracting image {img_idx}: {e}")
        finally:
            # Drop the Pixmap reference promptly (also on the error path —
            # the original only released it after a successful extraction).
            pix = None
    return elements

View File

@@ -0,0 +1,397 @@
"""
Document Type Detector Service
Intelligently determines the optimal processing track for documents based on
file type, content analysis, and editability checks.
"""
import os
import logging
import magic
import fitz # PyMuPDF
from pathlib import Path
from typing import Dict, Optional, Tuple, List
from enum import Enum
import statistics
logger = logging.getLogger(__name__)
class DocumentType(str, Enum):
    """Classification of an input document, produced by DocumentTypeDetector.

    Inherits from ``str`` so members compare equal to — and serialize as —
    their plain string values.
    """
    PDF_EDITABLE = "pdf_editable"      # PDF with extractable text
    PDF_SCANNED = "pdf_scanned"        # PDF with images/scanned content
    PDF_MIXED = "pdf_mixed"            # PDF with both text and scanned pages
    IMAGE = "image"                    # Image files (PNG, JPG, etc.)
    OFFICE_WORD = "office_word"        # Word documents
    OFFICE_EXCEL = "office_excel"      # Excel spreadsheets
    OFFICE_POWERPOINT = "office_ppt"   # PowerPoint presentations
    TEXT = "text"                      # Plain text files
    UNKNOWN = "unknown"                # Unknown format
class ProcessingTrackRecommendation:
    """A processing-track decision with a confidence score.

    Attributes:
        track: Recommended track, either "ocr" or "direct".
        confidence: Confidence in the recommendation, from 0.0 to 1.0.
        reason: Human-readable explanation of the choice.
        document_type: Detected DocumentType backing the decision.
        metadata: Extra detection details; never None (defaults to ``{}``).
    """

    def __init__(self,
                 track: str,
                 confidence: float,
                 reason: str,
                 document_type: DocumentType,
                 metadata: Optional[Dict] = None):
        self.track = track
        self.confidence = confidence
        self.reason = reason
        self.document_type = document_type
        # Normalize a missing/falsy metadata argument to a fresh empty dict.
        self.metadata = metadata or {}

    def to_dict(self) -> Dict:
        """Serialize the recommendation as a plain JSON-friendly dict."""
        return dict(
            recommended_track=self.track,
            confidence=self.confidence,
            reason=self.reason,
            document_type=self.document_type.value,
            metadata=self.metadata,
        )
class DocumentTypeDetector:
    """
    Service for detecting document types and recommending processing tracks.

    This service analyzes documents to determine:
    1. The document type (PDF, image, Office, etc.)
    2. Whether the document contains extractable text
    3. The recommended processing track (OCR vs Direct)
    """

    # MIME types treated as raster images (always routed to OCR).
    IMAGE_MIMES = {
        'image/png', 'image/jpeg', 'image/jpg', 'image/gif',
        'image/bmp', 'image/tiff', 'image/webp'
    }

    # Office MIME types mapped to their DocumentType classification.
    OFFICE_MIMES = {
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document': DocumentType.OFFICE_WORD,
        'application/msword': DocumentType.OFFICE_WORD,
        'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': DocumentType.OFFICE_EXCEL,
        'application/vnd.ms-excel': DocumentType.OFFICE_EXCEL,
        'application/vnd.openxmlformats-officedocument.presentationml.presentation': DocumentType.OFFICE_POWERPOINT,
        'application/vnd.ms-powerpoint': DocumentType.OFFICE_POWERPOINT,
    }

    def __init__(self,
                 min_text_length: int = 100,
                 sample_pages: int = 3,
                 text_coverage_threshold: float = 0.9):
        """
        Initialize the detector.

        Args:
            min_text_length: Minimum stripped-text length for a page to count
                as having extractable text
            sample_pages: Number of pages to sample for PDF analysis
            text_coverage_threshold: Fraction (0.0-1.0) of sampled pages with
                text required to classify a PDF as editable
        """
        self.min_text_length = min_text_length
        self.sample_pages = sample_pages
        self.text_coverage_threshold = text_coverage_threshold

    def detect(self, file_path: Path) -> ProcessingTrackRecommendation:
        """
        Detect document type and recommend processing track.

        Never raises: any detection failure is caught and reported as a
        low-confidence OCR recommendation.

        Args:
            file_path: Path to the document file

        Returns:
            ProcessingTrackRecommendation with track selection and metadata
        """
        if not file_path.exists():
            logger.error(f"File not found: {file_path}")
            return ProcessingTrackRecommendation(
                track="ocr",
                confidence=0.5,
                reason="File not found, defaulting to OCR",
                document_type=DocumentType.UNKNOWN
            )
        try:
            # Detect MIME type from file content (extension-independent)
            mime_type = magic.from_file(str(file_path), mime=True)
            logger.info(f"Detected MIME type: {mime_type} for {file_path.name}")
            # Route to the type-specific analyzer
            if mime_type == 'application/pdf':
                return self._analyze_pdf(file_path)
            elif mime_type in self.IMAGE_MIMES:
                return self._analyze_image(file_path, mime_type)
            elif mime_type in self.OFFICE_MIMES:
                return self._analyze_office(file_path, mime_type)
            elif mime_type.startswith('text/'):
                return self._analyze_text(file_path, mime_type)
            else:
                logger.warning(f"Unknown MIME type: {mime_type}")
                return ProcessingTrackRecommendation(
                    track="ocr",
                    confidence=0.5,
                    reason=f"Unknown file type ({mime_type}), defaulting to OCR",
                    document_type=DocumentType.UNKNOWN
                )
        except Exception as e:
            logger.error(f"Error detecting document type: {e}")
            return ProcessingTrackRecommendation(
                track="ocr",
                confidence=0.3,
                reason=f"Error during detection: {str(e)}",
                document_type=DocumentType.UNKNOWN
            )

    def _analyze_pdf(self, file_path: Path) -> ProcessingTrackRecommendation:
        """
        Analyze PDF to determine if it's editable or scanned.

        Samples up to ``self.sample_pages`` pages, measuring extractable text
        length and image coverage per page, then classifies the PDF as
        editable, scanned, or mixed based on ``text_coverage_threshold``.

        Args:
            file_path: Path to PDF file

        Returns:
            Processing track recommendation (OCR fallback on any error)
        """
        try:
            # Context manager guarantees the document handle is closed even
            # when a sampled page raises mid-analysis (previously the handle
            # leaked on any exception before doc.close()).
            with fitz.open(str(file_path)) as doc:
                total_pages = len(doc)
                pages_to_check = min(self.sample_pages, total_pages)
                text_pages = []
                page_details = []
                for page_num in range(pages_to_check):
                    page = doc[page_num]
                    # Extractable (born-digital) text on this page
                    text = page.get_text()
                    text_length = len(text.strip())
                    # Embedded images and how much of the page they cover
                    images = page.get_images()
                    image_count = len(images)
                    page_rect = page.rect
                    page_area = page_rect.width * page_rect.height
                    image_area = 0
                    for img in images:
                        try:
                            xref = img[0]
                            img_rects = page.get_image_rects(xref)
                            for rect in img_rects:
                                image_area += rect.width * rect.height
                        except Exception as img_err:
                            # Best-effort measurement; narrowed from a bare
                            # except so real errors are at least logged.
                            logger.debug(f"Could not measure image area: {img_err}")
                    image_coverage = image_area / page_area if page_area > 0 else 0
                    # A page "has text" when it meets the minimum length
                    has_text = text_length >= self.min_text_length
                    text_pages.append(has_text)
                    page_details.append({
                        "page": page_num + 1,
                        "text_length": text_length,
                        "has_text": has_text,
                        "image_count": image_count,
                        "image_coverage": image_coverage
                    })
                    logger.debug(f"Page {page_num + 1}: text_length={text_length}, "
                                 f"images={image_count}, image_coverage={image_coverage:.2%}")

            # Fraction of sampled pages with meaningful text
            text_coverage = sum(text_pages) / len(text_pages) if text_pages else 0
            metadata = {
                "total_pages": total_pages,
                "sampled_pages": pages_to_check,
                "text_coverage": text_coverage,
                "page_details": page_details
            }
            if text_coverage >= self.text_coverage_threshold:
                # Mostly text-based PDF
                return ProcessingTrackRecommendation(
                    track="direct",
                    confidence=0.95,
                    reason=f"PDF has extractable text on {text_coverage:.0%} of sampled pages",
                    document_type=DocumentType.PDF_EDITABLE,
                    metadata=metadata
                )
            elif text_coverage <= 0.1:
                # Mostly scanned/image PDF
                return ProcessingTrackRecommendation(
                    track="ocr",
                    confidence=0.95,
                    reason=f"PDF appears to be scanned (only {text_coverage:.0%} pages have text)",
                    document_type=DocumentType.PDF_SCANNED,
                    metadata=metadata
                )
            else:
                # Mixed content
                # For mixed PDFs, we could implement page-level track selection in the future
                # For now, use OCR to ensure we don't miss scanned content
                return ProcessingTrackRecommendation(
                    track="ocr",
                    confidence=0.7,
                    reason=f"PDF has mixed content ({text_coverage:.0%} text pages), using OCR for completeness",
                    document_type=DocumentType.PDF_MIXED,
                    metadata=metadata
                )
        except Exception as e:
            logger.error(f"Error analyzing PDF: {e}")
            return ProcessingTrackRecommendation(
                track="ocr",
                confidence=0.5,
                reason=f"Error analyzing PDF: {str(e)}",
                document_type=DocumentType.PDF_SCANNED
            )

    def _analyze_image(self, file_path: Path, mime_type: str) -> ProcessingTrackRecommendation:
        """
        Analyze image file.

        Images always require OCR processing.
        """
        file_size = file_path.stat().st_size
        metadata = {
            "mime_type": mime_type,
            "file_size": file_size,
            "file_extension": file_path.suffix
        }
        return ProcessingTrackRecommendation(
            track="ocr",
            confidence=1.0,
            reason="Image files require OCR processing",
            document_type=DocumentType.IMAGE,
            metadata=metadata
        )

    def _analyze_office(self, file_path: Path, mime_type: str) -> ProcessingTrackRecommendation:
        """
        Analyze Office document.

        Currently routes all Office documents to OCR track.
        Future enhancement: implement direct extraction for Office files.
        """
        document_type = self.OFFICE_MIMES.get(mime_type, DocumentType.UNKNOWN)
        file_size = file_path.stat().st_size
        metadata = {
            "mime_type": mime_type,
            "file_size": file_size,
            "file_extension": file_path.suffix
        }
        # TODO: In future, we could implement direct extraction for Office files
        # using python-docx, openpyxl, python-pptx
        return ProcessingTrackRecommendation(
            track="ocr",
            confidence=0.9,
            reason="Office documents currently processed via OCR (direct extraction planned)",
            document_type=document_type,
            metadata=metadata
        )

    def _analyze_text(self, file_path: Path, mime_type: str) -> ProcessingTrackRecommendation:
        """
        Analyze text file.

        Plain text files can be directly processed without OCR.
        """
        file_size = file_path.stat().st_size
        metadata = {
            "mime_type": mime_type,
            "file_size": file_size,
            "file_extension": file_path.suffix
        }
        return ProcessingTrackRecommendation(
            track="direct",
            confidence=1.0,
            reason="Plain text files can be directly processed",
            document_type=DocumentType.TEXT,
            metadata=metadata
        )

    def analyze_batch(self, file_paths: List[Path]) -> Dict[str, ProcessingTrackRecommendation]:
        """
        Analyze multiple files and return recommendations.

        Per-file failures are captured as low-confidence OCR recommendations
        rather than aborting the batch.

        Args:
            file_paths: List of file paths to analyze

        Returns:
            Dictionary mapping file paths (as strings) to recommendations
        """
        results = {}
        for file_path in file_paths:
            try:
                recommendation = self.detect(file_path)
                results[str(file_path)] = recommendation
                logger.info(f"Analyzed {file_path.name}: {recommendation.track} "
                            f"(confidence: {recommendation.confidence:.2f})")
            except Exception as e:
                logger.error(f"Error analyzing {file_path}: {e}")
                results[str(file_path)] = ProcessingTrackRecommendation(
                    track="ocr",
                    confidence=0.3,
                    reason=f"Error during analysis: {str(e)}",
                    document_type=DocumentType.UNKNOWN
                )
        return results

    def get_statistics(self, recommendations: Dict[str, ProcessingTrackRecommendation]) -> Dict:
        """
        Calculate statistics from batch analysis results.

        Args:
            recommendations: Dictionary of file recommendations

        Returns:
            Statistics dictionary with per-track counts, per-document-type
            counts, and confidence aggregates ({"total": 0} when empty)
        """
        if not recommendations:
            return {"total": 0}
        tracks = [r.track for r in recommendations.values()]
        confidences = [r.confidence for r in recommendations.values()]
        doc_types = [r.document_type.value for r in recommendations.values()]
        stats = {
            "total": len(recommendations),
            "by_track": {
                "ocr": tracks.count("ocr"),
                "direct": tracks.count("direct")
            },
            "by_document_type": {},
            "confidence": {
                "mean": statistics.mean(confidences),
                "median": statistics.median(confidences),
                "min": min(confidences),
                "max": max(confidences)
            }
        }
        # Count occurrences of each distinct document type
        for doc_type in set(doc_types):
            stats["by_document_type"][doc_type] = doc_types.count(doc_type)
        return stats