"""
Tool_OCR - Core OCR Service
PaddleOCR-VL integration for text and structure extraction
"""

import json
import logging
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple

from paddleocr import PaddleOCR, PPStructureV3
from PIL import Image
from pdf2image import convert_from_path

from app.core.config import settings
from app.services.office_converter import OfficeConverter, OfficeConverterError

logger = logging.getLogger(__name__)


class OCRService:
    """
    Core OCR service using PaddleOCR-VL
    Handles text recognition and document structure analysis

    Engines are created lazily: one PaddleOCR engine per language and a single
    shared PP-StructureV3 engine, both cached on the instance.
    """

    def __init__(self):
        """Read OCR settings and set up the (still empty) engine caches."""
        self.ocr_languages = settings.ocr_languages_list
        self.confidence_threshold = settings.ocr_confidence_threshold

        # PaddleOCR engines, keyed by language code; filled by get_ocr_engine()
        self.ocr_engines: Dict[str, PaddleOCR] = {}

        # PP-Structure engine for layout analysis; created by get_structure_engine()
        self.structure_engine: Optional[PPStructureV3] = None

        # Converter used to turn Office documents into PDF before OCR
        self.office_converter = OfficeConverter()

        logger.info("OCR Service initialized")

    def get_ocr_engine(self, lang: str = 'ch') -> PaddleOCR:
        """
        Get or create OCR engine for specified language

        Args:
            lang: Language code (ch, en, japan, korean, etc.)

        Returns:
            PaddleOCR engine instance (cached per language)
        """
        if lang not in self.ocr_engines:
            logger.info(f"Initializing PaddleOCR engine for language: {lang}")
            self.ocr_engines[lang] = PaddleOCR(
                use_angle_cls=True,
                lang=lang,
                # Note: show_log and use_gpu parameters removed in PaddleOCR 3.x
            )
            logger.info(f"PaddleOCR engine ready for {lang}")

        return self.ocr_engines[lang]

    def get_structure_engine(self) -> PPStructureV3:
        """
        Get or create PP-Structure engine for layout analysis

        Returns:
            PPStructure engine instance (created on first call, then reused)
        """
        if self.structure_engine is None:
            logger.info("Initializing PP-StructureV3 engine")
            self.structure_engine = PPStructureV3(
                use_doc_orientation_classify=False,
                use_doc_unwarping=False,
                use_textline_orientation=False,
                use_table_recognition=True,
                use_formula_recognition=True,
                layout_threshold=0.5,
            )
            logger.info("PP-StructureV3 engine ready")

        return self.structure_engine

    def convert_pdf_to_images(self, pdf_path: Path, output_dir: Path) -> List[Path]:
        """
        Convert PDF to images (one per page)

        Args:
            pdf_path: Path to PDF file
            output_dir: Directory to save converted images (created if missing)

        Returns:
            List of paths to converted images, in page order

        Raises:
            Exception: Any error from directory creation, PDF rendering or
                image saving is logged and re-raised unchanged.
        """
        try:
            output_dir.mkdir(parents=True, exist_ok=True)

            logger.info(f"Converting PDF {pdf_path.name} to images")

            # Convert PDF to images (300 DPI for good quality)
            images = convert_from_path(
                str(pdf_path),
                dpi=300,
                fmt='png'
            )

            image_paths = []
            for page_num, image in enumerate(images, 1):
                # Save each page as PNG, numbered from 1
                image_path = output_dir / f"{pdf_path.stem}_page_{page_num}.png"
                image.save(str(image_path), 'PNG')
                image_paths.append(image_path)
                logger.info(f"Saved page {page_num} to {image_path.name}")

            logger.info(f"Converted {len(image_paths)} pages from PDF")
            return image_paths

        except Exception as e:
            logger.exception("PDF conversion error: %s", e)
            raise

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same format as the deprecated ``datetime.utcnow().isoformat()``
        (no UTC offset suffix) so existing consumers of 'timestamp' are unaffected.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    def process_image(
        self,
        image_path: Path,
        lang: str = 'ch',
        detect_layout: bool = True,
        confidence_threshold: Optional[float] = None
    ) -> Dict:
        """
        Process single image with OCR and layout analysis

        Office documents are first converted to PDF; PDFs are rendered to one
        image per page and each page is processed through this same method.

        Args:
            image_path: Path to image, PDF or Office file
            lang: Language for OCR
            detect_layout: Whether to perform layout analysis
            confidence_threshold: Minimum confidence threshold (uses default if None)

        Returns:
            Dictionary with OCR results and metadata. On failure the dictionary
            has 'status' == 'error' and an 'error_message'; this method does not
            raise. PDF results additionally carry 'total_pages' and
            'failed_pages' (1-based numbers of pages that could not be processed).
        """
        start_time = datetime.now()
        threshold = (
            confidence_threshold
            if confidence_threshold is not None
            else self.confidence_threshold
        )

        try:
            # Office documents are converted to PDF and then follow the PDF path
            if self.office_converter.is_office_document(image_path):
                logger.info(f"Detected Office document: {image_path.name}, converting to PDF")
                try:
                    pdf_path = self.office_converter.convert_to_pdf(image_path)
                    logger.info(f"Office document converted to PDF: {pdf_path.name}")
                    image_path = pdf_path
                except OfficeConverterError as e:
                    logger.error(f"Office conversion failed: {str(e)}")
                    raise

            if image_path.suffix.lower() == '.pdf':
                # Pass the caller's raw threshold through so every page resolves
                # the default exactly as a direct call would.
                return self._process_pdf(
                    image_path, lang, detect_layout, confidence_threshold, start_time
                )

            return self._process_single_image(
                image_path, lang, detect_layout, threshold, start_time
            )

        except Exception as e:
            # Boundary handler: callers receive an error dict instead of an exception
            logger.exception("OCR processing error for %s: %s", image_path.name, e)
            return {
                'status': 'error',
                'file_name': image_path.name,
                'error_message': str(e),
                'processing_time': (datetime.now() - start_time).total_seconds(),
            }

    def _process_pdf(
        self,
        pdf_path: Path,
        lang: str,
        detect_layout: bool,
        confidence_threshold: Optional[float],
        start_time: datetime,
    ) -> Dict:
        """Render a PDF to page images, OCR every page and merge the results."""
        logger.info(f"Detected PDF file: {pdf_path.name}, converting to images")
        pdf_images_dir = pdf_path.parent / f"{pdf_path.stem}_pages"
        page_paths = self.convert_pdf_to_images(pdf_path, pdf_images_dir)

        all_text_regions: List[Dict] = []
        all_layout_data: List[Dict] = []
        all_images_metadata: List[Dict] = []
        failed_pages: List[int] = []
        confidence_sum = 0.0
        region_count = 0

        for page_num, page_path in enumerate(page_paths, 1):
            logger.info(f"Processing PDF page {page_num}/{len(page_paths)}")

            page_result = self.process_image(
                page_path,
                lang=lang,
                detect_layout=detect_layout,
                confidence_threshold=confidence_threshold
            )

            if page_result['status'] != 'success':
                # Keep going with the remaining pages, but make the gap visible
                failed_pages.append(page_num)
                logger.warning(
                    "PDF page %d of %s failed and is skipped: %s",
                    page_num, pdf_path.name, page_result.get('error_message'),
                )
                continue

            # Tag each region with its 1-based page number
            for region in page_result['text_regions']:
                region['page'] = page_num
                all_text_regions.append(region)

            # Weight the page average by its region count to get a global average
            confidence_sum += page_result['average_confidence'] * page_result['total_text_regions']
            region_count += page_result['total_text_regions']

            if page_result.get('layout_data'):
                all_layout_data.append(page_result['layout_data'])

            if page_result.get('images_metadata'):
                all_images_metadata.extend(page_result['images_metadata'])

        avg_confidence = confidence_sum / region_count if region_count > 0 else 0.0

        # Combine layout data from all pages into one flat element list
        combined_elements: List[Dict] = []
        for layout in all_layout_data:
            if layout.get('elements'):
                combined_elements.extend(layout['elements'])

        combined_layout = None
        if combined_elements:
            combined_layout = {
                'elements': combined_elements,
                'total_elements': len(combined_elements),
                'reading_order': list(range(len(combined_elements))),
            }

        markdown_content = self.generate_markdown(all_text_regions, combined_layout)
        processing_time = (datetime.now() - start_time).total_seconds()

        logger.info(
            f"PDF processing completed: {pdf_path.name} - "
            f"{len(page_paths)} pages, "
            f"{len(all_text_regions)} regions, "
            f"{avg_confidence:.2f} avg confidence, "
            f"{processing_time:.2f}s"
        )

        return {
            'status': 'success',
            'file_name': pdf_path.name,
            'language': lang,
            'text_regions': all_text_regions,
            'total_text_regions': len(all_text_regions),
            'average_confidence': avg_confidence,
            'layout_data': combined_layout,
            'images_metadata': all_images_metadata,
            'markdown_content': markdown_content,
            'processing_time': processing_time,
            'timestamp': self._utc_timestamp(),
            'total_pages': len(page_paths),
            'failed_pages': failed_pages,
        }

    def _process_single_image(
        self,
        image_path: Path,
        lang: str,
        detect_layout: bool,
        threshold: float,
        start_time: datetime,
    ) -> Dict:
        """Run OCR (and optionally layout analysis) on one raster image."""
        ocr_engine = self.get_ocr_engine(lang)

        logger.info(f"Processing image: {image_path.name}")
        # Note: In PaddleOCR 3.x, use_angle_cls is set during initialization, not in ocr() call
        ocr_results = ocr_engine.ocr(str(image_path))

        text_regions, avg_confidence = self._parse_ocr_results(ocr_results, threshold)
        logger.info(f"Parsed {len(text_regions)} text regions with avg confidence {avg_confidence:.3f}")

        # Layout analysis (if requested)
        layout_data = None
        images_metadata: List[Dict] = []
        if detect_layout:
            layout_data, images_metadata = self.analyze_layout(image_path)

        markdown_content = self.generate_markdown(text_regions, layout_data)
        processing_time = (datetime.now() - start_time).total_seconds()

        result = {
            'status': 'success',
            'file_name': image_path.name,
            'language': lang,
            'text_regions': text_regions,
            'total_text_regions': len(text_regions),
            'average_confidence': avg_confidence,
            'layout_data': layout_data,
            'images_metadata': images_metadata,
            'markdown_content': markdown_content,
            'processing_time': processing_time,
            'timestamp': self._utc_timestamp(),
        }

        logger.info(
            f"OCR completed: {image_path.name} - "
            f"{len(text_regions)} regions, "
            f"{avg_confidence:.2f} avg confidence, "
            f"{processing_time:.2f}s"
        )

        return result

    @staticmethod
    def _parse_ocr_results(ocr_results, threshold: float) -> Tuple[List[Dict], float]:
        """Turn raw PaddleOCR 3.x output into text regions plus their mean confidence.

        Only dict-shaped page results ({'rec_texts', 'rec_scores', 'rec_polys'})
        are read; anything else is ignored. Regions below ``threshold`` are dropped.
        """
        text_regions: List[Dict] = []
        confidence_sum = 0.0

        if not isinstance(ocr_results, (list, tuple)):
            return text_regions, 0.0

        for page_result in ocr_results:
            if not isinstance(page_result, dict):
                continue

            texts = page_result.get('rec_texts', [])
            scores = page_result.get('rec_scores', [])
            polys = page_result.get('rec_polys', [])

            for idx, text in enumerate(texts):
                # Cast once so neither the region nor the running average keeps a
                # numpy scalar (which json.dump cannot always serialize).
                confidence = float(scores[idx]) if idx < len(scores) else 1.0
                if confidence < threshold:
                    continue

                bbox = polys[idx] if idx < len(polys) else []
                # Convert numpy array bbox to list for JSON serialization
                if hasattr(bbox, 'tolist'):
                    bbox = bbox.tolist()

                text_regions.append({
                    'text': text,
                    'bbox': bbox,
                    'confidence': confidence,
                })
                confidence_sum += confidence

        avg_confidence = confidence_sum / len(text_regions) if text_regions else 0.0
        return text_regions, avg_confidence

    def analyze_layout(self, image_path: Path) -> Tuple[Optional[Dict], List[Dict]]:
        """
        Analyze document layout using PP-StructureV3

        Args:
            image_path: Path to image file

        Returns:
            Tuple of (layout_data, images_metadata)
        """
        # NOTE(review): in the copy of this file that was reviewed, the text of
        # this method after the "has_table" check was lost (everything up to the
        # generate_markdown signature). The element construction, the image
        # metadata loop, the final return and the except handler below are a
        # reconstruction from how layout_data is consumed elsewhere in this
        # class - verify against version control before merging.
        try:
            structure_engine = self.get_structure_engine()

            # Perform structure analysis using predict() method (PaddleOCR 3.x API)
            logger.info(f"Running layout analysis on {image_path.name}")
            results = structure_engine.predict(str(image_path))

            layout_elements: List[Dict] = []
            images_metadata: List[Dict] = []

            # Process each page result (for images, usually just one page)
            for page_idx, page_result in enumerate(results):
                if not hasattr(page_result, 'markdown'):
                    continue

                markdown_dict = page_result.markdown
                logger.info(f"Page {page_idx} markdown keys: {markdown_dict.keys() if isinstance(markdown_dict, dict) else type(markdown_dict)}")

                if not isinstance(markdown_dict, dict):
                    continue

                # Markdown texts (HTML format with tables and structure) and images
                markdown_texts = markdown_dict.get('markdown_texts', '')
                markdown_images = markdown_dict.get('markdown_images', {})

                if markdown_texts:
                    # Tables arrive as HTML, so a <table tag marks a table element
                    has_table = '<table' in markdown_texts
                    layout_elements.append({
                        'element_id': len(layout_elements),
                        'type': 'table' if has_table else 'text',
                        'content': markdown_texts,
                        'page': page_idx,
                    })

                # NOTE(review): presumably markdown_images maps a relative image
                # path to an image object; only the key is recorded so the
                # metadata stays JSON-serializable - confirm the original schema.
                for image_key in (markdown_images or {}):
                    images_metadata.append({
                        'element_id': len(images_metadata),
                        'image_path': str(image_key),
                        'type': 'image',
                        'page': page_idx,
                    })

            if not layout_elements:
                logger.warning("No layout elements detected")
                return None, images_metadata

            layout_data = {
                'elements': layout_elements,
                'total_elements': len(layout_elements),
                'reading_order': list(range(len(layout_elements))),
            }
            logger.info(f"Detected {len(layout_elements)} layout elements")
            return layout_data, images_metadata

        except Exception as e:
            logger.exception("Layout analysis error: %s", e)
            return None, []

    @staticmethod
    def _region_top(region: Dict):
        """Return the Y coordinate of a region's first bbox point, or 0 if unavailable."""
        bbox = region.get('bbox', [])
        if isinstance(bbox, (list, tuple)) and len(bbox) > 0:
            if isinstance(bbox[0], (list, tuple)) and len(bbox[0]) > 1:
                return bbox[0][1]  # [[x1,y1], [x2,y2], ...] format
            elif len(bbox) > 1:
                return bbox[1]  # [x1, y1, x2, y2, ...] format
        return 0  # Default to 0 if can't extract

    # NOTE(review): the original "def" line of generate_markdown was lost in the
    # reviewed copy; parameter names follow its docstring and the call sites,
    # the None default for layout_data is an assumption - confirm.
    def generate_markdown(
        self,
        text_regions: List[Dict],
        layout_data: Optional[Dict] = None
    ) -> str:
        """
        Generate Markdown from OCR results

        Args:
            text_regions: List of text regions with bbox and text
            layout_data: Optional layout structure information

        Returns:
            Markdown formatted string
        """
        markdown_lines = []

        if layout_data and layout_data.get('elements'):
            # Generate structured Markdown based on layout
            for element in layout_data['elements']:
                element_type = element.get('type', 'text')
                content = element.get('content', '')

                if element_type == 'title':
                    markdown_lines.append(f"# {content}\n")
                elif element_type == 'table':
                    # Table in HTML format, followed by a blank line
                    markdown_lines.append(content)
                    markdown_lines.append("")
                elif element_type == 'figure':
                    element_id = element.get('element_id')
                    markdown_lines.append(f"![Figure {element_id}](./images/img_{element_id}.jpg)\n")
                else:
                    markdown_lines.append(f"{content}\n")
        else:
            # Simple Markdown from text regions only,
            # sorted by vertical position (top to bottom)
            markdown_lines.extend(
                region['text']
                for region in sorted(text_regions, key=self._region_top)
            )

        return "\n".join(markdown_lines)

    def save_results(
        self,
        result: Dict,
        output_dir: Path,
        file_id: str
    ) -> Tuple[Optional[Path], Optional[Path]]:
        """
        Save OCR results to JSON and Markdown files

        Args:
            result: OCR result dictionary
            output_dir: Output directory (created if missing)
            file_id: Unique file identifier, used as the file name prefix

        Returns:
            Tuple of (json_path, markdown_path); (None, None) if anything fails.
            Failures are logged, not raised.
        """
        try:
            output_dir.mkdir(parents=True, exist_ok=True)

            # Save JSON
            json_path = output_dir / f"{file_id}_result.json"
            with open(json_path, 'w', encoding='utf-8') as f:
                json.dump(result, f, ensure_ascii=False, indent=2)

            # Save Markdown
            markdown_path = output_dir / f"{file_id}_output.md"
            markdown_content = result.get('markdown_content', '')
            with open(markdown_path, 'w', encoding='utf-8') as f:
                f.write(markdown_content)

            logger.info(f"Results saved: {json_path.name}, {markdown_path.name}")
            return json_path, markdown_path

        except Exception as e:
            # Best-effort persistence: report and signal failure via (None, None)
            logger.exception("Error saving results: %s", e)
            return None, None