"""
|
||
Tool_OCR - Core OCR Service with Dual-track Processing
|
||
Supports both PaddleOCR (for scanned documents) and direct extraction (for editable PDFs)
|
||
"""
|
||
|
||
import json
|
||
import logging
|
||
from pathlib import Path
|
||
from typing import Dict, List, Optional, Tuple, Union
|
||
from datetime import datetime
|
||
import uuid
|
||
|
||
from paddleocr import PaddleOCR, PPStructureV3
|
||
from PIL import Image
|
||
from pdf2image import convert_from_path
|
||
import paddle
|
||
|
||
from app.core.config import settings
|
||
from app.services.office_converter import OfficeConverter, OfficeConverterError
|
||
|
||
# Import dual-track components
|
||
try:
|
||
from app.services.document_type_detector import DocumentTypeDetector, ProcessingTrackRecommendation
|
||
from app.services.direct_extraction_engine import DirectExtractionEngine
|
||
from app.services.ocr_to_unified_converter import OCRToUnifiedConverter
|
||
from app.models.unified_document import (
|
||
UnifiedDocument, DocumentMetadata,
|
||
ProcessingTrack, ElementType, DocumentElement, Page, Dimensions,
|
||
BoundingBox, ProcessingInfo
|
||
)
|
||
DUAL_TRACK_AVAILABLE = True
|
||
except ImportError as e:
|
||
logger.warning(f"Dual-track components not available: {e}")
|
||
DUAL_TRACK_AVAILABLE = False
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class OCRService:
    """
    Core OCR service using PaddleOCR-VL
    Handles text recognition and document structure analysis
    """

    def __init__(self):
        """Initialize PaddleOCR and PP-Structure engines with GPU detection and dual-track support"""
        self.ocr_languages = settings.ocr_languages_list
        self.confidence_threshold = settings.ocr_confidence_threshold

        # Initialize PaddleOCR engine (will be lazy-loaded per language)
        self.ocr_engines = {}

        # Initialize PP-Structure for layout analysis
        self.structure_engine = None

        # Initialize Office document converter
        self.office_converter = OfficeConverter()

        # Initialize dual-track components if available
        if DUAL_TRACK_AVAILABLE:
            self.document_detector = DocumentTypeDetector(
                min_text_length=100,
                sample_pages=3,
                text_coverage_threshold=0.9
            )
            self.direct_extraction_engine = DirectExtractionEngine(
                enable_table_detection=True,
                enable_image_extraction=True
            )
            self.ocr_to_unified_converter = OCRToUnifiedConverter()
            self.dual_track_enabled = True
            logger.info("Dual-track processing enabled")
        else:
            self.document_detector = None
            self.direct_extraction_engine = None
            self.ocr_to_unified_converter = None
            self.dual_track_enabled = False
            logger.info("Dual-track processing not available, using OCR-only mode")

        # GPU Detection and Configuration
        self.gpu_available = False
        self.use_gpu = False
        self.gpu_info = {}

        self._detect_and_configure_gpu()

        logger.info("OCR Service initialized")

    def _detect_and_configure_gpu(self):
        """Detect GPU availability and configure usage"""
        try:
            # Check if forced CPU mode
            if settings.force_cpu_mode:
                logger.info("GPU mode forced to CPU by configuration")
                self.use_gpu = False
                self.gpu_info = {
                    'available': False,
                    'reason': 'CPU mode forced by configuration',
                }
                return

            # Check if PaddlePaddle is compiled with CUDA
            if paddle.is_compiled_with_cuda():
                # Check if GPU devices are available
                gpu_count = paddle.device.cuda.device_count()

                if gpu_count > 0:
                    self.gpu_available = True
                    self.use_gpu = True

                    # Get GPU device information
                    device_id = settings.gpu_device_id if settings.gpu_device_id < gpu_count else 0
                    gpu_props = paddle.device.cuda.get_device_properties(device_id)

                    self.gpu_info = {
                        'available': True,
                        'device_count': gpu_count,
                        'device_id': device_id,
                        'device_name': gpu_props.name,
                        'total_memory': gpu_props.total_memory,
                        'compute_capability': f"{gpu_props.major}.{gpu_props.minor}",
                    }

                    # Bind the selected GPU device and log its properties
                    try:
                        paddle.device.set_device(f'gpu:{device_id}')
                        logger.info(f"GPU {device_id} selected: {gpu_props.name}")
                        logger.info(f"GPU memory: {gpu_props.total_memory / (1024**3):.2f} GB")
                        logger.info(f"Compute capability: {gpu_props.major}.{gpu_props.minor}")
                        logger.info(f"GPU memory fraction set to: {settings.gpu_memory_fraction}")
                    except Exception as e:
                        logger.warning(f"Failed to configure GPU device: {e}")
                        self.use_gpu = False
                        self.gpu_info['available'] = False
                        self.gpu_info['reason'] = f'GPU configuration failed: {str(e)}'
                else:
                    logger.warning("PaddlePaddle is compiled with CUDA, but no GPU devices were found")
                    self.gpu_info = {
                        'available': False,
                        'reason': 'CUDA compiled but no GPU devices detected',
                    }
            else:
                logger.info("PaddlePaddle not compiled with CUDA support")
                self.gpu_info = {
                    'available': False,
                    'reason': 'PaddlePaddle not compiled with CUDA',
                }

        except Exception as e:
            logger.error(f"GPU detection failed: {e}")
            self.use_gpu = False
            self.gpu_info = {
                'available': False,
                'reason': f'GPU detection error: {str(e)}',
            }

        # Log final GPU status
        if self.use_gpu:
            logger.info(f"✓ GPU acceleration ENABLED - Using {self.gpu_info.get('device_name', 'Unknown GPU')}")
        else:
            reason = self.gpu_info.get('reason', 'Unknown')
            logger.info(f"ℹ GPU acceleration DISABLED - {reason} - Using CPU mode")

    def get_gpu_status(self) -> Dict:
        """
        Get current GPU status and information

        Returns:
            Dictionary with GPU status information
        """
        status = {
            'gpu_enabled': self.use_gpu,
            'gpu_available': self.gpu_available,
            **self.gpu_info,
        }

        # Add current GPU memory usage if GPU is being used
        if self.use_gpu and self.gpu_available:
            try:
                device_id = self.gpu_info.get('device_id', 0)
                # Query allocated and reserved GPU memory (values in bytes)
                memory_allocated = paddle.device.cuda.memory_allocated(device_id)
                memory_reserved = paddle.device.cuda.memory_reserved(device_id)
                total_memory = self.gpu_info.get('total_memory', 0)

                status['memory_allocated_mb'] = memory_allocated / (1024**2)
                status['memory_reserved_mb'] = memory_reserved / (1024**2)
                status['memory_total_mb'] = total_memory / (1024**2)
                status['memory_utilization'] = (memory_allocated / total_memory * 100) if total_memory > 0 else 0
            except Exception as e:
                logger.warning(f"Failed to get GPU memory info: {e}")

        return status

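    # Illustrative shape of the dict returned by get_gpu_status() on a GPU host.
    # Field names come from the code above; the values are made-up examples:
    #   {'gpu_enabled': True, 'gpu_available': True, 'available': True,
    #    'device_count': 1, 'device_id': 0, 'device_name': 'NVIDIA ...',
    #    'compute_capability': '8.6', 'memory_allocated_mb': 512.0, ...}
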
    def get_ocr_engine(self, lang: str = 'ch') -> PaddleOCR:
        """
        Get or create OCR engine for specified language with GPU support

        Args:
            lang: Language code (ch, en, japan, korean, etc.)

        Returns:
            PaddleOCR engine instance
        """
        if lang not in self.ocr_engines:
            logger.info(f"Initializing PaddleOCR engine for language: {lang} (GPU: {self.use_gpu})")

            try:
                # PaddleOCR 3.x: Device is set globally via paddle.set_device()
                # No need to pass device/use_gpu/gpu_mem parameters
                self.ocr_engines[lang] = PaddleOCR(
                    lang=lang,
                    use_textline_orientation=True,  # Replaces deprecated use_angle_cls
                )
                logger.info(f"PaddleOCR engine ready for {lang} (PaddlePaddle {paddle.__version__}, {'GPU' if self.use_gpu else 'CPU'} mode)")

            except Exception as e:
                # If GPU initialization fails, fall back to CPU
                if self.use_gpu:
                    logger.warning(f"GPU initialization failed, falling back to CPU: {e}")
                    self.use_gpu = False
                    # Switch to CPU device globally
                    paddle.set_device('cpu')
                    self.ocr_engines[lang] = PaddleOCR(
                        lang=lang,
                        use_textline_orientation=True,
                    )
                    logger.info(f"PaddleOCR engine ready for {lang} (CPU mode - fallback)")
                else:
                    raise

        return self.ocr_engines[lang]

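    # Engines are cached per language, so repeated calls are cheap. Given an
    # OCRService instance (hypothetical name `service`):
    #   engine = service.get_ocr_engine('en')          # first call loads the model
    #   assert service.get_ocr_engine('en') is engine  # later calls hit the cache
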
    def get_structure_engine(self) -> PPStructureV3:
        """
        Get or create PP-Structure engine for layout analysis with GPU support

        Returns:
            PPStructure engine instance
        """
        if self.structure_engine is None:
            logger.info(f"Initializing PP-StructureV3 engine (GPU: {self.use_gpu})")

            try:
                # PaddleOCR 3.x: Device is set globally via paddle.set_device()
                # No need to pass device/use_gpu/gpu_mem parameters
                self.structure_engine = PPStructureV3(
                    use_doc_orientation_classify=False,
                    use_doc_unwarping=False,
                    use_textline_orientation=False,
                    use_table_recognition=True,
                    use_formula_recognition=True,
                    use_chart_recognition=True,  # Enable chart recognition (requires PaddlePaddle >= 3.2.0 for fused_rms_norm_ext)
                    layout_threshold=0.5,
                )
                logger.info(f"PP-StructureV3 engine ready (PaddlePaddle {paddle.__version__}, {'GPU' if self.use_gpu else 'CPU'} mode)")

            except Exception as e:
                # If GPU initialization fails, fall back to CPU
                if self.use_gpu:
                    logger.warning(f"GPU initialization failed for PP-Structure, falling back to CPU: {e}")
                    self.use_gpu = False
                    # Switch to CPU device globally
                    paddle.set_device('cpu')
                    self.structure_engine = PPStructureV3(
                        use_doc_orientation_classify=False,
                        use_doc_unwarping=False,
                        use_textline_orientation=False,
                        use_table_recognition=True,
                        use_formula_recognition=True,
                        use_chart_recognition=True,  # Enable chart recognition (CPU fallback mode)
                        layout_threshold=0.5,
                    )
                    logger.info("PP-StructureV3 engine ready (CPU mode - fallback)")
                else:
                    raise

        return self.structure_engine

    def convert_pdf_to_images(self, pdf_path: Path, output_dir: Path) -> List[Path]:
        """
        Convert PDF to images (one per page)

        Args:
            pdf_path: Path to PDF file
            output_dir: Directory to save converted images

        Returns:
            List of paths to converted images
        """
        try:
            output_dir.mkdir(parents=True, exist_ok=True)

            logger.info(f"Converting PDF {pdf_path.name} to images")

            # Convert PDF to images (300 DPI for good quality)
            images = convert_from_path(
                str(pdf_path),
                dpi=300,
                fmt='png'
            )

            image_paths = []
            for i, image in enumerate(images):
                # Save each page as PNG
                image_path = output_dir / f"{pdf_path.stem}_page_{i+1}.png"
                image.save(str(image_path), 'PNG')
                image_paths.append(image_path)
                logger.info(f"Saved page {i+1} to {image_path.name}")

            logger.info(f"Converted {len(image_paths)} pages from PDF")
            return image_paths

        except Exception as e:
            logger.error(f"PDF conversion error: {str(e)}")
            raise

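    # Naming scheme example: a 3-page "report.pdf" produces report_page_1.png,
    # report_page_2.png and report_page_3.png inside output_dir.
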
    def process_image(
        self,
        image_path: Path,
        lang: str = 'ch',
        detect_layout: bool = True,
        confidence_threshold: Optional[float] = None,
        output_dir: Optional[Path] = None,
        current_page: int = 0
    ) -> Dict:
        """
        Process single image with OCR and layout analysis

        Args:
            image_path: Path to image file
            lang: Language for OCR
            detect_layout: Whether to perform layout analysis
            confidence_threshold: Minimum confidence threshold (uses default if None)
            output_dir: Optional output directory for saving extracted images
            current_page: Current page number (0-based) for multi-page documents

        Returns:
            Dictionary with OCR results and metadata
        """
        start_time = datetime.now()
        threshold = confidence_threshold if confidence_threshold is not None else self.confidence_threshold

        try:
            # Check if file is Office document
            if self.office_converter.is_office_document(image_path):
                logger.info(f"Detected Office document: {image_path.name}, converting to PDF")
                try:
                    # Convert Office document to PDF
                    pdf_path = self.office_converter.convert_to_pdf(image_path)
                    logger.info(f"Office document converted to PDF: {pdf_path.name}")

                    # Process the PDF (will be handled by PDF processing logic below)
                    image_path = pdf_path
                except OfficeConverterError as e:
                    logger.error(f"Office conversion failed: {str(e)}")
                    raise

            # Check if file is PDF
            is_pdf = image_path.suffix.lower() == '.pdf'

            if is_pdf:
                # Convert PDF to images
                logger.info(f"Detected PDF file: {image_path.name}, converting to images")
                pdf_images_dir = image_path.parent / f"{image_path.stem}_pages"
                image_paths = self.convert_pdf_to_images(image_path, pdf_images_dir)

                # Process all pages
                all_text_regions = []
                total_confidence_sum = 0.0
                total_valid_regions = 0
                all_layout_data = []
                all_images_metadata = []
                all_ocr_dimensions = []

                for page_num, page_image_path in enumerate(image_paths, 1):
                    logger.info(f"Processing PDF page {page_num}/{len(image_paths)}")

                    # Process each page with correct page number (0-based for layout data)
                    page_result = self.process_image(
                        page_image_path,
                        lang=lang,
                        detect_layout=detect_layout,
                        confidence_threshold=confidence_threshold,
                        output_dir=output_dir,
                        current_page=page_num - 1  # Convert to 0-based page number for layout data
                    )

                    # Accumulate results
                    if page_result['status'] == 'success':
                        # Add page number to each text region
                        for region in page_result['text_regions']:
                            region['page'] = page_num
                            all_text_regions.append(region)

                        total_confidence_sum += page_result['average_confidence'] * page_result['total_text_regions']
                        total_valid_regions += page_result['total_text_regions']

                        # Accumulate layout data (page numbers already set correctly in analyze_layout)
                        if page_result.get('layout_data'):
                            layout_data = page_result['layout_data']
                            all_layout_data.append(layout_data)

                        # Accumulate images metadata (page numbers already set correctly in analyze_layout)
                        if page_result.get('images_metadata'):
                            all_images_metadata.extend(page_result['images_metadata'])

                        # Store OCR dimensions for each page
                        if page_result.get('ocr_dimensions'):
                            all_ocr_dimensions.append({
                                'page': page_num,
                                'width': page_result['ocr_dimensions']['width'],
                                'height': page_result['ocr_dimensions']['height']
                            })

                # Calculate overall average confidence
                avg_confidence = total_confidence_sum / total_valid_regions if total_valid_regions > 0 else 0.0

                # Combine layout data from all pages
                combined_layout = None
                if all_layout_data:
                    combined_elements = []
                    for layout in all_layout_data:
                        if layout.get('elements'):
                            combined_elements.extend(layout['elements'])
                    if combined_elements:
                        combined_layout = {
                            'elements': combined_elements,
                            'total_elements': len(combined_elements),
                            'reading_order': list(range(len(combined_elements))),
                        }

                # Generate combined markdown
                markdown_content = self.generate_markdown(all_text_regions, combined_layout)

                # Calculate processing time
                processing_time = (datetime.now() - start_time).total_seconds()

                logger.info(
                    f"PDF processing completed: {image_path.name} - "
                    f"{len(image_paths)} pages, "
                    f"{len(all_text_regions)} regions, "
                    f"{avg_confidence:.2f} avg confidence, "
                    f"{processing_time:.2f}s"
                )

                return {
                    'status': 'success',
                    'file_name': image_path.name,
                    'language': lang,
                    'text_regions': all_text_regions,
                    'total_text_regions': len(all_text_regions),
                    'average_confidence': avg_confidence,
                    'layout_data': combined_layout,
                    'images_metadata': all_images_metadata,
                    'markdown_content': markdown_content,
                    'processing_time': processing_time,
                    'timestamp': datetime.utcnow().isoformat(),
                    'total_pages': len(image_paths),
                    'ocr_dimensions': all_ocr_dimensions if all_ocr_dimensions else None,
                }

            # Get OCR engine (for non-PDF images)
            ocr_engine = self.get_ocr_engine(lang)

            # Get the actual image dimensions that OCR will use
            with Image.open(image_path) as img:
                ocr_width, ocr_height = img.size
                logger.info(f"OCR processing image dimensions: {ocr_width}x{ocr_height}")

            # Perform OCR
            logger.info(f"Processing image: {image_path.name}")
            # Note: In PaddleOCR 3.x, use_angle_cls is set during initialization, not in ocr() call
            ocr_results = ocr_engine.ocr(str(image_path))

            # Parse OCR results (PaddleOCR 3.x format)
            text_regions = []
            total_confidence = 0.0
            valid_regions = 0

            if ocr_results and isinstance(ocr_results, (list, tuple)) and len(ocr_results) > 0:
                # PaddleOCR 3.x returns a list of dictionaries (one per page)
                for page_result in ocr_results:
                    if isinstance(page_result, dict):
                        # New format: {'rec_texts': [...], 'rec_scores': [...], 'rec_polys': [...]}
                        texts = page_result.get('rec_texts', [])
                        scores = page_result.get('rec_scores', [])
                        polys = page_result.get('rec_polys', [])

                        # Process each recognized text
                        for idx, text in enumerate(texts):
                            # Get corresponding score and bbox
                            confidence = scores[idx] if idx < len(scores) else 1.0
                            bbox = polys[idx] if idx < len(polys) else []

                            # Convert numpy array bbox to list for JSON serialization
                            if hasattr(bbox, 'tolist'):
                                bbox = bbox.tolist()

                            # Filter by confidence threshold
                            if confidence >= threshold:
                                text_regions.append({
                                    'text': text,
                                    'bbox': bbox,
                                    'confidence': float(confidence),
                                })
                                total_confidence += confidence
                                valid_regions += 1

            avg_confidence = total_confidence / valid_regions if valid_regions > 0 else 0.0

            logger.info(f"Parsed {len(text_regions)} text regions with avg confidence {avg_confidence:.3f}")

            # Layout analysis (if requested)
            layout_data = None
            images_metadata = []

            if detect_layout:
                # Pass current_page to analyze_layout for correct page numbering
                layout_data, images_metadata = self.analyze_layout(image_path, output_dir=output_dir, current_page=current_page)

            # Generate Markdown
            markdown_content = self.generate_markdown(text_regions, layout_data)

            # Calculate processing time
            processing_time = (datetime.now() - start_time).total_seconds()

            result = {
                'status': 'success',
                'file_name': image_path.name,
                'language': lang,
                'text_regions': text_regions,
                'total_text_regions': len(text_regions),
                'average_confidence': avg_confidence,
                'layout_data': layout_data,
                'images_metadata': images_metadata,
                'markdown_content': markdown_content,
                'processing_time': processing_time,
                'timestamp': datetime.utcnow().isoformat(),
                'ocr_dimensions': {
                    'width': ocr_width,
                    'height': ocr_height
                }
            }

            # If layout data is enhanced, add enhanced results for converter
            if layout_data and layout_data.get('enhanced'):
                result['enhanced_results'] = [{
                    'elements': layout_data.get('elements', []),
                    'reading_order': layout_data.get('reading_order', []),
                    'element_types': layout_data.get('element_types', {}),
                    'page': current_page,
                    'width': ocr_width,
                    'height': ocr_height
                }]

            logger.info(
                f"OCR completed: {image_path.name} - "
                f"{len(text_regions)} regions, "
                f"{avg_confidence:.2f} avg confidence, "
                f"{processing_time:.2f}s"
            )

            return result

        except Exception as e:
            import traceback
            error_trace = traceback.format_exc()
            logger.error(f"OCR processing error for {image_path.name}: {str(e)}\n{error_trace}")
            return {
                'status': 'error',
                'file_name': image_path.name,
                'error_message': str(e),
                'processing_time': (datetime.now() - start_time).total_seconds(),
            }

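    # Illustrative shape of one entry in 'text_regions' (bbox is a 4-point
    # polygon from rec_polys; 'page' is added later for multi-page PDFs):
    #   {'text': '...', 'bbox': [[x1, y1], [x2, y1], [x2, y2], [x1, y2]],
    #    'confidence': 0.97}
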
    def _extract_table_text(self, html_content: str) -> str:
        """
        Extract text from HTML table content for translation purposes

        Args:
            html_content: HTML content containing table

        Returns:
            Extracted text from table cells
        """
        try:
            from html.parser import HTMLParser

            class TableTextExtractor(HTMLParser):
                def __init__(self):
                    super().__init__()
                    self.text_parts = []
                    self.in_table = False

                def handle_starttag(self, tag, attrs):
                    if tag == 'table':
                        self.in_table = True

                def handle_endtag(self, tag):
                    if tag == 'table':
                        self.in_table = False
                    elif tag in ('td', 'th') and self.in_table:
                        self.text_parts.append(' | ')  # Cell separator
                    elif tag == 'tr' and self.in_table:
                        self.text_parts.append('\n')  # Row separator

                def handle_data(self, data):
                    if self.in_table:
                        stripped = data.strip()
                        if stripped:
                            self.text_parts.append(stripped)

            parser = TableTextExtractor()
            parser.feed(html_content)

            # Clean up the extracted text
            extracted = ''.join(parser.text_parts)
            # Remove multiple separators
            extracted = re.sub(r'\s*\|\s*\|+\s*', ' | ', extracted)
            extracted = re.sub(r'\n+', '\n', extracted)
            extracted = extracted.strip()

            return extracted

        except Exception as e:
            logger.warning(f"Failed to extract table text: {e}")
            # Fallback: just remove HTML tags
            text = re.sub(r'<[^>]+>', ' ', html_content)
            text = re.sub(r'\s+', ' ', text)
            return text.strip()

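    # Rough behaviour of _extract_table_text (illustrative):
    #   '<table><tr><td>Name</td><td>Qty</td></tr></table>'
    # yields approximately 'Name | Qty |' after separator cleanup.
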
    def analyze_layout(self, image_path: Path, output_dir: Optional[Path] = None, current_page: int = 0) -> Tuple[Optional[Dict], List[Dict]]:
        """
        Analyze document layout using PP-StructureV3 with enhanced element extraction

        Args:
            image_path: Path to image file
            output_dir: Optional output directory for saving extracted images (defaults to image_path.parent)
            current_page: Current page number (0-based) for multi-page documents

        Returns:
            Tuple of (layout_data, images_metadata)
        """
        try:
            structure_engine = self.get_structure_engine()

            # Try enhanced processing first
            try:
                from app.services.pp_structure_enhanced import PPStructureEnhanced

                enhanced_processor = PPStructureEnhanced(structure_engine)
                result = enhanced_processor.analyze_with_full_structure(
                    image_path, output_dir, current_page
                )

                if result.get('has_parsing_res_list'):
                    logger.info(f"Enhanced PP-StructureV3 analysis successful with {result['total_elements']} elements")
                    logger.info(f"Element types found: {result.get('element_types', {})}")

                    # Convert to legacy format for compatibility
                    layout_data = {
                        'elements': result['elements'],
                        'total_elements': result['total_elements'],
                        'reading_order': result['reading_order'],
                        'element_types': result.get('element_types', {}),
                        'enhanced': True
                    }

                    # Extract images metadata
                    images_metadata = []
                    for elem in result.get('images', []):
                        images_metadata.append({
                            'element_id': elem['element_id'],
                            'type': 'image',
                            'page': elem['page'],
                            'bbox': elem['bbox']
                        })

                    return layout_data, images_metadata
                else:
                    logger.info("parsing_res_list not available, using standard processing")

            except ImportError:
                logger.debug("Enhanced PP-StructureV3 module not available, using standard processing")
            except Exception as e:
                logger.warning(f"Enhanced processing failed, falling back to standard: {e}")

            # Standard processing (original implementation)
            logger.info(f"Running standard layout analysis on {image_path.name}")
            results = structure_engine.predict(str(image_path))

            layout_elements = []
            images_metadata = []

            # Process each page result (for images, usually just one page)
            for page_idx, page_result in enumerate(results):
                # Get markdown dictionary from result object
                if hasattr(page_result, 'markdown'):
                    markdown_dict = page_result.markdown
                    logger.info(f"Page {page_idx} markdown keys: {markdown_dict.keys() if isinstance(markdown_dict, dict) else type(markdown_dict)}")

                    # Extract layout information from markdown structure
                    if isinstance(markdown_dict, dict):
                        # Get markdown texts (HTML format with tables and structure)
                        markdown_texts = markdown_dict.get('markdown_texts', '')
                        markdown_images = markdown_dict.get('markdown_images', {})

                        # Create a layout element for the structured content
                        if markdown_texts:
                            # Check if content contains tables
                            has_table = '<table' in markdown_texts.lower()

                            element = {
                                'element_id': len(layout_elements),
                                'type': 'table' if has_table else 'text',
                                'content': markdown_texts,
                                'page': current_page,  # Use current_page parameter instead of page_idx
                                'bbox': [],  # PP-StructureV3 doesn't provide individual bbox in this format
                            }

                            # Extract text from table for translation purposes
                            if has_table:
                                table_text = self._extract_table_text(markdown_texts)
                                element['extracted_text'] = table_text
                                logger.info(f"Extracted {len(table_text)} characters from table")

                            layout_elements.append(element)

                        # Add image metadata and SAVE images to disk
                        for img_idx, (img_path, img_obj) in enumerate(markdown_images.items()):
                            # Save image to disk
                            try:
                                # Determine base directory for saving images
                                base_dir = output_dir if output_dir else image_path.parent

                                # Create full path for image file
                                full_img_path = base_dir / img_path

                                # Create imgs/ subdirectory if it doesn't exist
                                full_img_path.parent.mkdir(parents=True, exist_ok=True)

                                # Save image object to disk
                                if hasattr(img_obj, 'save'):
                                    # img_obj is PIL Image
                                    img_obj.save(str(full_img_path))
                                    logger.info(f"Saved extracted image to {full_img_path}")
                                else:
                                    logger.warning(f"Image object for {img_path} does not have save() method, skipping")

                            except Exception as e:
                                logger.warning(f"Failed to save image {img_path}: {str(e)}")
                                # Continue processing even if image save fails

                            # Extract bbox from filename (format: img_in_table_box_x1_y1_x2_y2.jpg)
                            bbox = []
                            try:
                                match = re.search(r'box_(\d+)_(\d+)_(\d+)_(\d+)', img_path)
                                if match:
                                    x1, y1, x2, y2 = map(int, match.groups())
                                    # Convert to 4-point bbox format: [[x1,y1], [x2,y1], [x2,y2], [x1,y2]]
                                    bbox = [[x1, y1], [x2, y1], [x2, y2], [x1, y2]]
                                    logger.info(f"Extracted bbox from filename: {bbox}")
                            except Exception as e:
                                logger.warning(f"Failed to extract bbox from {img_path}: {e}")

                            images_metadata.append({
                                'element_id': len(layout_elements) + img_idx,
                                'image_path': img_path,
                                'type': 'image',
                                'page': current_page,  # Use current_page parameter instead of page_idx
                                'bbox': bbox,
                            })

            if layout_elements:
                layout_data = {
                    'elements': layout_elements,
                    'total_elements': len(layout_elements),
                    'reading_order': list(range(len(layout_elements))),
                }
                logger.info(f"Detected {len(layout_elements)} layout elements")
                return layout_data, images_metadata
            else:
                logger.warning("No layout elements detected")
                return None, []

        except Exception as e:
            import traceback
            error_trace = traceback.format_exc()
            logger.error(f"Layout analysis error: {str(e)}\n{error_trace}")
            return None, []

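    # Filename-derived bbox example: an image saved as
    # 'img_in_table_box_10_20_110_220.jpg' yields
    # bbox = [[10, 20], [110, 20], [110, 220], [10, 220]].
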
    def generate_markdown(
        self,
        text_regions: List[Dict],
        layout_data: Optional[Dict] = None
    ) -> str:
        """
        Generate Markdown from OCR results

        Args:
            text_regions: List of text regions with bbox and text
            layout_data: Optional layout structure information

        Returns:
            Markdown formatted string
        """
        markdown_lines = []

        if layout_data and layout_data.get('elements'):
            # Generate structured Markdown based on layout
            for element in layout_data['elements']:
                element_type = element.get('type', 'text')
                content = element.get('content', '')

                if element_type == 'title':
                    markdown_lines.append(f"# {content}\n")
                elif element_type == 'table':
                    # Table in HTML format
                    markdown_lines.append(content)
                    markdown_lines.append("")
                elif element_type == 'figure':
                    element_id = element.get('element_id')
                    # Emit an id-based placeholder for the figure (no image path
                    # is available at this point)
                    markdown_lines.append(f"![figure_{element_id}]\n")
                else:
                    markdown_lines.append(f"{content}\n")

        else:
            # Simple Markdown from text regions only
            # Sort by vertical position (top to bottom)
            def get_y_coord(region):
                """Safely extract Y coordinate from bbox"""
                bbox = region.get('bbox', [])
                if isinstance(bbox, (list, tuple)) and len(bbox) > 0:
                    if isinstance(bbox[0], (list, tuple)) and len(bbox[0]) > 1:
                        return bbox[0][1]  # [[x1,y1], [x2,y2], ...] format
                    elif len(bbox) > 1:
                        return bbox[1]  # [x1, y1, x2, y2, ...] format
                return 0  # Default to 0 if we can't extract a coordinate

            sorted_regions = sorted(text_regions, key=get_y_coord)

            for region in sorted_regions:
                text = region['text']
                markdown_lines.append(text)

        return "\n".join(markdown_lines)

    def process_with_dual_track(
        self,
        file_path: Path,
        lang: str = 'ch',
        detect_layout: bool = True,
        confidence_threshold: Optional[float] = None,
        output_dir: Optional[Path] = None,
        force_track: Optional[str] = None
    ) -> Union[UnifiedDocument, Dict]:
        """
        Process document using dual-track approach.

        Args:
            file_path: Path to document file
            lang: Language for OCR (if needed)
            detect_layout: Whether to perform layout analysis
            confidence_threshold: Minimum confidence threshold
            output_dir: Optional output directory for extracted images
            force_track: Force specific track ("ocr" or "direct"), None for auto-detection

        Returns:
            UnifiedDocument if dual-track is enabled, Dict otherwise
        """
        if not self.dual_track_enabled:
            # Fallback to traditional OCR processing
            return self.process_file_traditional(
                file_path, lang, detect_layout, confidence_threshold, output_dir
            )

        start_time = datetime.now()
        document_id = str(uuid.uuid4())

        try:
            # Detect document type and recommend processing track
            if force_track:
                logger.info(f"Forced to use {force_track} track")
                recommendation = ProcessingTrackRecommendation(
                    track=force_track,
                    confidence=1.0,
                    reason="Forced by user",
                    document_type=None
                )
            else:
                recommendation = self.document_detector.detect(file_path)
                logger.info(f"Recommended track: {recommendation.track} (confidence: {recommendation.confidence:.2f})")
                logger.info(f"Reason: {recommendation.reason}")

            # Route to appropriate processing track
            if recommendation.track == "direct":
                # Use direct extraction for editable PDFs
                logger.info("Using DIRECT extraction track (PyMuPDF)")
                unified_doc = self.direct_extraction_engine.extract(file_path, output_dir)
                unified_doc.document_id = document_id
            else:
                # Use OCR for scanned documents, images, etc.
                logger.info("Using OCR track (PaddleOCR)")
                ocr_result = self.process_file_traditional(
                    file_path, lang, detect_layout, confidence_threshold, output_dir
                )

                # Convert OCR result to UnifiedDocument using the converter
                processing_time_so_far = (datetime.now() - start_time).total_seconds()
                unified_doc = self.ocr_to_unified_converter.convert(
                    ocr_result, file_path, processing_time_so_far, lang
                )
                unified_doc.document_id = document_id

            # Update processing track metadata
            unified_doc.metadata.processing_track = (
                ProcessingTrack.DIRECT if recommendation.track == "direct"
                else ProcessingTrack.OCR
            )

            # Calculate total processing time
            processing_time = (datetime.now() - start_time).total_seconds()
            unified_doc.metadata.processing_time = processing_time

            logger.info(f"Document processing completed in {processing_time:.2f}s using {recommendation.track} track")

            return unified_doc

        except Exception as e:
            logger.error(f"Error in dual-track processing: {e}")
            # Fallback to traditional OCR
            return self.process_file_traditional(
                file_path, lang, detect_layout, confidence_threshold, output_dir
            )

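    # Usage sketch (hypothetical `service` instance): force a track instead of
    # relying on auto-detection:
    #   service.process(Path("scan.pdf"), force_track="ocr")
    #   service.process(Path("report.pdf"), force_track="direct")
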
    def process_file_traditional(
        self,
        file_path: Path,
        lang: str = 'ch',
        detect_layout: bool = True,
        confidence_threshold: Optional[float] = None,
        output_dir: Optional[Path] = None
    ) -> Dict:
        """
        Traditional OCR processing (legacy method).

        Args:
            file_path: Path to file
            lang: Language for OCR
            detect_layout: Whether to perform layout analysis
            confidence_threshold: Minimum confidence threshold
            output_dir: Optional output directory

        Returns:
            Dictionary with OCR results in legacy format
        """
        # Check if it's a PDF that needs conversion
        if file_path.suffix.lower() == '.pdf':
            image_paths = self.convert_pdf_to_images(file_path, output_dir or file_path.parent)

            # Process multiple pages
            all_results = []
            for i, image_path in enumerate(image_paths):
                result = self.process_image(
                    image_path, lang, detect_layout, confidence_threshold, output_dir, i
                )
                all_results.append(result)

            # Combine results
            combined_result = self._combine_results(all_results)
            combined_result['filename'] = file_path.name
            return combined_result

        else:
            # Single image or other file
            return self.process_image(
                file_path, lang, detect_layout, confidence_threshold, output_dir, 0
            )

    def _combine_results(self, results: List[Dict]) -> Dict:
        """Combine multiple OCR results into one"""
        if not results:
            return {'status': 'error', 'error': 'No results to combine'}

        combined = {
            'status': 'success',
            'text_regions': [],
            'total_text_regions': 0,
            'average_confidence': 0.0,
            'processing_time': 0.0,
            'pages': [],
            'layout_data': {'elements': []},
            'images_metadata': [],
            'enhanced_results': []  # For PP-StructureV3 enhanced results
        }

        total_confidence = 0.0
        total_regions = 0
        has_enhanced = False

        for page_num, result in enumerate(results):
            if result['status'] == 'success':
                # Add page number to text regions
                for region in result.get('text_regions', []):
                    region['page'] = page_num + 1
                    combined['text_regions'].append(region)

                # Accumulate statistics
                total_regions += result.get('total_text_regions', 0)
                total_confidence += result.get('average_confidence', 0) * result.get('total_text_regions', 0)
                combined['processing_time'] += result.get('processing_time', 0)

                # Collect layout data
                if result.get('layout_data'):
                    layout = result['layout_data']
                    # Check if this is enhanced layout data
                    if layout.get('enhanced'):
                        has_enhanced = True
                        # Store enhanced results separately for converter
                        combined['enhanced_results'].append({
                            'elements': layout.get('elements', []),
                            'reading_order': layout.get('reading_order', []),
                            'element_types': layout.get('element_types', {}),
                            'page': page_num,
                            'width': result.get('ocr_dimensions', {}).get('width', 0),
                            'height': result.get('ocr_dimensions', {}).get('height', 0)
                        })
                    # Always collect elements for backward compatibility
                    for elem in layout.get('elements', []):
                        elem['page'] = page_num
                        combined['layout_data']['elements'].append(elem)

                # Collect images metadata
                for img in result.get('images_metadata', []):
                    img['page'] = page_num
                    combined['images_metadata'].append(img)

                # Store page data
                combined['pages'].append(result)

        combined['total_text_regions'] = total_regions
        combined['average_confidence'] = total_confidence / total_regions if total_regions > 0 else 0.0
        combined['language'] = results[0].get('language', 'ch') if results else 'ch'
        combined['gpu_used'] = results[0].get('gpu_used', False) if results else False

        # Generate markdown
        combined['markdown_content'] = self.generate_markdown(
            combined['text_regions'], combined['layout_data']
        )

        return combined

    def process(
        self,
        file_path: Path,
        lang: str = 'ch',
        detect_layout: bool = True,
        confidence_threshold: Optional[float] = None,
        output_dir: Optional[Path] = None,
        use_dual_track: bool = True,
        force_track: Optional[str] = None
    ) -> Union[UnifiedDocument, Dict]:
        """
        Main processing method with dual-track support.

        Args:
            file_path: Path to document file
            lang: Language for OCR
            detect_layout: Whether to perform layout analysis
            confidence_threshold: Minimum confidence threshold
            output_dir: Optional output directory
            use_dual_track: Whether to use dual-track processing (default True)
            force_track: Force specific track ("ocr" or "direct")

        Returns:
            UnifiedDocument if dual-track is enabled and use_dual_track=True,
            Dict with legacy format otherwise
        """
        if use_dual_track and self.dual_track_enabled:
            # Use dual-track processing
            return self.process_with_dual_track(
                file_path, lang, detect_layout, confidence_threshold, output_dir, force_track
            )
        else:
            # Use traditional OCR processing
            return self.process_file_traditional(
                file_path, lang, detect_layout, confidence_threshold, output_dir
            )

    def process_legacy(
        self,
        file_path: Path,
        lang: str = 'ch',
        detect_layout: bool = True,
        confidence_threshold: Optional[float] = None,
        output_dir: Optional[Path] = None
    ) -> Dict:
        """
        Legacy processing method that always returns Dict format.
        Kept for backward compatibility.

        Args:
            file_path: Path to document file
            lang: Language for OCR
            detect_layout: Whether to perform layout analysis
            confidence_threshold: Minimum confidence threshold
            output_dir: Optional output directory

        Returns:
            Dictionary with OCR results in legacy format
        """
        if self.dual_track_enabled:
            # Use dual-track but convert to legacy format
            result = self.process_with_dual_track(
                file_path, lang, detect_layout, confidence_threshold, output_dir
            )

            # Convert UnifiedDocument to legacy format if needed
            if isinstance(result, UnifiedDocument):
                return result.to_legacy_format()
            else:
                return result
        else:
            # Use traditional processing
            return self.process_file_traditional(
                file_path, lang, detect_layout, confidence_threshold, output_dir
            )

    def get_track_recommendation(self, file_path: Path) -> Optional[ProcessingTrackRecommendation]:
        """
        Get processing track recommendation for a file.

        Args:
            file_path: Path to document file

        Returns:
            ProcessingTrackRecommendation if dual-track is enabled, None otherwise
        """
        if not self.dual_track_enabled:
            return None

        try:
            return self.document_detector.detect(file_path)
        except Exception as e:
            logger.error(f"Error getting track recommendation: {e}")
            return None

    def save_results(
        self,
        result: Union[UnifiedDocument, Dict],
        output_dir: Path,
        file_id: str,
        source_file_path: Optional[Path] = None
    ) -> Tuple[Optional[Path], Optional[Path], Optional[Path]]:
        """
        Save OCR results to JSON, Markdown, and layout-preserving PDF files

        Args:
            result: OCR result (UnifiedDocument or dictionary)
            output_dir: Output directory
            file_id: Unique file identifier
            source_file_path: Optional path to original source file for PDF generation

        Returns:
            Tuple of (json_path, markdown_path, pdf_path)
        """
        try:
            output_dir.mkdir(parents=True, exist_ok=True)

            # Convert UnifiedDocument to dict if needed
            if isinstance(result, UnifiedDocument):
                result_dict = result.to_dict()
                legacy_result = result.to_legacy_format()
                markdown_content = result.extract_all_text()
            else:
                result_dict = result
                legacy_result = result
                markdown_content = result.get('markdown_content', '')

            # Save JSON (result_dict already holds the dict form for both branches)
            json_path = output_dir / f"{file_id}_result.json"
            with open(json_path, 'w', encoding='utf-8') as f:
                json.dump(result_dict, f, ensure_ascii=False, indent=2)

            # Save Markdown
            markdown_path = output_dir / f"{file_id}_output.md"
            with open(markdown_path, 'w', encoding='utf-8') as f:
                f.write(markdown_content)

            logger.info(f"Results saved: {json_path.name}, {markdown_path.name}")

            # Generate layout-preserving PDF
            pdf_path = None
            try:
                from app.services.pdf_generator_service import pdf_generator_service

                pdf_filename = f"{file_id}_layout.pdf"
                pdf_path = output_dir / pdf_filename

                logger.info(f"Generating layout-preserving PDF: {pdf_filename}")

                success = pdf_generator_service.generate_layout_pdf(
                    json_path=json_path,
                    output_path=pdf_path,
                    source_file_path=source_file_path
                )

                if success:
                    logger.info(f"✓ PDF generated successfully: {pdf_path.name}")
                else:
                    logger.warning(f"✗ PDF generation failed for {file_id}")
                    pdf_path = None

            except Exception as e:
                logger.error(f"Error generating PDF for {file_id}: {str(e)}")
                import traceback
                traceback.print_exc()
                pdf_path = None

            return json_path, markdown_path, pdf_path

        except Exception as e:
            logger.error(f"Error saving results: {str(e)}")
            return None, None, None
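

# Minimal usage sketch (an illustrative addition, not part of the service API):
# assumes a configured `settings`, locally available PaddleOCR models, and
# example paths/language codes.
if __name__ == "__main__":
    import sys

    logging.basicConfig(level=logging.INFO)

    service = OCRService()
    input_path = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("sample.pdf")
    out_dir = Path("output")

    # Auto-detects the best track; returns a UnifiedDocument or a legacy dict
    doc = service.process(input_path, lang='en', output_dir=out_dir)
    service.save_results(doc, out_dir, file_id=input_path.stem,
                         source_file_path=input_path)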