OCR/backend/app/services/ocr_service.py
egg ea0dd7456c feat: implement layout preprocessing backend
Backend implementation for add-layout-preprocessing proposal:
- Add LayoutPreprocessingService with CLAHE, sharpen, binarize
- Add auto-detection: analyze_image_quality() for contrast/edge metrics
- Integrate preprocessing into OCR pipeline (analyze_layout)
- Add Preview API: POST /api/v2/tasks/{id}/preview/preprocessing
- Add config options: layout_preprocessing_mode, thresholds
- Add schemas: PreprocessingConfig, PreprocessingPreviewResponse

Preprocessing only affects layout detection input.
Original images preserved for element extraction.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-27 15:17:20 +08:00

"""
Tool_OCR - Core OCR Service with Dual-track Processing
Supports both PaddleOCR (for scanned documents) and direct extraction (for editable PDFs)
"""
import json
import logging
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
from datetime import datetime
import uuid
import gc # For garbage collection
from paddleocr import PaddleOCR, PPStructureV3
from PIL import Image
from pdf2image import convert_from_path
import paddle
# Optional torch import for additional GPU memory management
try:
import torch
TORCH_AVAILABLE = True
except ImportError:
TORCH_AVAILABLE = False
from app.core.config import settings
from app.services.office_converter import OfficeConverter, OfficeConverterError
from app.services.memory_manager import get_model_manager, MemoryConfig, MemoryGuard, prediction_context
from app.services.layout_preprocessing_service import (
get_layout_preprocessing_service,
LayoutPreprocessingService,
)
from app.schemas.task import PreprocessingModeEnum, PreprocessingConfig
# Import dual-track components
try:
from app.services.document_type_detector import DocumentTypeDetector, ProcessingTrackRecommendation
from app.services.direct_extraction_engine import DirectExtractionEngine
from app.services.ocr_to_unified_converter import OCRToUnifiedConverter
from app.services.unified_document_exporter import UnifiedDocumentExporter
from app.models.unified_document import (
UnifiedDocument, DocumentMetadata,
ProcessingTrack, ElementType, DocumentElement, Page, Dimensions,
BoundingBox
)
DUAL_TRACK_AVAILABLE = True
except ImportError as e:
logging.getLogger(__name__).warning(f"Dual-track components not available: {e}")
DUAL_TRACK_AVAILABLE = False
UnifiedDocumentExporter = None
logger = logging.getLogger(__name__)
# Sentinel value for "use PubLayNet default" - explicitly NO model specification
_USE_PUBLAYNET_DEFAULT = "__USE_PUBLAYNET_DEFAULT__"
# Layout model mapping: user-friendly names to actual model names
# - "chinese": PP-DocLayout_plus-L - Best for Chinese documents (83.2% mAP, complex layouts)
# - "default": PubLayNet-based default model - Best for English documents
# - "cdla": picodet_lcnet_x1_0_fgd_layout_cdla - Alternative for Chinese layout
LAYOUT_MODEL_MAPPING = {
"chinese": "PP-DocLayout_plus-L",
"default": _USE_PUBLAYNET_DEFAULT, # Uses default PubLayNet-based model (no custom model)
"cdla": "picodet_lcnet_x1_0_fgd_layout_cdla",
}
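
# Illustrative sketch (not part of the original module): how user-facing names
# resolve through LAYOUT_MODEL_MAPPING before _ensure_structure_engine() builds
# the engine. Unknown names return None and fall back to the config default.
#
#     LAYOUT_MODEL_MAPPING.get("chinese")   # -> "PP-DocLayout_plus-L"
#     LAYOUT_MODEL_MAPPING.get("default")   # -> _USE_PUBLAYNET_DEFAULT sentinel
#     LAYOUT_MODEL_MAPPING.get("nonsense")  # -> None (config default is used)
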
class OCRService:
    """
    Core OCR service using PaddleOCR-VL
    Handles text recognition and document structure analysis
    """

    def __init__(self):
        """Initialize PaddleOCR and PPStructure engines with GPU detection and dual-track support"""
        self.ocr_languages = settings.ocr_languages_list
        self.confidence_threshold = settings.ocr_confidence_threshold
        # Initialize PaddleOCR engine (will be lazy-loaded per language)
        self.ocr_engines = {}
        # Initialize PP-Structure for layout analysis
        self.structure_engine = None
        # Initialize Office document converter
        self.office_converter = OfficeConverter()
        # Initialize dual-track components if available
        if DUAL_TRACK_AVAILABLE:
            self.document_detector = DocumentTypeDetector(
                min_text_length=100,
                sample_pages=3,
                text_coverage_threshold=0.9
            )
            self.direct_extraction_engine = DirectExtractionEngine(
                enable_table_detection=True,
                enable_image_extraction=True
            )
            self.ocr_to_unified_converter = OCRToUnifiedConverter()
            self.dual_track_enabled = True
            logger.info("Dual-track processing enabled")
        else:
            self.document_detector = None
            self.direct_extraction_engine = None
            self.ocr_to_unified_converter = None
            self.dual_track_enabled = False
            logger.info("Dual-track processing not available, using OCR-only mode")
        # GPU Detection and Configuration
        self.gpu_available = False
        self.use_gpu = False
        self.gpu_info = {}
        # Model cache management for memory optimization
        self._model_last_used = {}  # Track last usage time for each model
        self._memory_warning_logged = False
        # Initialize MemoryGuard for enhanced memory monitoring
        self._memory_guard = None
        if settings.enable_model_lifecycle_management:
            try:
                memory_config = MemoryConfig(
                    warning_threshold=settings.memory_warning_threshold,
                    critical_threshold=settings.memory_critical_threshold,
                    emergency_threshold=settings.memory_emergency_threshold,
                    model_idle_timeout_seconds=settings.pp_structure_idle_timeout_seconds,
                    gpu_memory_limit_mb=settings.gpu_memory_limit_mb,
                    enable_cpu_fallback=settings.enable_cpu_fallback,
                )
                self._memory_guard = MemoryGuard(memory_config)
                logger.debug("MemoryGuard initialized for OCRService")
            except Exception as e:
                logger.warning(f"Failed to initialize MemoryGuard: {e}")
        # Track if CPU fallback was activated
        self._cpu_fallback_active = False
        self._detect_and_configure_gpu()
        # Log GPU optimization settings
        if settings.enable_memory_optimization:
            logger.info("GPU memory optimization enabled:")
            logger.info(f" - Memory limit: {settings.gpu_memory_limit_mb}MB")
            logger.info(f" - Model cache limit: {settings.model_cache_limit_mb}MB")
            logger.info(f" - Batch size: {settings.inference_batch_size}")
            logger.info(f" - Auto-unload unused models: {settings.auto_unload_unused_models}")
        logger.info("OCR Service initialized")

    def _detect_and_configure_gpu(self):
        """Detect GPU availability and configure usage"""
        try:
            # Check if forced CPU mode
            if settings.force_cpu_mode:
                logger.info("GPU mode forced to CPU by configuration")
                self.use_gpu = False
                self.gpu_info = {
                    'available': False,
                    'reason': 'CPU mode forced by configuration',
                }
                return
            # Check if PaddlePaddle is compiled with CUDA
            if paddle.is_compiled_with_cuda():
                # Check if GPU devices are available
                gpu_count = paddle.device.cuda.device_count()
                if gpu_count > 0:
                    self.gpu_available = True
                    self.use_gpu = True
                    # Get GPU device information
                    device_id = settings.gpu_device_id if settings.gpu_device_id < gpu_count else 0
                    gpu_props = paddle.device.cuda.get_device_properties(device_id)
                    self.gpu_info = {
                        'available': True,
                        'device_count': gpu_count,
                        'device_id': device_id,
                        'device_name': gpu_props.name,
                        'total_memory': gpu_props.total_memory,
                        'compute_capability': f"{gpu_props.major}.{gpu_props.minor}",
                    }
                    # Set GPU memory fraction
                    try:
                        paddle.device.set_device(f'gpu:{device_id}')
                        logger.info(f"GPU {device_id} selected: {gpu_props.name}")
                        logger.info(f"GPU memory: {gpu_props.total_memory / (1024**3):.2f} GB")
                        logger.info(f"Compute capability: {gpu_props.major}.{gpu_props.minor}")
                        logger.info(f"GPU memory fraction set to: {settings.gpu_memory_fraction}")
                    except Exception as e:
                        logger.warning(f"Failed to configure GPU device: {e}")
                        self.use_gpu = False
                        self.gpu_info['available'] = False
                        self.gpu_info['reason'] = f'GPU configuration failed: {str(e)}'
                else:
                    logger.warning("CUDA is available but no GPU devices found")
                    self.gpu_info = {
                        'available': False,
                        'reason': 'CUDA compiled but no GPU devices detected',
                    }
            else:
                logger.info("PaddlePaddle not compiled with CUDA support")
                self.gpu_info = {
                    'available': False,
                    'reason': 'PaddlePaddle not compiled with CUDA',
                }
        except Exception as e:
            logger.error(f"GPU detection failed: {e}")
            self.use_gpu = False
            self.gpu_info = {
                'available': False,
                'reason': f'GPU detection error: {str(e)}',
            }
        # Log final GPU status
        if self.use_gpu:
            logger.info(f"✓ GPU acceleration ENABLED - Using {self.gpu_info.get('device_name', 'Unknown GPU')}")
        else:
            reason = self.gpu_info.get('reason', 'Unknown')
            logger.info(f"GPU acceleration DISABLED - {reason} - Using CPU mode")

    def get_gpu_status(self) -> Dict:
        """
        Get current GPU status and information

        Returns:
            Dictionary with GPU status information
        """
        status = {
            'gpu_enabled': self.use_gpu,
            'gpu_available': self.gpu_available,
            **self.gpu_info,
        }
        # Add current GPU memory usage if GPU is being used
        if self.use_gpu and self.gpu_available:
            try:
                device_id = self.gpu_info.get('device_id', 0)
                # Get memory info (returns allocated, total in bytes)
                memory_allocated = paddle.device.cuda.memory_allocated(device_id)
                memory_reserved = paddle.device.cuda.memory_reserved(device_id)
                total_memory = self.gpu_info.get('total_memory', 0)
                status['memory_allocated_mb'] = memory_allocated / (1024**2)
                status['memory_reserved_mb'] = memory_reserved / (1024**2)
                status['memory_total_mb'] = total_memory / (1024**2)
                status['memory_utilization'] = (memory_allocated / total_memory * 100) if total_memory > 0 else 0
            except Exception as e:
                logger.warning(f"Failed to get GPU memory info: {e}")
        return status
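
    # Usage sketch (illustrative, assuming a caller-created instance): reading
    # the status dict returned above. Keys beyond 'gpu_enabled'/'gpu_available'
    # depend on whether a GPU was detected.
    #
    #     service = OCRService()
    #     status = service.get_gpu_status()
    #     if status['gpu_enabled']:
    #         print(f"GPU memory used: {status['memory_allocated_mb']:.0f} MB")
    #     else:
    #         print(f"CPU mode: {status.get('reason', 'unknown')}")
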
    def _check_gpu_memory_usage(self):
        """
        Check GPU memory usage and log warnings if approaching limits.
        Uses MemoryGuard for enhanced monitoring with multiple backends.
        """
        if not self.use_gpu or not settings.enable_memory_optimization:
            return
        try:
            # Use MemoryGuard if available for better monitoring
            if self._memory_guard:
                stats = self._memory_guard.get_memory_stats()
                # Log based on usage ratio
                if stats.gpu_used_ratio > 0.90 and not self._memory_warning_logged:
                    logger.warning(
                        f"GPU memory usage critical: {stats.gpu_used_mb:.0f}MB / {stats.gpu_total_mb:.0f}MB "
                        f"({stats.gpu_used_ratio*100:.1f}%)"
                    )
                    logger.warning("Consider enabling auto_unload_unused_models or reducing batch size")
                    self._memory_warning_logged = True
                    # Trigger emergency cleanup if enabled
                    if settings.enable_emergency_cleanup:
                        self._cleanup_unused_models()
                        self._memory_guard.clear_gpu_cache()
                elif stats.gpu_used_ratio > 0.75:
                    logger.info(
                        f"GPU memory: {stats.gpu_used_mb:.0f}MB / {stats.gpu_total_mb:.0f}MB "
                        f"({stats.gpu_used_ratio*100:.1f}%)"
                    )
            else:
                # Fallback to original implementation
                device_id = self.gpu_info.get('device_id', 0)
                memory_allocated = paddle.device.cuda.memory_allocated(device_id)
                memory_allocated_mb = memory_allocated / (1024**2)
                memory_limit_mb = settings.gpu_memory_limit_mb
                utilization = (memory_allocated_mb / memory_limit_mb * 100) if memory_limit_mb > 0 else 0
                if utilization > 90 and not self._memory_warning_logged:
                    logger.warning(f"GPU memory usage high: {memory_allocated_mb:.0f}MB / {memory_limit_mb}MB ({utilization:.1f}%)")
                    logger.warning("Consider enabling auto_unload_unused_models or reducing batch size")
                    self._memory_warning_logged = True
                elif utilization > 75:
                    logger.info(f"GPU memory: {memory_allocated_mb:.0f}MB / {memory_limit_mb}MB ({utilization:.1f}%)")
        except Exception as e:
            logger.debug(f"Memory check failed: {e}")

    def _cleanup_unused_models(self):
        """
        Clean up unused models (including PP-StructureV3) to free GPU memory.
        Models idle longer than model_idle_timeout_seconds will be unloaded.

        Note: PP-StructureV3 is NO LONGER exempted from cleanup - it will be
        unloaded based on pp_structure_idle_timeout_seconds configuration.
        """
        if not settings.auto_unload_unused_models:
            return
        current_time = datetime.now()
        models_to_remove = []
        for lang, last_used in self._model_last_used.items():
            # Use different timeout for structure engine vs language models
            if lang == 'structure':
                timeout = settings.pp_structure_idle_timeout_seconds
            else:
                timeout = settings.model_idle_timeout_seconds
            idle_seconds = (current_time - last_used).total_seconds()
            if idle_seconds > timeout:
                models_to_remove.append(lang)
        for model_key in models_to_remove:
            if model_key == 'structure':
                if self.structure_engine is not None:
                    logger.info(f"Unloading idle PP-StructureV3 engine (idle {settings.pp_structure_idle_timeout_seconds}s)")
                    self._unload_structure_engine()
                if model_key in self._model_last_used:
                    del self._model_last_used[model_key]
            elif model_key in self.ocr_engines:
                logger.info(f"Unloading idle OCR engine for {model_key} (idle {settings.model_idle_timeout_seconds}s)")
                del self.ocr_engines[model_key]
                if model_key in self._model_last_used:
                    del self._model_last_used[model_key]
        if models_to_remove and self.use_gpu:
            # Clear CUDA cache
            try:
                paddle.device.cuda.empty_cache()
                logger.info(f"Cleared CUDA cache after unloading {len(models_to_remove)} models")
            except Exception as e:
                logger.debug(f"Cache clear failed: {e}")

    def _unload_structure_engine(self):
        """
        Properly unload PP-StructureV3 engine and free GPU memory.
        """
        if self.structure_engine is None:
            return
        try:
            # Clear internal engine components
            if hasattr(self.structure_engine, 'table_engine'):
                self.structure_engine.table_engine = None
            if hasattr(self.structure_engine, 'text_detector'):
                self.structure_engine.text_detector = None
            if hasattr(self.structure_engine, 'text_recognizer'):
                self.structure_engine.text_recognizer = None
            if hasattr(self.structure_engine, 'layout_predictor'):
                self.structure_engine.layout_predictor = None
            # Delete the engine
            del self.structure_engine
            self.structure_engine = None
            # Force garbage collection
            gc.collect()
            # Clear GPU cache
            if self.use_gpu:
                paddle.device.cuda.empty_cache()
            logger.info("PP-StructureV3 engine unloaded successfully")
        except Exception as e:
            logger.warning(f"Error unloading PP-StructureV3: {e}")
            self.structure_engine = None

    def clear_gpu_cache(self):
        """
        Manually clear GPU memory cache.
        Useful after processing large documents.
        """
        if not self.use_gpu:
            return
        try:
            paddle.device.cuda.empty_cache()
            logger.info("GPU cache cleared")
        except Exception as e:
            logger.warning(f"Failed to clear GPU cache: {e}")

    def get_ocr_engine(self, lang: str = 'ch') -> PaddleOCR:
        """
        Get or create OCR engine for specified language with GPU support

        Args:
            lang: Language code (ch, en, japan, korean, etc.)

        Returns:
            PaddleOCR engine instance
        """
        # Clean up unused models before loading new ones (memory optimization)
        if settings.auto_unload_unused_models:
            self._cleanup_unused_models()
        if lang not in self.ocr_engines:
            logger.info(f"Initializing PaddleOCR engine for language: {lang} (GPU: {self.use_gpu})")
            try:
                # PaddleOCR 3.x: Device is set globally via paddle.set_device()
                # No need to pass device/use_gpu/gpu_mem parameters
                self.ocr_engines[lang] = PaddleOCR(
                    lang=lang,
                    use_textline_orientation=True,  # Replaces deprecated use_angle_cls
                )
                # Track model loading for cache management
                self._model_last_used[lang] = datetime.now()
                logger.info(f"PaddleOCR engine ready for {lang} (PaddlePaddle {paddle.__version__}, {'GPU' if self.use_gpu else 'CPU'} mode)")
                # Check GPU memory after loading
                if self.use_gpu and settings.enable_memory_optimization:
                    self._check_gpu_memory_usage()
            except Exception as e:
                # If GPU initialization fails, fall back to CPU
                if self.use_gpu:
                    logger.warning(f"GPU initialization failed, falling back to CPU: {e}")
                    self.use_gpu = False
                    # Switch to CPU device globally
                    paddle.set_device('cpu')
                    self.ocr_engines[lang] = PaddleOCR(
                        lang=lang,
                        use_textline_orientation=True,
                    )
                    self._model_last_used[lang] = datetime.now()
                    logger.info(f"PaddleOCR engine ready for {lang} (CPU mode - fallback)")
                else:
                    raise
        else:
            # Update last used time for existing engine
            self._model_last_used[lang] = datetime.now()
        return self.ocr_engines[lang]
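
    # Usage sketch (illustrative): engines are created lazily and cached per
    # language, so repeated calls for the same language return the same instance.
    #
    #     service = OCRService()
    #     engine = service.get_ocr_engine('en')
    #     assert engine is service.get_ocr_engine('en')  # cached
    #     results = engine.ocr('/path/to/scan.png')
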
    def _ensure_structure_engine(self, layout_model: Optional[str] = None) -> PPStructureV3:
        """
        Get or create PP-Structure engine for layout analysis with GPU support.
        Supports layout model selection for different document types.

        Args:
            layout_model: Layout detection model selection:
                - "chinese": PP-DocLayout_plus-L (best for Chinese documents)
                - "default": PubLayNet-based (best for English documents)
                - "cdla": CDLA model (alternative for Chinese layout)
                - None: Use config default

        Returns:
            PPStructureV3 engine instance
        """
        # Resolve layout model name from user-friendly name
        resolved_model_name = None
        use_publaynet_default = False  # Flag to explicitly use PubLayNet default (no model param)
        if layout_model:
            resolved_model_name = LAYOUT_MODEL_MAPPING.get(layout_model)
            if layout_model not in LAYOUT_MODEL_MAPPING:
                logger.warning(f"Unknown layout model '{layout_model}', using config default")
                resolved_model_name = settings.layout_detection_model_name
            elif resolved_model_name == _USE_PUBLAYNET_DEFAULT:
                # User explicitly selected "default" - use PubLayNet without custom model
                use_publaynet_default = True
                resolved_model_name = None
                logger.info(f"Using layout model: {layout_model} -> PubLayNet default (no custom model)")
            else:
                logger.info(f"Using layout model: {layout_model} -> {resolved_model_name}")
        # Check if we need to recreate the engine due to a different model
        current_model = getattr(self, '_current_layout_model', None)
        if self.structure_engine is not None and layout_model and layout_model != current_model:
            logger.info(f"Layout model changed from {current_model} to {layout_model}, recreating engine")
            self.structure_engine = None  # Force recreation
        # Use cached engine or create a new one
        if self.structure_engine is None:
            logger.info(f"Initializing PP-StructureV3 engine (GPU: {self.use_gpu})")
            try:
                # PaddleOCR 3.x: Device is set globally via paddle.set_device()
                # Use configuration settings for memory optimization
                use_chart = settings.enable_chart_recognition
                use_formula = settings.enable_formula_recognition
                use_table = settings.enable_table_recognition
                layout_threshold = settings.layout_detection_threshold
                layout_nms = settings.layout_nms_threshold
                layout_merge = settings.layout_merge_mode
                layout_unclip = settings.layout_unclip_ratio
                text_thresh = settings.text_det_thresh
                text_box_thresh = settings.text_det_box_thresh
                text_unclip = settings.text_det_unclip_ratio
                # Layout model configuration:
                # - If use_publaynet_default: don't specify any model (use PubLayNet default)
                # - If resolved_model_name: use the specified model
                # - Otherwise: use config default
                if use_publaynet_default:
                    layout_model_name = None  # Explicitly no model = PubLayNet default
                elif resolved_model_name:
                    layout_model_name = resolved_model_name
                else:
                    layout_model_name = settings.layout_detection_model_name
                layout_model_dir = settings.layout_detection_model_dir
                # Preprocessing configuration (Stage 1)
                use_orientation = settings.use_doc_orientation_classify
                use_unwarping = settings.use_doc_unwarping
                use_textline = settings.use_textline_orientation
                # Table and formula model configuration (Stage 4)
                wired_table_model = settings.wired_table_model_name
                wireless_table_model = settings.wireless_table_model_name
                formula_model = settings.formula_recognition_model_name
                logger.info(f"PP-StructureV3 config: table={use_table}, formula={use_formula}, chart={use_chart}")
                logger.info(f"Preprocessing: orientation={use_orientation}, unwarping={use_unwarping}, textline={use_textline}")
                logger.info(f"Layout model: name={layout_model_name}, dir={layout_model_dir}")
                logger.info(f"Table models: wired={wired_table_model}, wireless={wireless_table_model}")
                logger.info(f"Formula model: {formula_model}")
                logger.info(f"Layout config: threshold={layout_threshold}, nms={layout_nms}, merge={layout_merge}, unclip={layout_unclip}")
                logger.info(f"Text detection: thresh={text_thresh}, box_thresh={text_box_thresh}, unclip={text_unclip}")
                # Build PPStructureV3 kwargs
                pp_kwargs = {
                    # Preprocessing (Stage 1)
                    'use_doc_orientation_classify': use_orientation,
                    'use_doc_unwarping': use_unwarping,
                    'use_textline_orientation': use_textline,
                    # Element recognition (Stage 4)
                    'use_table_recognition': use_table,
                    'use_formula_recognition': use_formula,
                    'use_chart_recognition': use_chart,
                    # Layout detection parameters
                    'layout_threshold': layout_threshold,
                    'layout_nms': layout_nms,
                    'layout_unclip_ratio': layout_unclip,
                    'layout_merge_bboxes_mode': layout_merge,
                    # Text detection parameters
                    'text_det_thresh': text_thresh,
                    'text_det_box_thresh': text_box_thresh,
                    'text_det_unclip_ratio': text_unclip,
                }
                # Add layout model configuration if specified (Stage 3)
                if layout_model_name:
                    pp_kwargs['layout_detection_model_name'] = layout_model_name
                if layout_model_dir:
                    pp_kwargs['layout_detection_model_dir'] = layout_model_dir
                # Add table structure model configuration (Stage 4).
                # PPStructureV3 uses separate models for wired (bordered) and wireless (borderless) tables;
                # both models should be configured for comprehensive table detection.
                if wired_table_model:
                    pp_kwargs['wired_table_structure_recognition_model_name'] = wired_table_model
                if wireless_table_model:
                    pp_kwargs['wireless_table_structure_recognition_model_name'] = wireless_table_model
                # Add formula recognition model configuration (Stage 4)
                if formula_model:
                    pp_kwargs['formula_recognition_model_name'] = formula_model
                self.structure_engine = PPStructureV3(**pp_kwargs)
                # Track model loading for cache management
                self._model_last_used['structure'] = datetime.now()
                self._current_layout_model = layout_model  # Track current model for recreation check
                logger.info(f"PP-StructureV3 engine ready (PaddlePaddle {paddle.__version__}, {'GPU' if self.use_gpu else 'CPU'} mode)")
                # Check GPU memory after loading
                if self.use_gpu and settings.enable_memory_optimization:
                    self._check_gpu_memory_usage()
            except Exception as e:
                # If GPU initialization fails, fall back to CPU
                if self.use_gpu:
                    logger.warning(f"GPU initialization failed for PP-Structure, falling back to CPU: {e}")
                    self.use_gpu = False
                    # Switch to CPU device globally
                    paddle.set_device('cpu')
                    use_chart = settings.enable_chart_recognition
                    use_formula = settings.enable_formula_recognition
                    use_table = settings.enable_table_recognition
                    layout_threshold = settings.layout_detection_threshold
                    layout_model_name = settings.layout_detection_model_name
                    layout_model_dir = settings.layout_detection_model_dir
                    wired_table_model = settings.wired_table_model_name
                    wireless_table_model = settings.wireless_table_model_name
                    formula_model = settings.formula_recognition_model_name
                    # Build CPU fallback kwargs
                    cpu_kwargs = {
                        'use_doc_orientation_classify': settings.use_doc_orientation_classify,
                        'use_doc_unwarping': settings.use_doc_unwarping,
                        'use_textline_orientation': settings.use_textline_orientation,
                        'use_table_recognition': use_table,
                        'use_formula_recognition': use_formula,
                        'use_chart_recognition': use_chart,
                        'layout_threshold': layout_threshold,
                    }
                    if layout_model_name:
                        cpu_kwargs['layout_detection_model_name'] = layout_model_name
                    if layout_model_dir:
                        cpu_kwargs['layout_detection_model_dir'] = layout_model_dir
                    if wired_table_model:
                        cpu_kwargs['wired_table_structure_recognition_model_name'] = wired_table_model
                    if wireless_table_model:
                        cpu_kwargs['wireless_table_structure_recognition_model_name'] = wireless_table_model
                    if formula_model:
                        cpu_kwargs['formula_recognition_model_name'] = formula_model
                    self.structure_engine = PPStructureV3(**cpu_kwargs)
                    self._current_layout_model = layout_model  # Track current model for recreation check
                    logger.info(f"PP-StructureV3 engine ready (CPU mode - fallback, layout_model={layout_model_name})")
                else:
                    raise
        return self.structure_engine
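
    # Illustrative sketch of the kwargs assembled above for a typical "chinese"
    # selection. The values shown are assumptions; real values come entirely
    # from settings.
    #
    #     pp_kwargs = {
    #         'use_table_recognition': True,
    #         'use_formula_recognition': True,
    #         'use_chart_recognition': False,
    #         'layout_detection_model_name': 'PP-DocLayout_plus-L',
    #         # ... thresholds and text-detection parameters ...
    #     }
    #     engine = PPStructureV3(**pp_kwargs)
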
    def cleanup_gpu_memory(self):
        """
        Clean up GPU memory to prevent OOM errors.
        This should be called after processing each document or batch.
        Uses PaddlePaddle's built-in memory management and optionally torch if available.
        """
        try:
            # Clear PyTorch GPU cache if torch is available
            if TORCH_AVAILABLE and torch.cuda.is_available():
                torch.cuda.empty_cache()
                torch.cuda.synchronize()
                logger.debug("Cleared PyTorch GPU cache")
            # Clear PaddlePaddle GPU cache
            if paddle.device.is_compiled_with_cuda():
                paddle.device.cuda.empty_cache()
                logger.debug("Cleared PaddlePaddle GPU cache")
            # Force garbage collection
            gc.collect()
            # Log current GPU memory status
            if TORCH_AVAILABLE and torch.cuda.is_available():
                allocated_mb = torch.cuda.memory_allocated() / 1024**2
                reserved_mb = torch.cuda.memory_reserved() / 1024**2
                logger.debug(f"GPU memory after cleanup - Allocated: {allocated_mb:.1f}MB, Reserved: {reserved_mb:.1f}MB")
        except Exception as e:
            logger.warning(f"GPU memory cleanup failed (non-critical): {e}")
            # Don't fail the processing if cleanup fails

    def check_gpu_memory(self, required_mb: int = 2000, enable_fallback: bool = True) -> bool:
        """
        Check if sufficient GPU memory is available using MemoryGuard.

        This method now uses MemoryGuard for accurate memory queries across
        multiple backends (pynvml, torch, paddle) instead of returning True
        blindly for PaddlePaddle-only environments.

        Args:
            required_mb: Required memory in MB (default 2000MB for OCR models)
            enable_fallback: If True and CPU fallback is enabled, switch to CPU mode
                when memory is insufficient instead of returning False

        Returns:
            True if sufficient memory is available, GPU is not used, or CPU fallback activated
        """
        # If not using GPU, always return True
        if not self.use_gpu:
            return True
        try:
            # Use MemoryGuard if available for accurate multi-backend memory queries
            if self._memory_guard:
                is_available, stats = self._memory_guard.check_memory(
                    required_mb=required_mb,
                    device_id=self.gpu_info.get('device_id', 0)
                )
                if not is_available:
                    logger.warning(
                        f"GPU memory check failed: {stats.gpu_free_mb:.0f}MB free, "
                        f"{required_mb}MB required ({stats.gpu_used_ratio*100:.1f}% used)"
                    )
                    # Try to free memory
                    logger.info("Attempting memory cleanup before retry...")
                    self._cleanup_unused_models()
                    self._memory_guard.clear_gpu_cache()
                    # Check again
                    is_available, stats = self._memory_guard.check_memory(required_mb=required_mb)
                    if not is_available:
                        # Memory still insufficient after cleanup
                        if enable_fallback and settings.enable_cpu_fallback:
                            logger.warning(
                                f"Insufficient GPU memory ({stats.gpu_free_mb:.0f}MB) after cleanup. "
                                f"Activating CPU fallback mode."
                            )
                            self._activate_cpu_fallback()
                            return True  # Continue with CPU
                        else:
                            logger.error(
                                f"Insufficient GPU memory: {stats.gpu_free_mb:.0f}MB available, "
                                f"{required_mb}MB required"
                            )
                            return False
                logger.debug(
                    f"GPU memory check passed: {stats.gpu_free_mb:.0f}MB free "
                    f"({stats.gpu_used_ratio*100:.1f}% used)"
                )
                return True
            else:
                # Fallback to original implementation
                free_memory = None
                if TORCH_AVAILABLE and torch.cuda.is_available():
                    free_memory = torch.cuda.mem_get_info()[0] / 1024**2
                elif paddle.device.is_compiled_with_cuda():
                    # PaddlePaddle doesn't have a direct API to get free memory,
                    # so use allocated memory to estimate it
                    device_id = self.gpu_info.get('device_id', 0)
                    allocated = paddle.device.cuda.memory_allocated(device_id) / (1024**2)
                    total = settings.gpu_memory_limit_mb
                    free_memory = max(0, total - allocated)
                    logger.debug(f"Estimated free GPU memory: {free_memory:.0f}MB (total: {total}MB, allocated: {allocated:.0f}MB)")
                if free_memory is not None:
                    if free_memory < required_mb:
                        logger.warning(f"Low GPU memory: {free_memory:.0f}MB available, {required_mb}MB required")
                        self.cleanup_gpu_memory()
                        # Recheck
                        if TORCH_AVAILABLE and torch.cuda.is_available():
                            free_memory = torch.cuda.mem_get_info()[0] / 1024**2
                        else:
                            allocated = paddle.device.cuda.memory_allocated(device_id) / (1024**2)
                            free_memory = max(0, total - allocated)
                        if free_memory < required_mb:
                            if enable_fallback and settings.enable_cpu_fallback:
                                logger.warning("Insufficient GPU memory after cleanup. Activating CPU fallback.")
                                self._activate_cpu_fallback()
                                return True
                            else:
                                logger.error(f"Insufficient GPU memory after cleanup: {free_memory:.0f}MB")
                                return False
                logger.debug(f"GPU memory check passed: {free_memory:.0f}MB available")
                return True
        except Exception as e:
            logger.warning(f"GPU memory check failed: {e}")
            return True  # Continue processing even if check fails
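
    # Usage sketch (illustrative): the guard pattern this service uses before
    # heavy predictions. A False return only occurs when CPU fallback is disabled.
    #
    #     if not service.check_gpu_memory(required_mb=2000, enable_fallback=True):
    #         raise RuntimeError("Insufficient GPU memory and CPU fallback disabled")
    #     # ... proceed; note service.use_gpu may now be False (fallback active)
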
    def _activate_cpu_fallback(self):
        """
        Activate CPU fallback mode when GPU memory is insufficient.
        This disables GPU usage for the current service instance.
        """
        if self._cpu_fallback_active:
            return  # Already in CPU mode
        logger.warning("=== CPU FALLBACK MODE ACTIVATED ===")
        logger.warning("GPU memory insufficient, switching to CPU processing")
        logger.warning("Performance will be significantly reduced")
        self._cpu_fallback_active = True
        self.use_gpu = False
        # Update GPU info to reflect fallback
        self.gpu_info['cpu_fallback'] = True
        self.gpu_info['fallback_reason'] = 'GPU memory insufficient'
        # Clear GPU cache to free memory
        if self._memory_guard:
            self._memory_guard.clear_gpu_cache()

    def _restore_gpu_mode(self):
        """
        Attempt to restore GPU mode after CPU fallback.
        Called when memory pressure has been relieved.
        """
        if not self._cpu_fallback_active:
            return
        if not self.gpu_available:
            return
        # Check if GPU memory is now available
        if self._memory_guard:
            is_available, stats = self._memory_guard.check_memory(
                required_mb=settings.structure_model_memory_mb
            )
            if is_available:
                logger.info("GPU memory available, restoring GPU mode")
                self._cpu_fallback_active = False
                self.use_gpu = True
                self.gpu_info.pop('cpu_fallback', None)
                self.gpu_info.pop('fallback_reason', None)

    def convert_pdf_to_images(self, pdf_path: Path, output_dir: Path) -> List[Path]:
        """
        Convert PDF to images (one per page)

        Args:
            pdf_path: Path to PDF file
            output_dir: Directory to save converted images

        Returns:
            List of paths to converted images
        """
        try:
            output_dir.mkdir(parents=True, exist_ok=True)
            logger.info(f"Converting PDF {pdf_path.name} to images")
            # Convert PDF to images (300 DPI for good quality)
            images = convert_from_path(
                str(pdf_path),
                dpi=300,
                fmt='png'
            )
            image_paths = []
            for i, image in enumerate(images):
                # Save each page as PNG
                image_path = output_dir / f"{pdf_path.stem}_page_{i+1}.png"
                image.save(str(image_path), 'PNG')
                image_paths.append(image_path)
                logger.info(f"Saved page {i+1} to {image_path.name}")
            logger.info(f"Converted {len(image_paths)} pages from PDF")
            return image_paths
        except Exception as e:
            logger.error(f"PDF conversion error: {str(e)}")
            raise
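
    # Usage sketch (illustrative paths): each page of the PDF is rendered to
    # <stem>_page_<n>.png at 300 DPI in the given directory.
    #
    #     pages = service.convert_pdf_to_images(Path('report.pdf'), Path('out/'))
    #     # -> [Path('out/report_page_1.png'), Path('out/report_page_2.png'), ...]
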
    def process_image(
        self,
        image_path: Path,
        lang: str = 'ch',
        detect_layout: bool = True,
        confidence_threshold: Optional[float] = None,
        output_dir: Optional[Path] = None,
        current_page: int = 0,
        layout_model: Optional[str] = None,
        preprocessing_mode: Optional[PreprocessingModeEnum] = None,
        preprocessing_config: Optional[PreprocessingConfig] = None
    ) -> Dict:
        """
        Process single image with OCR and layout analysis

        Args:
            image_path: Path to image file
            lang: Language for OCR
            detect_layout: Whether to perform layout analysis
            confidence_threshold: Minimum confidence threshold (uses default if None)
            output_dir: Optional output directory for saving extracted images
            current_page: Current page number (0-based) for multi-page documents
            layout_model: Layout detection model ('chinese', 'default', 'cdla')
            preprocessing_mode: Layout preprocessing mode ('auto', 'manual', 'disabled')
            preprocessing_config: Manual preprocessing config (used when mode='manual')

        Returns:
            Dictionary with OCR results and metadata
        """
        start_time = datetime.now()
        threshold = confidence_threshold if confidence_threshold is not None else self.confidence_threshold
        try:
            # Pre-operation memory check: try to restore GPU if in fallback and memory is available
            if self._cpu_fallback_active:
                self._restore_gpu_mode()
                if not self._cpu_fallback_active:
                    logger.info("GPU mode restored for processing")
            # Initial memory check before starting any heavy processing.
            # Estimate the memory requirement based on image type.
            estimated_memory_mb = 2500  # Conservative estimate for full OCR + layout
            if detect_layout:
                estimated_memory_mb += 500  # Additional for PP-StructureV3
            if not self.check_gpu_memory(required_mb=estimated_memory_mb, enable_fallback=True):
                logger.warning(
                    f"Pre-operation memory check failed ({estimated_memory_mb}MB required). "
                    f"Processing will attempt to proceed but may encounter issues."
                )
            # Check if the file is an Office document
            if self.office_converter.is_office_document(image_path):
                logger.info(f"Detected Office document: {image_path.name}, converting to PDF")
                try:
                    # Convert Office document to PDF
                    pdf_path = self.office_converter.convert_to_pdf(image_path)
                    logger.info(f"Office document converted to PDF: {pdf_path.name}")
                    # Process the PDF (will be handled by PDF processing logic below)
                    image_path = pdf_path
                except OfficeConverterError as e:
                    logger.error(f"Office conversion failed: {str(e)}")
                    raise
            # Check if the file is a PDF
            is_pdf = image_path.suffix.lower() == '.pdf'
            if is_pdf:
                # Convert PDF to images
                logger.info(f"Detected PDF file: {image_path.name}, converting to images")
                pdf_images_dir = image_path.parent / f"{image_path.stem}_pages"
                image_paths = self.convert_pdf_to_images(image_path, pdf_images_dir)
                # Process all pages
                all_text_regions = []
                total_confidence_sum = 0.0
                total_valid_regions = 0
                all_layout_data = []
                all_images_metadata = []
                all_ocr_dimensions = []
                for page_num, page_image_path in enumerate(image_paths, 1):
                    logger.info(f"Processing PDF page {page_num}/{len(image_paths)}")
                    # Process each page with the correct page number (0-based for layout data)
                    page_result = self.process_image(
                        page_image_path,
                        lang=lang,
                        detect_layout=detect_layout,
                        confidence_threshold=confidence_threshold,
                        output_dir=output_dir,
                        current_page=page_num - 1,  # Convert to 0-based page number for layout data
                        layout_model=layout_model,
                        preprocessing_mode=preprocessing_mode,
                        preprocessing_config=preprocessing_config
                    )
                    # Accumulate results
                    if page_result['status'] == 'success':
                        # Add page number to each text region
                        for region in page_result['text_regions']:
                            region['page'] = page_num
                            all_text_regions.append(region)
                        total_confidence_sum += page_result['average_confidence'] * page_result['total_text_regions']
                        total_valid_regions += page_result['total_text_regions']
                        # Accumulate layout data (page numbers already set correctly in analyze_layout)
                        if page_result.get('layout_data'):
                            layout_data = page_result['layout_data']
                            all_layout_data.append(layout_data)
                        # Accumulate images metadata (page numbers already set correctly in analyze_layout)
                        if page_result.get('images_metadata'):
                            all_images_metadata.extend(page_result['images_metadata'])
                        # Store OCR dimensions for each page
                        if page_result.get('ocr_dimensions'):
                            all_ocr_dimensions.append({
                                'page': page_num,
                                'width': page_result['ocr_dimensions']['width'],
                                'height': page_result['ocr_dimensions']['height']
                            })
                # Calculate overall average confidence
                avg_confidence = total_confidence_sum / total_valid_regions if total_valid_regions > 0 else 0.0
                # Combine layout data from all pages
                combined_layout = None
                if all_layout_data:
                    combined_elements = []
                    for layout in all_layout_data:
                        if layout.get('elements'):
                            combined_elements.extend(layout['elements'])
                    if combined_elements:
                        combined_layout = {
                            'elements': combined_elements,
                            'total_elements': len(combined_elements),
                            'reading_order': list(range(len(combined_elements))),
                        }
                # Generate combined markdown
                markdown_content = self.generate_markdown(all_text_regions, combined_layout)
                # Calculate processing time
                processing_time = (datetime.now() - start_time).total_seconds()
                logger.info(
                    f"PDF processing completed: {image_path.name} - "
                    f"{len(image_paths)} pages, "
                    f"{len(all_text_regions)} regions, "
                    f"{avg_confidence:.2f} avg confidence, "
                    f"{processing_time:.2f}s"
                )
                return {
                    'status': 'success',
                    'file_name': image_path.name,
                    'language': lang,
                    'text_regions': all_text_regions,
                    'total_text_regions': len(all_text_regions),
                    'average_confidence': avg_confidence,
                    'layout_data': combined_layout,
                    'images_metadata': all_images_metadata,
                    'markdown_content': markdown_content,
                    'processing_time': processing_time,
                    'timestamp': datetime.utcnow().isoformat(),
                    'total_pages': len(image_paths),
                    'ocr_dimensions': all_ocr_dimensions if all_ocr_dimensions else None,
                }
            # Get OCR engine (for non-PDF images)
            ocr_engine = self.get_ocr_engine(lang)
            # Secondary memory check before OCR processing
            if not self.check_gpu_memory(required_mb=1500, enable_fallback=True):
                logger.warning(
                    f"OCR memory check: insufficient GPU memory (1500MB required). "
                    f"Mode: {'CPU fallback' if self._cpu_fallback_active else 'GPU (low memory)'}"
                )
            # Get the actual image dimensions that OCR will use
            with Image.open(image_path) as img:
                ocr_width, ocr_height = img.size
            logger.info(f"OCR processing image dimensions: {ocr_width}x{ocr_height}")
            # Perform OCR
            logger.info(f"Processing image: {image_path.name}")
            # Note: In PaddleOCR 3.x, use_angle_cls is set during initialization, not in the ocr() call
            ocr_results = ocr_engine.ocr(str(image_path))
            # Parse OCR results (PaddleOCR 3.x format)
            text_regions = []
            total_confidence = 0.0
            valid_regions = 0
            if ocr_results and isinstance(ocr_results, (list, tuple)) and len(ocr_results) > 0:
                # PaddleOCR 3.x returns a list of dictionaries (one per page)
                for page_result in ocr_results:
                    if isinstance(page_result, dict):
                        # New format: {'rec_texts': [...], 'rec_scores': [...], 'rec_polys': [...]}
                        texts = page_result.get('rec_texts', [])
                        scores = page_result.get('rec_scores', [])
                        polys = page_result.get('rec_polys', [])
                        # Process each recognized text
                        for idx, text in enumerate(texts):
                            # Get the corresponding score and bbox
                            confidence = scores[idx] if idx < len(scores) else 1.0
                            bbox = polys[idx] if idx < len(polys) else []
                            # Convert numpy array bbox to list for JSON serialization
                            if hasattr(bbox, 'tolist'):
                                bbox = bbox.tolist()
                            # Filter by confidence threshold
                            if confidence >= threshold:
                                text_regions.append({
                                    'text': text,
                                    'bbox': bbox,
                                    'confidence': float(confidence),
                                })
                                total_confidence += confidence
                                valid_regions += 1
            avg_confidence = total_confidence / valid_regions if valid_regions > 0 else 0.0
            logger.info(f"Parsed {len(text_regions)} text regions with avg confidence {avg_confidence:.3f}")
            # Layout analysis (if requested)
            layout_data = None
            images_metadata = []
            if detect_layout:
                # Pass current_page to analyze_layout for correct page numbering
                layout_data, images_metadata = self.analyze_layout(
                    image_path,
                    output_dir=output_dir,
                    current_page=current_page,
                    layout_model=layout_model,
                    preprocessing_mode=preprocessing_mode,
                    preprocessing_config=preprocessing_config
                )
            # Generate Markdown
            markdown_content = self.generate_markdown(text_regions, layout_data)
            # Calculate processing time
            processing_time = (datetime.now() - start_time).total_seconds()
            result = {
                'status': 'success',
                'file_name': image_path.name,
                'language': lang,
                'text_regions': text_regions,
                'total_text_regions': len(text_regions),
                'average_confidence': avg_confidence,
                'layout_data': layout_data,
                'images_metadata': images_metadata,
                'markdown_content': markdown_content,
                'processing_time': processing_time,
                'timestamp': datetime.utcnow().isoformat(),
                'ocr_dimensions': {
                    'width': ocr_width,
                    'height': ocr_height
                }
            }
            # If layout data is enhanced, add enhanced results for the converter
            if layout_data and layout_data.get('enhanced'):
                result['enhanced_results'] = [{
                    'elements': layout_data.get('elements', []),
                    'reading_order': layout_data.get('reading_order', []),
                    'element_types': layout_data.get('element_types', {}),
                    'page': current_page,
                    'width': ocr_width,
                    'height': ocr_height
                }]
                # Generate PP-StructureV3 debug outputs if enabled
                if settings.pp_structure_debug_enabled and output_dir:
                    try:
                        from app.services.pp_structure_debug import PPStructureDebug
                        debug_service = PPStructureDebug(output_dir)
                        # Save raw results as JSON
                        debug_service.save_raw_results(
                            pp_structure_results={
                                'elements': layout_data.get('elements', []),
                                'total_elements': layout_data.get('total_elements', 0),
                                'element_types': layout_data.get('element_types', {}),
                                'reading_order': layout_data.get('reading_order', []),
                                'enhanced': True,
                                'has_parsing_res_list': True
                            },
                            raw_ocr_regions=text_regions,
                            filename_prefix=image_path.stem
                        )
                        # Generate visualization if enabled
                        if settings.pp_structure_debug_visualization:
                            debug_service.generate_visualization(
                                image_path=image_path,
                                pp_structure_elements=layout_data.get('elements', []),
                                raw_ocr_regions=text_regions,
                                filename_prefix=image_path.stem
                            )
                        logger.info(f"Generated PP-StructureV3 debug outputs for {image_path.name}")
                    except Exception as debug_error:
                        logger.warning(f"Failed to generate debug outputs: {debug_error}")
            logger.info(
                f"OCR completed: {image_path.name} - "
                f"{len(text_regions)} regions, "
                f"{avg_confidence:.2f} avg confidence, "
                f"{processing_time:.2f}s"
            )
            # Clean up GPU memory after processing
            self.cleanup_gpu_memory()
            return result
        except Exception as e:
            import traceback
            error_trace = traceback.format_exc()
            logger.error(f"OCR processing error for {image_path.name}: {str(e)}\n{error_trace}")
            return {
                'status': 'error',
                'file_name': image_path.name,
                'error_message': str(e),
                'processing_time': (datetime.now() - start_time).total_seconds(),
            }
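
    # Usage sketch (illustrative): process_image() accepts images, PDFs, and
    # Office documents (converted to PDF first) and returns a plain dict.
    #
    #     result = service.process_image(Path('invoice.pdf'), lang='en')
    #     if result['status'] == 'success':
    #         print(result['total_text_regions'], result['average_confidence'])
    #         print(result['markdown_content'][:200])
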
    def _extract_table_text(self, html_content: str) -> str:
        """
        Extract text from HTML table content for translation purposes

        Args:
            html_content: HTML content containing table

        Returns:
            Extracted text from table cells
        """
        try:
            from html.parser import HTMLParser

            class TableTextExtractor(HTMLParser):
                def __init__(self):
                    super().__init__()
                    self.text_parts = []
                    self.in_table = False

                def handle_starttag(self, tag, attrs):
                    if tag == 'table':
                        self.in_table = True

                def handle_endtag(self, tag):
                    if tag == 'table':
                        self.in_table = False
                    elif tag in ('td', 'th') and self.in_table:
                        self.text_parts.append(' | ')  # Cell separator
                    elif tag == 'tr' and self.in_table:
                        self.text_parts.append('\n')  # Row separator

                def handle_data(self, data):
                    if self.in_table:
                        stripped = data.strip()
                        if stripped:
                            self.text_parts.append(stripped)

            parser = TableTextExtractor()
            parser.feed(html_content)
            # Clean up the extracted text
            extracted = ''.join(parser.text_parts)
            # Remove multiple separators
            import re
            extracted = re.sub(r'\s*\|\s*\|+\s*', ' | ', extracted)
            extracted = re.sub(r'\n+', '\n', extracted)
            extracted = extracted.strip()
            return extracted
        except Exception as e:
            logger.warning(f"Failed to extract table text: {e}")
            # Fallback: just remove HTML tags
            import re
            text = re.sub(r'<[^>]+>', ' ', html_content)
            text = re.sub(r'\s+', ' ', text)
            return text.strip()
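
    # Illustrative input/output for the extractor above (assumed HTML shape):
    #
    #     html = "<table><tr><td>Name</td><td>Qty</td></tr><tr><td>A</td><td>2</td></tr></table>"
    #     service._extract_table_text(html)
    #     # -> 'Name | Qty | \nA | 2 |'  (cells joined with ' | ', rows separated by newlines)
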
    def analyze_layout(
        self,
        image_path: Path,
        output_dir: Optional[Path] = None,
        current_page: int = 0,
        layout_model: Optional[str] = None,
        preprocessing_mode: Optional[PreprocessingModeEnum] = None,
        preprocessing_config: Optional[PreprocessingConfig] = None
    ) -> Tuple[Optional[Dict], List[Dict]]:
        """
        Analyze document layout using PP-StructureV3 with enhanced element extraction

        Args:
            image_path: Path to image file
            output_dir: Optional output directory for saving extracted images (defaults to image_path.parent)
            current_page: Current page number (0-based) for multi-page documents
            layout_model: Layout detection model ('chinese', 'default', 'cdla')
            preprocessing_mode: Preprocessing mode ('auto', 'manual', 'disabled')
            preprocessing_config: Manual preprocessing config (used when mode='manual')

        Returns:
            Tuple of (layout_data, images_metadata)
        """
        try:
            # Pre-operation memory check for layout analysis
            if self._cpu_fallback_active:
                self._restore_gpu_mode()
                if not self._cpu_fallback_active:
                    logger.info("GPU mode restored for layout analysis")
            if not self.check_gpu_memory(required_mb=2000, enable_fallback=True):
                logger.warning(
                    f"Layout analysis pre-check: insufficient GPU memory (2000MB required). "
                    f"Mode: {'CPU fallback' if self._cpu_fallback_active else 'GPU'}"
                )
            structure_engine = self._ensure_structure_engine(layout_model)
            # Apply image preprocessing for layout detection.
            # Preprocessing enhances faint lines/borders to improve table detection;
            # the original image is preserved for element extraction.
            preprocessed_image = None
            preprocessing_result = None
            # Determine preprocessing mode (default from config if not specified)
            mode = preprocessing_mode or PreprocessingModeEnum(settings.layout_preprocessing_mode)
            if mode != PreprocessingModeEnum.DISABLED:
                try:
                    preprocessing_service = get_layout_preprocessing_service()
                    preprocessed_pil, preprocessing_result = preprocessing_service.preprocess_to_pil(
                        image_path,
                        mode=mode,
                        config=preprocessing_config
                    )
                    if preprocessing_result.was_processed:
                        preprocessed_image = preprocessed_pil
                        logger.info(
                            f"Layout preprocessing applied: mode={mode.value}, "
                            f"config={preprocessing_result.config_used}, "
                            f"metrics={preprocessing_result.quality_metrics}"
                        )
                    else:
                        logger.info(f"No preprocessing needed (mode={mode.value})")
                except Exception as preprocess_error:
                    logger.warning(f"Preprocessing failed, using original image: {preprocess_error}")
                    preprocessed_image = None
            # Try enhanced processing first
            try:
                from app.services.pp_structure_enhanced import PPStructureEnhanced
                enhanced_processor = PPStructureEnhanced(structure_engine)
                result = enhanced_processor.analyze_with_full_structure(
                    image_path, output_dir, current_page, preprocessed_image=preprocessed_image
                )
                if result.get('has_parsing_res_list'):
                    logger.info(f"Enhanced PP-StructureV3 analysis successful with {result['total_elements']} elements")
                    logger.info(f"Element types found: {result.get('element_types', {})}")
                    # Convert to legacy format for compatibility
                    layout_data = {
                        'elements': result['elements'],
                        'total_elements': result['total_elements'],
                        'reading_order': result['reading_order'],
                        'element_types': result.get('element_types', {}),
                        'enhanced': True
                    }
                    # Extract images metadata
                    images_metadata = []
                    for elem in result.get('images', []):
                        images_metadata.append({
                            'element_id': elem['element_id'],
                            'type': 'image',
                            'page': elem['page'],
                            'bbox': elem['bbox']
                        })
                    # Clean up GPU memory after enhanced processing
                    self.cleanup_gpu_memory()
                    return layout_data, images_metadata
                else:
                    logger.info("parsing_res_list not available, using standard processing")
            except ImportError:
                logger.debug("Enhanced PP-StructureV3 module not available, using standard processing")
            except Exception as e:
                logger.warning(f"Enhanced processing failed, falling back to standard: {e}")
            # Standard processing (original implementation)
            logger.info(f"Running standard layout analysis on {image_path.name}")
            # Memory check before PP-StructureV3 processing
            if not self.check_gpu_memory(required_mb=2000, enable_fallback=True):
                logger.warning(
                    f"PP-StructureV3 memory check: insufficient GPU memory (2000MB required). "
                    f"Mode: {'CPU fallback' if self._cpu_fallback_active else 'GPU (low memory)'}"
                )
            # Use the prediction semaphore to control concurrent predictions.
            # This prevents OOM errors from multiple simultaneous PP-StructureV3.predict() calls.
            with prediction_context(timeout=settings.service_acquire_timeout_seconds) as acquired:
                if not acquired:
                    logger.error("Failed to acquire prediction slot (timeout), returning empty layout")
                    return None, []
                # Use the preprocessed image if available, otherwise the original path
                if preprocessed_image is not None:
                    import numpy as np
                    # Convert PIL to numpy array (BGR format for PP-Structure)
                    predict_input = np.array(preprocessed_image)
                    if len(predict_input.shape) == 3 and predict_input.shape[2] == 3:
                        # Convert RGB to BGR
                        predict_input = predict_input[:, :, ::-1]
                    results = structure_engine.predict(predict_input)
                else:
                    results = structure_engine.predict(str(image_path))
                layout_elements = []
                images_metadata = []
                # Process each page result (for images, usually just one page)
                for page_idx, page_result in enumerate(results):
                    # Get the markdown dictionary from the result object
                    if hasattr(page_result, 'markdown'):
                        markdown_dict = page_result.markdown
                        logger.info(f"Page {page_idx} markdown keys: {markdown_dict.keys() if isinstance(markdown_dict, dict) else type(markdown_dict)}")
                        # Extract layout information from markdown structure
                        if isinstance(markdown_dict, dict):
                            # Get markdown texts (HTML format with tables and structure)
                            markdown_texts = markdown_dict.get('markdown_texts', '')
                            markdown_images = markdown_dict.get('markdown_images', {})
                            # Create a layout element for the structured content
                            if markdown_texts:
                                # Check if the content contains tables
                                has_table = '<table' in markdown_texts.lower()
                                element = {
                                    'element_id': len(layout_elements),
                                    'type': 'table' if has_table else 'text',
                                    'content': markdown_texts,
                                    'page': current_page,  # Use current_page parameter instead of page_idx
                                    'bbox': [],  # PP-StructureV3 doesn't provide individual bbox in this format
                                }
                                # Extract text from the table for translation purposes
                                if has_table:
                                    table_text = self._extract_table_text(markdown_texts)
                                    element['extracted_text'] = table_text
                                    logger.info(f"Extracted {len(table_text)} characters from table")
                                layout_elements.append(element)
                            # Add image metadata and SAVE images to disk
                            for img_idx, (img_path, img_obj) in enumerate(markdown_images.items()):
                                # Save image to disk
                                try:
                                    # Determine the base directory for saving images
                                    base_dir = output_dir if output_dir else image_path.parent
                                    # Create full path for the image file
                                    full_img_path = base_dir / img_path
                                    # Create imgs/ subdirectory if it doesn't exist
                                    full_img_path.parent.mkdir(parents=True, exist_ok=True)
                                    # Save image object to disk
                                    if hasattr(img_obj, 'save'):
                                        # img_obj is a PIL Image
                                        img_obj.save(str(full_img_path))
                                        logger.info(f"Saved extracted image to {full_img_path}")
                                    else:
                                        logger.warning(f"Image object for {img_path} does not have save() method, skipping")
                                except Exception as e:
                                    logger.warning(f"Failed to save image {img_path}: {str(e)}")
                                    # Continue processing even if image save fails
                                # Extract bbox from filename (format: img_in_table_box_x1_y1_x2_y2.jpg)
                                bbox = []
                                try:
                                    import re
                                    match = re.search(r'box_(\d+)_(\d+)_(\d+)_(\d+)', img_path)
                                    if match:
                                        x1, y1, x2, y2 = map(int, match.groups())
                                        # Convert to 4-point bbox format: [[x1,y1], [x2,y1], [x2,y2], [x1,y2]]
                                        bbox = [[x1, y1], [x2, y1], [x2, y2], [x1, y2]]
                                        logger.info(f"Extracted bbox from filename: {bbox}")
                                except Exception as e:
                                    logger.warning(f"Failed to extract bbox from {img_path}: {e}")
                                images_metadata.append({
                                    'element_id': len(layout_elements) + img_idx,
                                    'image_path': img_path,
                                    'type': 'image',
                                    'page': current_page,  # Use current_page parameter instead of page_idx
                                    'bbox': bbox,
                                })
                if layout_elements:
                    layout_data = {
                        'elements': layout_elements,
                        'total_elements': len(layout_elements),
                        'reading_order': list(range(len(layout_elements))),
                    }
                    logger.info(f"Detected {len(layout_elements)} layout elements")
                    # Clean up GPU memory after standard processing
                    self.cleanup_gpu_memory()
                    return layout_data, images_metadata
                else:
                    logger.warning("No layout elements detected")
                    return None, []
        except Exception as e:
            import traceback
            error_trace = traceback.format_exc()
            logger.error(f"Layout analysis error: {str(e)}\n{error_trace}")
            return None, []
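
    # Usage sketch (illustrative): analyze_layout() returns (layout_data, images_metadata),
    # where layout_data is None when nothing was detected.
    #
    #     layout, images = service.analyze_layout(Path('page_1.png'), current_page=0)
    #     if layout:
    #         for elem in layout['elements']:
    #             print(elem['type'], elem.get('extracted_text', '')[:80])
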
    def generate_markdown(
        self,
        text_regions: List[Dict],
        layout_data: Optional[Dict] = None
    ) -> str:
        """
        Generate Markdown from OCR results

        Args:
            text_regions: List of text regions with bbox and text
            layout_data: Optional layout structure information

        Returns:
            Markdown formatted string
        """
        markdown_lines = []
        if layout_data and layout_data.get('elements'):
            # Generate structured Markdown based on layout
            for element in layout_data['elements']:
                element_type = element.get('type', 'text')
                content = element.get('content', '')
                if element_type == 'title':
                    markdown_lines.append(f"# {content}\n")
                elif element_type == 'table':
                    # Table in HTML format
                    markdown_lines.append(content)
                    markdown_lines.append("")
                elif element_type == 'figure':
                    element_id = element.get('element_id')
                    markdown_lines.append(f"![Figure {element_id}](./images/img_{element_id}.jpg)\n")
                else:
                    markdown_lines.append(f"{content}\n")
        else:
            # Simple Markdown from text regions only.
            # Sort by vertical position (top to bottom).
            def get_y_coord(region):
                """Safely extract Y coordinate from bbox"""
                bbox = region.get('bbox', [])
                if isinstance(bbox, (list, tuple)) and len(bbox) > 0:
                    if isinstance(bbox[0], (list, tuple)) and len(bbox[0]) > 1:
                        return bbox[0][1]  # [[x1,y1], [x2,y2], ...] format
                    elif len(bbox) > 1:
                        return bbox[1]  # [x1, y1, x2, y2, ...] format
                return 0  # Default to 0 if it can't be extracted

            sorted_regions = sorted(text_regions, key=get_y_coord)
            for region in sorted_regions:
                text = region['text']
                markdown_lines.append(text)
        return "\n".join(markdown_lines)
    def process_with_dual_track(
        self,
        file_path: Path,
        lang: str = 'ch',
        detect_layout: bool = True,
        confidence_threshold: Optional[float] = None,
        output_dir: Optional[Path] = None,
        force_track: Optional[str] = None,
        layout_model: Optional[str] = None,
        preprocessing_mode: Optional[PreprocessingModeEnum] = None,
        preprocessing_config: Optional[PreprocessingConfig] = None
    ) -> Union[UnifiedDocument, Dict]:
        """
        Process document using the dual-track approach.

        Args:
            file_path: Path to document file
            lang: Language for OCR (if needed)
            detect_layout: Whether to perform layout analysis
            confidence_threshold: Minimum confidence threshold
            output_dir: Optional output directory for extracted images
            force_track: Force specific track ("ocr" or "direct"), None for auto-detection
            layout_model: Layout detection model ('chinese', 'default', 'cdla') (used for OCR track only)
            preprocessing_mode: Layout preprocessing mode ('auto', 'manual', 'disabled')
            preprocessing_config: Manual preprocessing config (used when mode='manual')

        Returns:
            UnifiedDocument if dual-track is enabled, Dict otherwise
        """
        if not self.dual_track_enabled:
            # Fall back to traditional OCR processing
            return self.process_file_traditional(
                file_path, lang, detect_layout, confidence_threshold, output_dir, layout_model,
                preprocessing_mode, preprocessing_config
            )
        start_time = datetime.now()
        document_id = str(uuid.uuid4())
        try:
            # Detect document type and recommend a processing track
            if force_track:
                logger.info(f"Forced to use {force_track} track")
                recommendation = ProcessingTrackRecommendation(
                    track=force_track,
                    confidence=1.0,
                    reason="Forced by user",
                    document_type=None
                )
            else:
                recommendation = self.document_detector.detect(file_path)
                logger.info(f"Recommended track: {recommendation.track} (confidence: {recommendation.confidence:.2f})")
                logger.info(f"Reason: {recommendation.reason}")
            # Route to the appropriate processing track
            unified_doc = None
            if recommendation.track == "direct":
                # Use direct extraction for editable PDFs
                logger.info("Using DIRECT extraction track (PyMuPDF)")
                # Check if the file is an Office document - it needs conversion to PDF first
                actual_file_path = file_path
                temp_pdf_path = None
                if self.office_converter.is_office_document(file_path):
                    # Convert Office to PDF for direct extraction
                    logger.info(f"Converting Office document to PDF for direct extraction: {file_path.name}")
                    try:
                        # Convert to output directory or file parent
                        convert_dir = output_dir if output_dir else file_path.parent
                        temp_pdf_path = self.office_converter.convert_to_pdf(file_path, convert_dir)
                        actual_file_path = temp_pdf_path
                        logger.info(f"Office document converted to PDF: {temp_pdf_path.name}")
                    except OfficeConverterError as e:
                        logger.error(f"Office conversion failed, falling back to OCR: {e}")
                        # Fall back to OCR if conversion fails
                        recommendation = ProcessingTrackRecommendation(
                            track="ocr",
                            confidence=0.7,
                            reason=f"Office conversion failed ({str(e)}), using OCR as fallback",
                            document_type=recommendation.document_type
                        )
                # Only proceed with direct extraction if the track is still "direct"
                if recommendation.track == "direct":
                    unified_doc = self.direct_extraction_engine.extract(actual_file_path, output_dir)
                    unified_doc.document_id = document_id
                    # Update metadata with the original filename if Office was converted
                    if temp_pdf_path:
                        unified_doc.metadata.original_filename = file_path.name
                    # HYBRID MODE: Check if the Direct track missed images (e.g., inline image blocks).
                    # If so, use OCR to extract images and merge them into the Direct result.
                    pages_with_missing_images = self.direct_extraction_engine.check_document_for_missing_images(
                        actual_file_path
                    )
                    if pages_with_missing_images:
                        logger.info(f"Hybrid mode: Direct track missing images on pages {pages_with_missing_images}, using OCR to extract images")
                        try:
                            # Run OCR on the file to extract images
                            ocr_result = self.process_file_traditional(
                                actual_file_path, lang, detect_layout=True,
                                confidence_threshold=confidence_threshold,
                                output_dir=output_dir, layout_model=layout_model,
                                preprocessing_mode=preprocessing_mode,
                                preprocessing_config=preprocessing_config
                            )
                            # Convert the OCR result to extract images
                            ocr_unified = self.ocr_to_unified_converter.convert(
                                ocr_result, actual_file_path, 0.0, lang
                            )
                            # Merge OCR-extracted images into the Direct track result
                            images_added = self._merge_ocr_images_into_direct(
                                unified_doc, ocr_unified, pages_with_missing_images
                            )
                            if images_added > 0:
                                logger.info(f"Hybrid mode: Added {images_added} images from OCR to Direct track result")
                                unified_doc.metadata.processing_track = ProcessingTrack.HYBRID
                            else:
                                # Fallback: OCR didn't find images either, render inline image blocks directly
                                logger.info("Hybrid mode: OCR didn't find images, falling back to inline image rendering")
                                images_added = self.direct_extraction_engine.render_inline_image_regions(
                                    actual_file_path, unified_doc, pages_with_missing_images, output_dir
                                )
                                if images_added > 0:
                                    logger.info(f"Hybrid mode: Rendered {images_added} inline image regions")
                                    unified_doc.metadata.processing_track = ProcessingTrack.HYBRID
                        except Exception as e:
                            logger.warning(f"Hybrid mode image extraction failed: {e}")
                            # Continue with the Direct track result without images
            # Use OCR track (either by recommendation or fallback)
            if recommendation.track == "ocr":
                # Use OCR for scanned documents, images, etc.
                logger.info("Using OCR track (PaddleOCR)")
                ocr_result = self.process_file_traditional(
                    file_path, lang, detect_layout, confidence_threshold, output_dir, layout_model,
                    preprocessing_mode, preprocessing_config
                )
                # Convert OCR result to UnifiedDocument using the converter
                processing_time_so_far = (datetime.now() - start_time).total_seconds()
                unified_doc = self.ocr_to_unified_converter.convert(
                    ocr_result, file_path, processing_time_so_far, lang
                )
                unified_doc.document_id = document_id
            # Update processing track metadata (only if not already set to HYBRID)
            if unified_doc.metadata.processing_track != ProcessingTrack.HYBRID:
                unified_doc.metadata.processing_track = (
                    ProcessingTrack.DIRECT if recommendation.track == "direct"
                    else ProcessingTrack.OCR
                )
            # Calculate total processing time
            processing_time = (datetime.now() - start_time).total_seconds()
            unified_doc.metadata.processing_time = processing_time
            actual_track = unified_doc.metadata.processing_track.value
            logger.info(f"Document processing completed in {processing_time:.2f}s using {actual_track} track")
            return unified_doc
        except Exception as e:
            logger.error(f"Error in dual-track processing: {e}")
            # Fall back to traditional OCR
            return self.process_file_traditional(
                file_path, lang, detect_layout, confidence_threshold, output_dir, layout_model,
                preprocessing_mode, preprocessing_config
            )
def _merge_ocr_images_into_direct(
self,
direct_doc: 'UnifiedDocument',
ocr_doc: 'UnifiedDocument',
pages_with_missing_images: List[int]
) -> int:
"""
Merge OCR-extracted images into the Direct track result.
Used in hybrid mode when the Direct track could not extract certain
images (such as logos composed of inline image blocks).
Args:
direct_doc: UnifiedDocument from Direct track
ocr_doc: UnifiedDocument from OCR track
pages_with_missing_images: List of page numbers (1-indexed) that need images
Returns:
Number of images added
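Example (illustrative sketch; both documents are assumed to come from
the Direct and OCR tracks for the same source file):
    added = self._merge_ocr_images_into_direct(
        direct_doc, ocr_doc, pages_with_missing_images=[1, 3]
    )
    if added > 0:
        direct_doc.metadata.processing_track = ProcessingTrack.HYBRID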
"""
images_added = 0
try:
# Get image element types to look for
image_types = {ElementType.FIGURE, ElementType.IMAGE, ElementType.LOGO}
for page_num in pages_with_missing_images:
# Find the target page in direct_doc
direct_page = None
for page in direct_doc.pages:
if page.page_number == page_num:
direct_page = page
break
if not direct_page:
continue
# Find the source page in ocr_doc
ocr_page = None
for page in ocr_doc.pages:
if page.page_number == page_num:
ocr_page = page
break
if not ocr_page:
continue
# Extract image elements from OCR page
for element in ocr_page.elements:
if element.type in image_types:
# Assign new element ID to avoid conflicts
new_element_id = f"hybrid_{element.element_id}"
element.element_id = new_element_id
# Add to direct page
direct_page.elements.append(element)
images_added += 1
logger.debug(f"Added image element {new_element_id} to page {page_num}")
# Update image count in direct_doc metadata
if images_added > 0:
current_images = direct_doc.metadata.total_images or 0
direct_doc.metadata.total_images = current_images + images_added
except Exception as e:
logger.error(f"Error merging OCR images into Direct track: {e}")
return images_added
def process_file_traditional(
self,
file_path: Path,
lang: str = 'ch',
detect_layout: bool = True,
confidence_threshold: Optional[float] = None,
output_dir: Optional[Path] = None,
layout_model: Optional[str] = None,
preprocessing_mode: Optional[PreprocessingModeEnum] = None,
preprocessing_config: Optional[PreprocessingConfig] = None
) -> Dict:
"""
Traditional OCR processing (legacy method).
Args:
file_path: Path to file
lang: Language for OCR
detect_layout: Whether to perform layout analysis
confidence_threshold: Minimum confidence threshold
output_dir: Optional output directory
layout_model: Layout detection model ('chinese', 'default', 'cdla')
preprocessing_mode: Layout preprocessing mode ('auto', 'manual', 'disabled')
preprocessing_config: Manual preprocessing config (used when mode='manual')
Returns:
Dictionary with OCR results in legacy format
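Example (illustrative only; `service` and the path are hypothetical):
    result = service.process_file_traditional(
        Path("scan.pdf"), lang='ch', detect_layout=True
    )
    print(result['total_text_regions'], result['average_confidence'])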
"""
# Check if it's a PDF that needs conversion
if file_path.suffix.lower() == '.pdf':
image_paths = self.convert_pdf_to_images(file_path, output_dir or file_path.parent)
# Process multiple pages
all_results = []
for i, image_path in enumerate(image_paths):
result = self.process_image(
image_path, lang, detect_layout, confidence_threshold, output_dir, i, layout_model,
preprocessing_mode, preprocessing_config
)
all_results.append(result)
# Combine results
combined_result = self._combine_results(all_results)
combined_result['filename'] = file_path.name
# Clean up GPU memory after processing all pages
self.cleanup_gpu_memory()
return combined_result
else:
# Single image or other file
return self.process_image(
file_path, lang, detect_layout, confidence_threshold, output_dir, 0, layout_model,
preprocessing_mode, preprocessing_config
)
def _combine_results(self, results: List[Dict]) -> Dict:
"""Combine multiple OCR results into one"""
if not results:
return {'status': 'error', 'error': 'No results to combine'}
combined = {
'status': 'success',
'text_regions': [],
'total_text_regions': 0,
'average_confidence': 0.0,
'processing_time': 0.0,
'pages': [],
'layout_data': {'elements': []},
'images_metadata': [],
'enhanced_results': [] # For PP-StructureV3 enhanced results
}
total_confidence = 0.0
total_regions = 0
has_enhanced = False
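# Page-index convention below: text regions are tagged with 1-indexed
# page numbers, while layout elements and image metadata keep the
# 0-indexed loop counter.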
for page_num, result in enumerate(results):
if result['status'] == 'success':
# Add page number to text regions
for region in result.get('text_regions', []):
region['page'] = page_num + 1
combined['text_regions'].append(region)
# Accumulate statistics
total_regions += result.get('total_text_regions', 0)
total_confidence += result.get('average_confidence', 0) * result.get('total_text_regions', 0)
combined['processing_time'] += result.get('processing_time', 0)
# Collect layout data
if result.get('layout_data'):
layout = result['layout_data']
# Check if this is enhanced layout data
if layout.get('enhanced'):
has_enhanced = True
# Store enhanced results separately for converter
combined['enhanced_results'].append({
'elements': layout.get('elements', []),
'reading_order': layout.get('reading_order', []),
'element_types': layout.get('element_types', {}),
'page': page_num,
'width': result.get('ocr_dimensions', {}).get('width', 0),
'height': result.get('ocr_dimensions', {}).get('height', 0)
})
# Always collect elements for backward compatibility
for elem in layout.get('elements', []):
elem['page'] = page_num
combined['layout_data']['elements'].append(elem)
# Collect images metadata
for img in result.get('images_metadata', []):
img['page'] = page_num
combined['images_metadata'].append(img)
# Store page data
combined['pages'].append(result)
combined['total_text_regions'] = total_regions
combined['average_confidence'] = total_confidence / total_regions if total_regions > 0 else 0.0
combined['language'] = results[0].get('language', 'ch') if results else 'ch'
combined['gpu_used'] = results[0].get('gpu_used', False) if results else False
# Generate markdown
combined['markdown_content'] = self.generate_markdown(
combined['text_regions'], combined['layout_data']
)
return combined
def process(
self,
file_path: Path,
lang: str = 'ch',
detect_layout: bool = True,
confidence_threshold: Optional[float] = None,
output_dir: Optional[Path] = None,
use_dual_track: bool = True,
force_track: Optional[str] = None,
layout_model: Optional[str] = None,
preprocessing_mode: Optional[PreprocessingModeEnum] = None,
preprocessing_config: Optional[PreprocessingConfig] = None
) -> Union[UnifiedDocument, Dict]:
"""
Main processing method with dual-track support.
Args:
file_path: Path to document file
lang: Language for OCR
detect_layout: Whether to perform layout analysis
confidence_threshold: Minimum confidence threshold
output_dir: Optional output directory
use_dual_track: Whether to use dual-track processing (default True)
force_track: Force specific track ("ocr" or "direct")
layout_model: Layout detection model ('chinese', 'default', 'cdla'); used by the OCR track only
preprocessing_mode: Layout preprocessing mode ('auto', 'manual', 'disabled')
preprocessing_config: Manual preprocessing config (used when mode='manual')
Returns:
UnifiedDocument if dual-track is enabled and use_dual_track=True,
Dict with legacy format otherwise
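Example (illustrative only; `service` and the path are hypothetical):
    doc = service.process(Path("report.pdf"), lang='ch')
    if isinstance(doc, UnifiedDocument):
        print(doc.metadata.processing_track)
    # Force a specific track regardless of auto-detection:
    doc = service.process(Path("report.pdf"), force_track="ocr")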
"""
# Use dual-track processing if:
# 1. use_dual_track is True (auto-detection), OR
# 2. force_track is specified (explicit track selection)
if (use_dual_track or force_track) and self.dual_track_enabled:
# Use dual-track processing (or forced track)
return self.process_with_dual_track(
file_path, lang, detect_layout, confidence_threshold, output_dir, force_track, layout_model,
preprocessing_mode, preprocessing_config
)
else:
# Use traditional OCR processing (no force_track support)
return self.process_file_traditional(
file_path, lang, detect_layout, confidence_threshold, output_dir, layout_model,
preprocessing_mode, preprocessing_config
)
def process_legacy(
self,
file_path: Path,
lang: str = 'ch',
detect_layout: bool = True,
confidence_threshold: Optional[float] = None,
output_dir: Optional[Path] = None
) -> Dict:
"""
Legacy processing method that always returns Dict format.
Kept for backward compatibility.
Args:
file_path: Path to document file
lang: Language for OCR
detect_layout: Whether to perform layout analysis
confidence_threshold: Minimum confidence threshold
output_dir: Optional output directory
Returns:
Dictionary with OCR results in legacy format
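Example (illustrative only; `service` and the path are hypothetical):
    result = service.process_legacy(Path("scan.png"))
    assert isinstance(result, dict)  # always the legacy dict format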
"""
if self.dual_track_enabled:
# Use dual-track but convert to legacy format
result = self.process_with_dual_track(
file_path, lang, detect_layout, confidence_threshold, output_dir
)
# Convert UnifiedDocument to legacy format if needed
if isinstance(result, UnifiedDocument):
return result.to_legacy_format()
else:
return result
else:
# Use traditional processing
return self.process_file_traditional(
file_path, lang, detect_layout, confidence_threshold, output_dir
)
def get_track_recommendation(self, file_path: Path) -> Optional[ProcessingTrackRecommendation]:
"""
Get processing track recommendation for a file.
Args:
file_path: Path to document file
Returns:
ProcessingTrackRecommendation if dual-track is enabled, None otherwise
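Example (illustrative only; the path is hypothetical):
    rec = service.get_track_recommendation(Path("contract.pdf"))
    if rec is not None:
        print(rec.track, rec.confidence, rec.reason)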
"""
if not self.dual_track_enabled:
return None
try:
return self.document_detector.detect(file_path)
except Exception as e:
logger.error(f"Error getting track recommendation: {e}")
return None
def save_results(
self,
result: Union[UnifiedDocument, Dict],
output_dir: Path,
file_id: str,
source_file_path: Optional[Path] = None
) -> Tuple[Optional[Path], Optional[Path], Optional[Path]]:
"""
Save OCR results to JSON, Markdown, and layout-preserving PDF files
Args:
result: OCR result (UnifiedDocument or dictionary)
output_dir: Output directory
file_id: Unique file identifier
source_file_path: Optional path to original source file for PDF generation
Returns:
Tuple of (json_path, markdown_path, pdf_path)
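Example (illustrative only; any returned path may be None if that
artifact failed to generate):
    json_path, md_path, pdf_path = service.save_results(
        unified_doc, Path("/tmp/out"), file_id="abc123",
        source_file_path=Path("input.pdf")
    )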
"""
try:
output_dir.mkdir(parents=True, exist_ok=True)
# Use UnifiedDocumentExporter for standardized export
if isinstance(result, UnifiedDocument) and UnifiedDocumentExporter is not None:
# Use the new exporter for UnifiedDocument
json_path = output_dir / f"{file_id}_result.json"
UnifiedDocumentExporter.export_to_json(
result,
json_path,
include_metadata=True,
include_statistics=True
)
markdown_path = output_dir / f"{file_id}_output.md"
UnifiedDocumentExporter.export_to_markdown(
result,
markdown_path,
include_metadata_header=False # Keep output clean
)
# Markdown content is already written by the exporter above
else:
# Legacy path for dict results
result_dict = result if isinstance(result, dict) else result.to_dict()
markdown_content = result.get('markdown_content', '') if isinstance(result, dict) else ''
# Save JSON
json_path = output_dir / f"{file_id}_result.json"
with open(json_path, 'w', encoding='utf-8') as f:
json.dump(result_dict, f, ensure_ascii=False, indent=2)
# Save Markdown
markdown_path = output_dir / f"{file_id}_output.md"
with open(markdown_path, 'w', encoding='utf-8') as f:
f.write(markdown_content)
logger.info(f"Results saved: {json_path.name}, {markdown_path.name}")
# Generate layout-preserving PDF
pdf_path = None
try:
from app.services.pdf_generator_service import pdf_generator_service
pdf_filename = f"{file_id}_layout.pdf"
pdf_path = output_dir / pdf_filename
logger.info(f"Generating layout-preserving PDF: {pdf_filename}")
# Use appropriate method based on result type
if isinstance(result, UnifiedDocument):
# Use direct UnifiedDocument generation for better accuracy
success = pdf_generator_service.generate_from_unified_document(
unified_doc=result,
output_path=pdf_path,
source_file_path=source_file_path
)
else:
# Legacy path: use JSON file
success = pdf_generator_service.generate_layout_pdf(
json_path=json_path,
output_path=pdf_path,
source_file_path=source_file_path
)
if success:
logger.info(f"✓ PDF generated successfully: {pdf_path.name}")
else:
logger.warning(f"✗ PDF generation failed for {file_id}")
pdf_path = None
except Exception as e:
logger.error(f"Error generating PDF for {file_id}: {str(e)}")
import traceback
traceback.print_exc()
pdf_path = None
return json_path, markdown_path, pdf_path
except Exception as e:
logger.error(f"Error saving results: {str(e)}")
return None, None, None