""" Tool_OCR - Core OCR Service with Dual-track Processing Supports both PaddleOCR (for scanned documents) and direct extraction (for editable PDFs) """ import json import logging from pathlib import Path from typing import Dict, List, Optional, Tuple, Union from datetime import datetime import uuid from paddleocr import PaddleOCR, PPStructureV3 from PIL import Image from pdf2image import convert_from_path import paddle from app.core.config import settings from app.services.office_converter import OfficeConverter, OfficeConverterError # Import dual-track components try: from app.services.document_type_detector import DocumentTypeDetector, ProcessingTrackRecommendation from app.services.direct_extraction_engine import DirectExtractionEngine from app.services.ocr_to_unified_converter import OCRToUnifiedConverter from app.services.unified_document_exporter import UnifiedDocumentExporter from app.models.unified_document import ( UnifiedDocument, DocumentMetadata, ProcessingTrack, ElementType, DocumentElement, Page, Dimensions, BoundingBox, ProcessingInfo ) DUAL_TRACK_AVAILABLE = True except ImportError as e: logging.getLogger(__name__).warning(f"Dual-track components not available: {e}") DUAL_TRACK_AVAILABLE = False UnifiedDocumentExporter = None logger = logging.getLogger(__name__) class OCRService: """ Core OCR service using PaddleOCR-VL Handles text recognition and document structure analysis """ def __init__(self): """Initialize PaddleOCR and PPStructure engines with GPU detection and dual-track support""" self.ocr_languages = settings.ocr_languages_list self.confidence_threshold = settings.ocr_confidence_threshold # Initialize PaddleOCR engine (will be lazy-loaded per language) self.ocr_engines = {} # Initialize PP-Structure for layout analysis self.structure_engine = None # Initialize Office document converter self.office_converter = OfficeConverter() # Initialize dual-track components if available if DUAL_TRACK_AVAILABLE: self.document_detector = DocumentTypeDetector( min_text_length=100, sample_pages=3, text_coverage_threshold=0.9 ) self.direct_extraction_engine = DirectExtractionEngine( enable_table_detection=True, enable_image_extraction=True ) self.ocr_to_unified_converter = OCRToUnifiedConverter() self.dual_track_enabled = True logger.info("Dual-track processing enabled") else: self.document_detector = None self.direct_extraction_engine = None self.ocr_to_unified_converter = None self.dual_track_enabled = False logger.info("Dual-track processing not available, using OCR-only mode") # GPU Detection and Configuration self.gpu_available = False self.use_gpu = False self.gpu_info = {} # Model cache management for memory optimization self._model_last_used = {} # Track last usage time for each model self._memory_warning_logged = False self._detect_and_configure_gpu() # Log GPU optimization settings if settings.enable_memory_optimization: logger.info(f"GPU memory optimization enabled:") logger.info(f" - Memory limit: {settings.gpu_memory_limit_mb}MB") logger.info(f" - Model cache limit: {settings.model_cache_limit_mb}MB") logger.info(f" - Batch size: {settings.inference_batch_size}") logger.info(f" - Auto-unload unused models: {settings.auto_unload_unused_models}") logger.info("OCR Service initialized") def _detect_and_configure_gpu(self): """Detect GPU availability and configure usage""" try: # Check if forced CPU mode if settings.force_cpu_mode: logger.info("GPU mode forced to CPU by configuration") self.use_gpu = False self.gpu_info = { 'available': False, 'reason': 'CPU mode forced by configuration', } return # Check if PaddlePaddle is compiled with CUDA if paddle.is_compiled_with_cuda(): # Check if GPU devices are available gpu_count = paddle.device.cuda.device_count() if gpu_count > 0: self.gpu_available = True self.use_gpu = True # Get GPU device information device_id = settings.gpu_device_id if settings.gpu_device_id < gpu_count else 0 gpu_props = paddle.device.cuda.get_device_properties(device_id) self.gpu_info = { 'available': True, 'device_count': gpu_count, 'device_id': device_id, 'device_name': gpu_props.name, 'total_memory': gpu_props.total_memory, 'compute_capability': f"{gpu_props.major}.{gpu_props.minor}", } # Set GPU memory fraction try: paddle.device.set_device(f'gpu:{device_id}') logger.info(f"GPU {device_id} selected: {gpu_props.name}") logger.info(f"GPU memory: {gpu_props.total_memory / (1024**3):.2f} GB") logger.info(f"Compute capability: {gpu_props.major}.{gpu_props.minor}") logger.info(f"GPU memory fraction set to: {settings.gpu_memory_fraction}") except Exception as e: logger.warning(f"Failed to configure GPU device: {e}") self.use_gpu = False self.gpu_info['available'] = False self.gpu_info['reason'] = f'GPU configuration failed: {str(e)}' else: logger.warning("CUDA is available but no GPU devices found") self.gpu_info = { 'available': False, 'reason': 'CUDA compiled but no GPU devices detected', } else: logger.info("PaddlePaddle not compiled with CUDA support") self.gpu_info = { 'available': False, 'reason': 'PaddlePaddle not compiled with CUDA', } except Exception as e: logger.error(f"GPU detection failed: {e}") self.use_gpu = False self.gpu_info = { 'available': False, 'reason': f'GPU detection error: {str(e)}', } # Log final GPU status if self.use_gpu: logger.info(f"✓ GPU acceleration ENABLED - Using {self.gpu_info.get('device_name', 'Unknown GPU')}") else: reason = self.gpu_info.get('reason', 'Unknown') logger.info(f"ℹ GPU acceleration DISABLED - {reason} - Using CPU mode") def get_gpu_status(self) -> Dict: """ Get current GPU status and information Returns: Dictionary with GPU status information """ status = { 'gpu_enabled': self.use_gpu, 'gpu_available': self.gpu_available, **self.gpu_info, } # Add current GPU memory usage if GPU is being used if self.use_gpu and self.gpu_available: try: device_id = self.gpu_info.get('device_id', 0) # Get memory info (returns allocated, total in bytes) memory_allocated = paddle.device.cuda.memory_allocated(device_id) memory_reserved = paddle.device.cuda.memory_reserved(device_id) total_memory = self.gpu_info.get('total_memory', 0) status['memory_allocated_mb'] = memory_allocated / (1024**2) status['memory_reserved_mb'] = memory_reserved / (1024**2) status['memory_total_mb'] = total_memory / (1024**2) status['memory_utilization'] = (memory_allocated / total_memory * 100) if total_memory > 0 else 0 except Exception as e: logger.warning(f"Failed to get GPU memory info: {e}") return status def _check_gpu_memory_usage(self): """ Check GPU memory usage and log warnings if approaching limits. Implements memory optimization for RTX 4060 8GB. """ if not self.use_gpu or not settings.enable_memory_optimization: return try: device_id = self.gpu_info.get('device_id', 0) memory_allocated = paddle.device.cuda.memory_allocated(device_id) memory_allocated_mb = memory_allocated / (1024**2) memory_limit_mb = settings.gpu_memory_limit_mb utilization = (memory_allocated_mb / memory_limit_mb * 100) if memory_limit_mb > 0 else 0 if utilization > 90 and not self._memory_warning_logged: logger.warning(f"GPU memory usage high: {memory_allocated_mb:.0f}MB / {memory_limit_mb}MB ({utilization:.1f}%)") logger.warning("Consider enabling auto_unload_unused_models or reducing batch size") self._memory_warning_logged = True elif utilization > 75: logger.info(f"GPU memory: {memory_allocated_mb:.0f}MB / {memory_limit_mb}MB ({utilization:.1f}%)") except Exception as e: logger.debug(f"Memory check failed: {e}") def _cleanup_unused_models(self): """ Clean up unused language models to free GPU memory. Models idle longer than model_idle_timeout_seconds will be unloaded. """ if not settings.auto_unload_unused_models: return current_time = datetime.now() timeout = settings.model_idle_timeout_seconds models_to_remove = [] for lang, last_used in self._model_last_used.items(): if lang == 'structure': # Don't unload structure engine continue idle_seconds = (current_time - last_used).total_seconds() if idle_seconds > timeout: models_to_remove.append(lang) for lang in models_to_remove: if lang in self.ocr_engines: logger.info(f"Unloading idle OCR engine for {lang} (idle {timeout}s)") del self.ocr_engines[lang] del self._model_last_used[lang] if models_to_remove and self.use_gpu: # Clear CUDA cache try: paddle.device.cuda.empty_cache() logger.info(f"Cleared CUDA cache after unloading {len(models_to_remove)} models") except Exception as e: logger.debug(f"Cache clear failed: {e}") def clear_gpu_cache(self): """ Manually clear GPU memory cache. Useful after processing large documents. """ if not self.use_gpu: return try: paddle.device.cuda.empty_cache() logger.info("GPU cache cleared") except Exception as e: logger.warning(f"Failed to clear GPU cache: {e}") def get_ocr_engine(self, lang: str = 'ch') -> PaddleOCR: """ Get or create OCR engine for specified language with GPU support Args: lang: Language code (ch, en, japan, korean, etc.) Returns: PaddleOCR engine instance """ # Clean up unused models before loading new ones (memory optimization) if settings.auto_unload_unused_models: self._cleanup_unused_models() if lang not in self.ocr_engines: logger.info(f"Initializing PaddleOCR engine for language: {lang} (GPU: {self.use_gpu})") try: # PaddleOCR 3.x: Device is set globally via paddle.set_device() # No need to pass device/use_gpu/gpu_mem parameters self.ocr_engines[lang] = PaddleOCR( lang=lang, use_textline_orientation=True, # Replaces deprecated use_angle_cls ) # Track model loading for cache management self._model_last_used[lang] = datetime.now() logger.info(f"PaddleOCR engine ready for {lang} (PaddlePaddle {paddle.__version__}, {'GPU' if self.use_gpu else 'CPU'} mode)") # Check GPU memory after loading if self.use_gpu and settings.enable_memory_optimization: self._check_gpu_memory_usage() except Exception as e: # If GPU initialization fails, fall back to CPU if self.use_gpu: logger.warning(f"GPU initialization failed, falling back to CPU: {e}") self.use_gpu = False # Switch to CPU device globally paddle.set_device('cpu') self.ocr_engines[lang] = PaddleOCR( lang=lang, use_textline_orientation=True, ) self._model_last_used[lang] = datetime.now() logger.info(f"PaddleOCR engine ready for {lang} (CPU mode - fallback)") else: raise else: # Update last used time for existing engine self._model_last_used[lang] = datetime.now() return self.ocr_engines[lang] def get_structure_engine(self) -> PPStructureV3: """ Get or create PP-Structure engine for layout analysis with GPU support Returns: PPStructure engine instance """ if self.structure_engine is None: logger.info(f"Initializing PP-StructureV3 engine (GPU: {self.use_gpu})") try: # PaddleOCR 3.x: Device is set globally via paddle.set_device() # Use configuration settings for memory optimization use_chart = settings.enable_chart_recognition use_formula = settings.enable_formula_recognition use_table = settings.enable_table_recognition layout_threshold = settings.layout_detection_threshold logger.info(f"PP-StructureV3 config: table={use_table}, formula={use_formula}, chart={use_chart}") self.structure_engine = PPStructureV3( use_doc_orientation_classify=False, use_doc_unwarping=False, use_textline_orientation=False, use_table_recognition=use_table, use_formula_recognition=use_formula, use_chart_recognition=use_chart, # Disabled by default to save ~500MB VRAM layout_threshold=layout_threshold, ) # Track model loading for cache management self._model_last_used['structure'] = datetime.now() logger.info(f"PP-StructureV3 engine ready (PaddlePaddle {paddle.__version__}, {'GPU' if self.use_gpu else 'CPU'} mode)") # Check GPU memory after loading if self.use_gpu and settings.enable_memory_optimization: self._check_gpu_memory_usage() except Exception as e: # If GPU initialization fails, fall back to CPU if self.use_gpu: logger.warning(f"GPU initialization failed for PP-Structure, falling back to CPU: {e}") self.use_gpu = False # Switch to CPU device globally paddle.set_device('cpu') use_chart = settings.enable_chart_recognition use_formula = settings.enable_formula_recognition use_table = settings.enable_table_recognition layout_threshold = settings.layout_detection_threshold self.structure_engine = PPStructureV3( use_doc_orientation_classify=False, use_doc_unwarping=False, use_textline_orientation=False, use_table_recognition=use_table, use_formula_recognition=use_formula, use_chart_recognition=use_chart, layout_threshold=layout_threshold, ) logger.info("PP-StructureV3 engine ready (CPU mode - fallback)") else: raise return self.structure_engine def convert_pdf_to_images(self, pdf_path: Path, output_dir: Path) -> List[Path]: """ Convert PDF to images (one per page) Args: pdf_path: Path to PDF file output_dir: Directory to save converted images Returns: List of paths to converted images """ try: output_dir.mkdir(parents=True, exist_ok=True) logger.info(f"Converting PDF {pdf_path.name} to images") # Convert PDF to images (300 DPI for good quality) images = convert_from_path( str(pdf_path), dpi=300, fmt='png' ) image_paths = [] for i, image in enumerate(images): # Save each page as PNG image_path = output_dir / f"{pdf_path.stem}_page_{i+1}.png" image.save(str(image_path), 'PNG') image_paths.append(image_path) logger.info(f"Saved page {i+1} to {image_path.name}") logger.info(f"Converted {len(image_paths)} pages from PDF") return image_paths except Exception as e: logger.error(f"PDF conversion error: {str(e)}") raise def process_image( self, image_path: Path, lang: str = 'ch', detect_layout: bool = True, confidence_threshold: Optional[float] = None, output_dir: Optional[Path] = None, current_page: int = 0 ) -> Dict: """ Process single image with OCR and layout analysis Args: image_path: Path to image file lang: Language for OCR detect_layout: Whether to perform layout analysis confidence_threshold: Minimum confidence threshold (uses default if None) output_dir: Optional output directory for saving extracted images current_page: Current page number (0-based) for multi-page documents Returns: Dictionary with OCR results and metadata """ start_time = datetime.now() threshold = confidence_threshold if confidence_threshold is not None else self.confidence_threshold try: # Check if file is Office document if self.office_converter.is_office_document(image_path): logger.info(f"Detected Office document: {image_path.name}, converting to PDF") try: # Convert Office document to PDF pdf_path = self.office_converter.convert_to_pdf(image_path) logger.info(f"Office document converted to PDF: {pdf_path.name}") # Process the PDF (will be handled by PDF processing logic below) image_path = pdf_path except OfficeConverterError as e: logger.error(f"Office conversion failed: {str(e)}") raise # Check if file is PDF is_pdf = image_path.suffix.lower() == '.pdf' if is_pdf: # Convert PDF to images logger.info(f"Detected PDF file: {image_path.name}, converting to images") pdf_images_dir = image_path.parent / f"{image_path.stem}_pages" image_paths = self.convert_pdf_to_images(image_path, pdf_images_dir) # Process all pages all_text_regions = [] total_confidence_sum = 0.0 total_valid_regions = 0 all_layout_data = [] all_images_metadata = [] all_ocr_dimensions = [] for page_num, page_image_path in enumerate(image_paths, 1): logger.info(f"Processing PDF page {page_num}/{len(image_paths)}") # Process each page with correct page number (0-based for layout data) page_result = self.process_image( page_image_path, lang=lang, detect_layout=detect_layout, confidence_threshold=confidence_threshold, output_dir=output_dir, current_page=page_num - 1 # Convert to 0-based page number for layout data ) # Accumulate results if page_result['status'] == 'success': # Add page number to each text region for region in page_result['text_regions']: region['page'] = page_num all_text_regions.append(region) total_confidence_sum += page_result['average_confidence'] * page_result['total_text_regions'] total_valid_regions += page_result['total_text_regions'] # Accumulate layout data (page numbers already set correctly in analyze_layout) if page_result.get('layout_data'): layout_data = page_result['layout_data'] all_layout_data.append(layout_data) # Accumulate images metadata (page numbers already set correctly in analyze_layout) if page_result.get('images_metadata'): all_images_metadata.extend(page_result['images_metadata']) # Store OCR dimensions for each page if page_result.get('ocr_dimensions'): all_ocr_dimensions.append({ 'page': page_num, 'width': page_result['ocr_dimensions']['width'], 'height': page_result['ocr_dimensions']['height'] }) # Calculate overall average confidence avg_confidence = total_confidence_sum / total_valid_regions if total_valid_regions > 0 else 0.0 # Combine layout data from all pages combined_layout = None if all_layout_data: combined_elements = [] for layout in all_layout_data: if layout.get('elements'): combined_elements.extend(layout['elements']) if combined_elements: combined_layout = { 'elements': combined_elements, 'total_elements': len(combined_elements), 'reading_order': list(range(len(combined_elements))), } # Generate combined markdown markdown_content = self.generate_markdown(all_text_regions, combined_layout) # Calculate processing time processing_time = (datetime.now() - start_time).total_seconds() logger.info( f"PDF processing completed: {image_path.name} - " f"{len(image_paths)} pages, " f"{len(all_text_regions)} regions, " f"{avg_confidence:.2f} avg confidence, " f"{processing_time:.2f}s" ) return { 'status': 'success', 'file_name': image_path.name, 'language': lang, 'text_regions': all_text_regions, 'total_text_regions': len(all_text_regions), 'average_confidence': avg_confidence, 'layout_data': combined_layout, 'images_metadata': all_images_metadata, 'markdown_content': markdown_content, 'processing_time': processing_time, 'timestamp': datetime.utcnow().isoformat(), 'total_pages': len(image_paths), 'ocr_dimensions': all_ocr_dimensions if all_ocr_dimensions else None, } # Get OCR engine (for non-PDF images) ocr_engine = self.get_ocr_engine(lang) # Get the actual image dimensions that OCR will use from PIL import Image with Image.open(image_path) as img: ocr_width, ocr_height = img.size logger.info(f"OCR processing image dimensions: {ocr_width}x{ocr_height}") # Perform OCR logger.info(f"Processing image: {image_path.name}") # Note: In PaddleOCR 3.x, use_angle_cls is set during initialization, not in ocr() call ocr_results = ocr_engine.ocr(str(image_path)) # Parse OCR results (PaddleOCR 3.x format) text_regions = [] total_confidence = 0.0 valid_regions = 0 if ocr_results and isinstance(ocr_results, (list, tuple)) and len(ocr_results) > 0: # PaddleOCR 3.x returns a list of dictionaries (one per page) for page_result in ocr_results: if isinstance(page_result, dict): # New format: {'rec_texts': [...], 'rec_scores': [...], 'rec_polys': [...]} texts = page_result.get('rec_texts', []) scores = page_result.get('rec_scores', []) polys = page_result.get('rec_polys', []) # Process each recognized text for idx, text in enumerate(texts): # Get corresponding score and bbox confidence = scores[idx] if idx < len(scores) else 1.0 bbox = polys[idx] if idx < len(polys) else [] # Convert numpy array bbox to list for JSON serialization if hasattr(bbox, 'tolist'): bbox = bbox.tolist() # Filter by confidence threshold if confidence >= threshold: text_regions.append({ 'text': text, 'bbox': bbox, 'confidence': float(confidence), }) total_confidence += confidence valid_regions += 1 avg_confidence = total_confidence / valid_regions if valid_regions > 0 else 0.0 logger.info(f"Parsed {len(text_regions)} text regions with avg confidence {avg_confidence:.3f}") # Layout analysis (if requested) layout_data = None images_metadata = [] if detect_layout: # Pass current_page to analyze_layout for correct page numbering layout_data, images_metadata = self.analyze_layout(image_path, output_dir=output_dir, current_page=current_page) # Generate Markdown markdown_content = self.generate_markdown(text_regions, layout_data) # Calculate processing time processing_time = (datetime.now() - start_time).total_seconds() result = { 'status': 'success', 'file_name': image_path.name, 'language': lang, 'text_regions': text_regions, 'total_text_regions': len(text_regions), 'average_confidence': avg_confidence, 'layout_data': layout_data, 'images_metadata': images_metadata, 'markdown_content': markdown_content, 'processing_time': processing_time, 'timestamp': datetime.utcnow().isoformat(), 'ocr_dimensions': { 'width': ocr_width, 'height': ocr_height } } # If layout data is enhanced, add enhanced results for converter if layout_data and layout_data.get('enhanced'): result['enhanced_results'] = [{ 'elements': layout_data.get('elements', []), 'reading_order': layout_data.get('reading_order', []), 'element_types': layout_data.get('element_types', {}), 'page': current_page, 'width': ocr_width, 'height': ocr_height }] logger.info( f"OCR completed: {image_path.name} - " f"{len(text_regions)} regions, " f"{avg_confidence:.2f} avg confidence, " f"{processing_time:.2f}s" ) return result except Exception as e: import traceback error_trace = traceback.format_exc() logger.error(f"OCR processing error for {image_path.name}: {str(e)}\n{error_trace}") return { 'status': 'error', 'file_name': image_path.name, 'error_message': str(e), 'processing_time': (datetime.now() - start_time).total_seconds(), } def _extract_table_text(self, html_content: str) -> str: """ Extract text from HTML table content for translation purposes Args: html_content: HTML content containing table Returns: Extracted text from table cells """ try: from html.parser import HTMLParser class TableTextExtractor(HTMLParser): def __init__(self): super().__init__() self.text_parts = [] self.in_table = False def handle_starttag(self, tag, attrs): if tag == 'table': self.in_table = True def handle_endtag(self, tag): if tag == 'table': self.in_table = False elif tag in ('td', 'th') and self.in_table: self.text_parts.append(' | ') # Cell separator elif tag == 'tr' and self.in_table: self.text_parts.append('\n') # Row separator def handle_data(self, data): if self.in_table: stripped = data.strip() if stripped: self.text_parts.append(stripped) parser = TableTextExtractor() parser.feed(html_content) # Clean up the extracted text extracted = ''.join(parser.text_parts) # Remove multiple separators import re extracted = re.sub(r'\s*\|\s*\|+\s*', ' | ', extracted) extracted = re.sub(r'\n+', '\n', extracted) extracted = extracted.strip() return extracted except Exception as e: logger.warning(f"Failed to extract table text: {e}") # Fallback: just remove HTML tags import re text = re.sub(r'<[^>]+>', ' ', html_content) text = re.sub(r'\s+', ' ', text) return text.strip() def analyze_layout(self, image_path: Path, output_dir: Optional[Path] = None, current_page: int = 0) -> Tuple[Optional[Dict], List[Dict]]: """ Analyze document layout using PP-StructureV3 with enhanced element extraction Args: image_path: Path to image file output_dir: Optional output directory for saving extracted images (defaults to image_path.parent) current_page: Current page number (0-based) for multi-page documents Returns: Tuple of (layout_data, images_metadata) """ try: structure_engine = self.get_structure_engine() # Try enhanced processing first try: from app.services.pp_structure_enhanced import PPStructureEnhanced enhanced_processor = PPStructureEnhanced(structure_engine) result = enhanced_processor.analyze_with_full_structure( image_path, output_dir, current_page ) if result.get('has_parsing_res_list'): logger.info(f"Enhanced PP-StructureV3 analysis successful with {result['total_elements']} elements") logger.info(f"Element types found: {result.get('element_types', {})}") # Convert to legacy format for compatibility layout_data = { 'elements': result['elements'], 'total_elements': result['total_elements'], 'reading_order': result['reading_order'], 'element_types': result.get('element_types', {}), 'enhanced': True } # Extract images metadata images_metadata = [] for elem in result.get('images', []): images_metadata.append({ 'element_id': elem['element_id'], 'type': 'image', 'page': elem['page'], 'bbox': elem['bbox'] }) return layout_data, images_metadata else: logger.info("parsing_res_list not available, using standard processing") except ImportError: logger.debug("Enhanced PP-StructureV3 module not available, using standard processing") except Exception as e: logger.warning(f"Enhanced processing failed, falling back to standard: {e}") # Standard processing (original implementation) logger.info(f"Running standard layout analysis on {image_path.name}") results = structure_engine.predict(str(image_path)) layout_elements = [] images_metadata = [] # Process each page result (for images, usually just one page) for page_idx, page_result in enumerate(results): # Get markdown dictionary from result object if hasattr(page_result, 'markdown'): markdown_dict = page_result.markdown logger.info(f"Page {page_idx} markdown keys: {markdown_dict.keys() if isinstance(markdown_dict, dict) else type(markdown_dict)}") # Extract layout information from markdown structure if isinstance(markdown_dict, dict): # Get markdown texts (HTML format with tables and structure) markdown_texts = markdown_dict.get('markdown_texts', '') markdown_images = markdown_dict.get('markdown_images', {}) # Create a layout element for the structured content if markdown_texts: # Parse HTML content to identify tables and text import re # Check if content contains tables has_table = '