""" Enhanced PP-StructureV3 processing with full element extraction This module provides enhanced PP-StructureV3 processing that extracts all 23 element types with their bbox coordinates and reading order. """ import logging from pathlib import Path from typing import Dict, List, Optional, Tuple, Any, TYPE_CHECKING import json import gc # Import ScalingInfo for type checking (avoid circular imports at runtime) if TYPE_CHECKING: from app.services.layout_preprocessing_service import ScalingInfo # Optional torch import for additional GPU memory management try: import torch TORCH_AVAILABLE = True except ImportError: TORCH_AVAILABLE = False import paddle from paddleocr import PPStructureV3 from PIL import Image import numpy as np import cv2 from app.models.unified_document import ElementType from app.core.config import settings from app.services.memory_manager import prediction_context from app.services.cv_table_detector import CVTableDetector logger = logging.getLogger(__name__) class PPStructureEnhanced: """ Enhanced PP-StructureV3 processor that extracts all available element types and structure information from parsing_res_list. """ # Mapping from PP-StructureV3 types to our ElementType ELEMENT_TYPE_MAPPING = { 'title': ElementType.TITLE, 'paragraph_title': ElementType.TITLE, # PP-StructureV3 block_label 'text': ElementType.TEXT, 'paragraph': ElementType.PARAGRAPH, 'figure': ElementType.FIGURE, 'figure_caption': ElementType.CAPTION, 'table': ElementType.TABLE, 'table_caption': ElementType.TABLE_CAPTION, 'header': ElementType.HEADER, 'footer': ElementType.FOOTER, 'reference': ElementType.REFERENCE, 'equation': ElementType.EQUATION, 'formula': ElementType.FORMULA, 'list-item': ElementType.LIST_ITEM, 'list': ElementType.LIST, 'code': ElementType.CODE, 'footnote': ElementType.FOOTNOTE, 'page-number': ElementType.PAGE_NUMBER, 'watermark': ElementType.WATERMARK, 'signature': ElementType.SIGNATURE, 'stamp': ElementType.STAMP, 'seal': ElementType.STAMP, # PP-StructureV3 may use 'seal' label 'logo': ElementType.LOGO, 'barcode': ElementType.BARCODE, 'qr-code': ElementType.QR_CODE, # Default fallback 'image': ElementType.IMAGE, 'chart': ElementType.CHART, 'diagram': ElementType.DIAGRAM, } def __init__(self, structure_engine: PPStructureV3): """ Initialize with existing PP-StructureV3 engine. Args: structure_engine: Initialized PPStructureV3 instance """ self.structure_engine = structure_engine def analyze_with_full_structure( self, image_path: Path, output_dir: Optional[Path] = None, current_page: int = 0, preprocessed_image: Optional[Image.Image] = None, scaling_info: Optional['ScalingInfo'] = None, save_visualization: bool = False, use_cv_table_detection: bool = False ) -> Dict[str, Any]: """ Analyze document with full PP-StructureV3 capabilities. Args: image_path: Path to original image file (used for cropping extracted images) output_dir: Optional output directory for saving extracted content current_page: Current page number (0-based) preprocessed_image: Optional preprocessed PIL Image for layout detection. If provided, this is used for PP-Structure prediction, but original image_path is still used for cropping images. scaling_info: Optional ScalingInfo from preprocessing. If image was scaled for layout detection, all bbox coordinates will be scaled back to original image coordinates for proper cropping. save_visualization: If True, save detection visualization images (layout_det_res, layout_order_res, overall_ocr_res, etc.) use_cv_table_detection: If True, use CV-based line detection for wired tables instead of ML-based cell detection (RT-DETR-L) Returns: Dictionary with complete structure information including: - elements: List of all detected elements with types and bbox (in original coords) - reading_order: Reading order indices - images: Extracted images with metadata - tables: Extracted tables with structure - visualization_dir: Path to visualization images (if save_visualization=True) """ try: logger.info(f"Enhanced PP-StructureV3 analysis on {image_path.name}") if preprocessed_image: logger.info("Using preprocessed image for layout detection") # Perform structure analysis with semaphore control # This prevents OOM errors from multiple simultaneous predictions with prediction_context(timeout=settings.service_acquire_timeout_seconds) as acquired: if not acquired: logger.error("Failed to acquire prediction slot (timeout), returning empty result") return { 'has_parsing_res_list': False, 'elements': [], 'total_elements': 0, 'images': [], 'tables': [], 'element_types': {}, 'error': 'Prediction slot timeout' } # Use preprocessed image if provided, otherwise use original path if preprocessed_image is not None: # Convert PIL to numpy array (BGR format for PP-Structure) predict_input = np.array(preprocessed_image) if len(predict_input.shape) == 3 and predict_input.shape[2] == 3: # Convert RGB to BGR predict_input = predict_input[:, :, ::-1] results = self.structure_engine.predict(predict_input) else: results = self.structure_engine.predict(str(image_path)) all_elements = [] all_images = [] all_tables = [] visualization_dir = None # Process each page result for page_idx, page_result in enumerate(results): # Save visualization images if requested if save_visualization and output_dir and hasattr(page_result, 'save_to_img'): try: vis_dir = output_dir / 'visualization' vis_dir.mkdir(parents=True, exist_ok=True) page_result.save_to_img(str(vis_dir)) visualization_dir = vis_dir logger.info(f"Saved visualization images to {vis_dir}") except Exception as e: logger.warning(f"Failed to save visualization images: {e}") # Try to access parsing_res_list and table_res_list (the complete structure) parsing_res_list = None table_res_list = None result_dict = None # Method 1: Direct access to json attribute (check both top-level and res) if hasattr(page_result, 'json'): result_json = page_result.json if isinstance(result_json, dict): result_dict = result_json # Check top-level if 'parsing_res_list' in result_json: parsing_res_list = result_json['parsing_res_list'] logger.info(f"Found parsing_res_list at top level with {len(parsing_res_list)} elements") # Check inside 'res' (new structure in paddlex) elif 'res' in result_json and isinstance(result_json['res'], dict): result_dict = result_json['res'] if 'parsing_res_list' in result_json['res']: parsing_res_list = result_json['res']['parsing_res_list'] logger.info(f"Found parsing_res_list inside 'res' with {len(parsing_res_list)} elements") # Method 2: Try direct dict access (LayoutParsingResultV2 inherits from dict) elif isinstance(page_result, dict): result_dict = page_result if 'parsing_res_list' in page_result: parsing_res_list = page_result['parsing_res_list'] logger.info(f"Found parsing_res_list via dict access with {len(parsing_res_list)} elements") elif 'res' in page_result and isinstance(page_result['res'], dict): result_dict = page_result['res'] if 'parsing_res_list' in page_result['res']: parsing_res_list = page_result['res']['parsing_res_list'] logger.info(f"Found parsing_res_list inside page_result['res'] with {len(parsing_res_list)} elements") # Method 3: Try to access as attribute elif hasattr(page_result, 'parsing_res_list'): parsing_res_list = page_result.parsing_res_list logger.info(f"Found parsing_res_list attribute with {len(parsing_res_list)} elements") if hasattr(page_result, '__dict__'): result_dict = page_result.__dict__ # Method 4: Check if result has to_dict method elif hasattr(page_result, 'to_dict'): result_dict = page_result.to_dict() if 'parsing_res_list' in result_dict: parsing_res_list = result_dict['parsing_res_list'] logger.info(f"Found parsing_res_list in to_dict with {len(parsing_res_list)} elements") elif 'res' in result_dict and isinstance(result_dict['res'], dict): result_dict = result_dict['res'] if 'parsing_res_list' in result_dict: parsing_res_list = result_dict['parsing_res_list'] logger.info(f"Found parsing_res_list in to_dict['res'] with {len(parsing_res_list)} elements") # Extract table_res_list which contains cell_box_list layout_det_res = None if result_dict: if 'table_res_list' in result_dict: table_res_list = result_dict['table_res_list'] logger.info(f"Found table_res_list with {len(table_res_list)} tables") for i, tbl in enumerate(table_res_list): if 'cell_box_list' in tbl: logger.info(f" Table {i}: {len(tbl['cell_box_list'])} cell boxes") # Extract layout_det_res for Image-in-Table processing if 'layout_det_res' in result_dict: layout_det_res = result_dict['layout_det_res'] logger.info(f"Found layout_det_res with {len(layout_det_res.get('boxes', []))} boxes") # Process parsing_res_list if found if parsing_res_list: elements = self._process_parsing_res_list( parsing_res_list, current_page, output_dir, image_path, scaling_info, table_res_list=table_res_list, # Pass table_res_list for cell_box_list layout_det_res=layout_det_res, # Pass layout_det_res for Image-in-Table use_cv_table_detection=use_cv_table_detection # Use CV for wired tables ) all_elements.extend(elements) # Extract tables and images from elements table_bboxes = [] # Collect table bboxes for standalone image filtering for elem in elements: if elem['type'] == ElementType.TABLE: all_tables.append(elem) table_bboxes.append(elem.get('bbox', [0, 0, 0, 0])) elif elem['type'] in [ElementType.IMAGE, ElementType.FIGURE]: all_images.append(elem) # Extract standalone images from layout_det_res (images NOT inside tables) if layout_det_res and image_path and output_dir: standalone_images = self._extract_standalone_images( layout_det_res, table_bboxes, image_path, output_dir, current_page, len(elements), scaling_info ) if standalone_images: all_elements.extend(standalone_images) all_images.extend(standalone_images) logger.info(f"Extracted {len(standalone_images)} standalone images from layout_det_res") else: # Fallback to markdown if parsing_res_list not available logger.warning("parsing_res_list not found, falling back to markdown") elements = self._process_markdown_fallback( page_result, current_page, output_dir ) all_elements.extend(elements) # Create reading order based on element positions reading_order = self._determine_reading_order(all_elements) result = { 'elements': all_elements, 'total_elements': len(all_elements), 'reading_order': reading_order, 'tables': all_tables, 'images': all_images, 'element_types': self._count_element_types(all_elements), 'has_parsing_res_list': parsing_res_list is not None } # Add visualization directory if available if visualization_dir: result['visualization_dir'] = str(visualization_dir) return result except Exception as e: logger.error(f"Enhanced PP-StructureV3 analysis error: {e}") import traceback traceback.print_exc() # Clean up GPU memory on error try: if TORCH_AVAILABLE and torch.cuda.is_available(): torch.cuda.empty_cache() torch.cuda.synchronize() if paddle.device.is_compiled_with_cuda(): paddle.device.cuda.empty_cache() gc.collect() except: pass # Ignore cleanup errors return { 'elements': [], 'total_elements': 0, 'reading_order': [], 'tables': [], 'images': [], 'element_types': {}, 'has_parsing_res_list': False, 'error': str(e) } def _process_parsing_res_list( self, parsing_res_list: List[Dict], current_page: int, output_dir: Optional[Path], source_image_path: Optional[Path] = None, scaling_info: Optional['ScalingInfo'] = None, table_res_list: Optional[List[Dict]] = None, layout_det_res: Optional[Dict] = None, use_cv_table_detection: bool = False ) -> List[Dict[str, Any]]: """ Process parsing_res_list to extract all elements. Args: parsing_res_list: List of parsed elements from PP-StructureV3 scaling_info: Scaling information for bbox coordinate restoration current_page: Current page number output_dir: Optional output directory source_image_path: Path to source image for cropping image regions table_res_list: Optional list of table results containing cell_box_list layout_det_res: Optional layout detection result for Image-in-Table processing use_cv_table_detection: If True, use CV line detection for wired tables Returns: List of processed elements with normalized structure """ elements = [] for idx, item in enumerate(parsing_res_list): # Debug: log the structure of the first item if idx == 0: logger.info(f"First parsing_res_list item structure: {list(item.keys()) if isinstance(item, dict) else type(item)}") logger.info(f"First parsing_res_list item sample: {str(item)[:500]}") # Extract element type (check both 'type' and 'block_label') element_type = item.get('type', '') or item.get('block_label', 'text') element_type = element_type.lower() mapped_type = self.ELEMENT_TYPE_MAPPING.get( element_type, ElementType.TEXT ) # Extract bbox (check multiple possible keys) layout_bbox = ( item.get('layout_bbox', []) or item.get('block_bbox', []) or item.get('bbox', []) ) # Ensure bbox has 4 values if len(layout_bbox) >= 4: bbox = list(layout_bbox[:4]) # [x1, y1, x2, y2] else: bbox = [0, 0, 0, 0] # Default if bbox missing logger.warning(f"Element {idx} has invalid bbox: {layout_bbox}") # Scale bbox back to original image coordinates if image was scaled # This is critical for proper cropping from original high-resolution image if scaling_info and scaling_info.was_scaled and bbox != [0, 0, 0, 0]: scale_factor = scaling_info.scale_factor bbox = [ bbox[0] * scale_factor, # x1 bbox[1] * scale_factor, # y1 bbox[2] * scale_factor, # x2 bbox[3] * scale_factor # y2 ] if idx == 0: # Log only for first element to avoid spam logger.info( f"Scaled bbox to original coords: " f"{[round(x, 1) for x in layout_bbox[:4]]} -> {[round(x, 1) for x in bbox]} " f"(factor={scale_factor:.3f})" ) # Extract content (check multiple possible keys) content = ( item.get('content', '') or item.get('block_content', '') or '' ) # Additional fallback for content in 'res' field if not content and 'res' in item: res = item.get('res', {}) if isinstance(res, dict): content = res.get('content', '') or res.get('text', '') elif isinstance(res, str): content = res # Content-based HTML table detection: PP-StructureV3 sometimes # classifies tables as 'text' but returns HTML table content html_table_content = None if content and ' res_data['boxes'] > SLANeXt 補充 cell_boxes_extracted = False # First, try to get cell_box_list from table_res_list (pp_demo style) if table_res_list and not cell_boxes_extracted: # Match table by HTML content or find closest bbox for tbl_res in table_res_list: if 'cell_box_list' in tbl_res and tbl_res['cell_box_list']: # Check if HTML matches tbl_html = tbl_res.get('pred_html', '') if html_content and tbl_html: # Simple check: if both have same structure if tbl_html[:100] == html_content[:100]: cell_boxes = tbl_res['cell_box_list'] # cell_box_list is already in absolute coordinates element['cell_boxes'] = [[float(c) for c in box] for box in cell_boxes] element['cell_boxes_source'] = 'table_res_list' cell_boxes_extracted = True logger.info(f"[TABLE] Found {len(cell_boxes)} cell boxes from table_res_list (HTML match)") break # If no HTML match, use first available table_res with cell_box_list if not cell_boxes_extracted: for tbl_res in table_res_list: if 'cell_box_list' in tbl_res and tbl_res['cell_box_list']: cell_boxes = tbl_res['cell_box_list'] element['cell_boxes'] = [[float(c) for c in box] for box in cell_boxes] element['cell_boxes_source'] = 'table_res_list' cell_boxes_extracted = True logger.info(f"[TABLE] Found {len(cell_boxes)} cell boxes from table_res_list (first available)") # Remove used table_res to avoid reuse table_res_list.remove(tbl_res) break if not cell_boxes_extracted and 'boxes' in res_data: # PPStructureV3 returned cell boxes in res (unlikely in PaddleX 3.x) cell_boxes = res_data['boxes'] logger.info(f"[TABLE] Found {len(cell_boxes)} cell boxes in res_data") # 獲取表格自身的偏移量 (用於將 Cell 的相對座標轉為絕對座標) table_x, table_y = 0, 0 if len(bbox) >= 2: # bbox is [x1, y1, x2, y2] table_x, table_y = bbox[0], bbox[1] processed_cells = [] for cell_box in cell_boxes: # 確保格式正確 if isinstance(cell_box, (list, tuple)) and len(cell_box) >= 4: # 轉換為絕對座標: Cell x + 表格 x abs_cell_box = [ cell_box[0] + table_x, cell_box[1] + table_y, cell_box[2] + table_x, cell_box[3] + table_y ] processed_cells.append(abs_cell_box) # 將處理後的 Cell 座標存入 element element['cell_boxes'] = processed_cells element['raw_cell_boxes'] = cell_boxes element['cell_boxes_source'] = 'ppstructure' logger.info(f"[TABLE] Processed {len(processed_cells)} cell boxes with table offset ({table_x}, {table_y})") cell_boxes_extracted = True if not cell_boxes_extracted: logger.info(f"[TABLE] No cell boxes available. PPStructureV3 keys: {list(res_data.keys()) if res_data else 'empty'}") # 2.5 CV-based table line detection for wired tables if use_cv_table_detection and source_image_path and source_image_path.exists(): try: # Load image for CV processing cv_image = cv2.imread(str(source_image_path)) if cv_image is not None: cv_detector = CVTableDetector() ml_cell_boxes = element.get('cell_boxes', []) # Detect cells using CV line detection cv_cells = cv_detector.detect_and_merge_with_ml( cv_image, bbox, # Table bbox ml_cell_boxes ) if cv_cells: # Apply scaling if needed if scaling_info and scaling_info.was_scaled: cv_cells = [ [ c[0] * scaling_info.scale_x, c[1] * scaling_info.scale_y, c[2] * scaling_info.scale_x, c[3] * scaling_info.scale_y ] for c in cv_cells ] element['cell_boxes'] = cv_cells element['cell_boxes_source'] = 'cv_line_detection' logger.info(f"[TABLE] CV line detection found {len(cv_cells)} cells (ML had {len(ml_cell_boxes)})") except Exception as cv_error: logger.warning(f"[TABLE] CV line detection failed: {cv_error}") # 3. Image-in-Table 處理:檢測並嵌入表格內的圖片 if layout_det_res and source_image_path and output_dir: embedded_images = self._embed_images_in_table( element, bbox, layout_det_res, source_image_path, output_dir ) if embedded_images: element['embedded_images'] = embedded_images logger.info(f"[TABLE] Embedded {len(embedded_images)} images into table") # Special handling for images/figures/charts/stamps (visual elements that need cropping) elif mapped_type in [ElementType.IMAGE, ElementType.FIGURE, ElementType.CHART, ElementType.DIAGRAM, ElementType.STAMP, ElementType.LOGO]: # Save image if path provided if 'img_path' in item and output_dir: saved_path = self._save_image(item['img_path'], output_dir, element['element_id']) if saved_path: element['saved_path'] = saved_path element['img_path'] = item['img_path'] # Keep original for reference else: logger.warning(f"Failed to save image for element {element['element_id']}") # Crop image from source if no img_path but source image is available elif source_image_path and output_dir and bbox != [0, 0, 0, 0]: cropped_path = self._crop_and_save_image( source_image_path, bbox, output_dir, element['element_id'] ) if cropped_path: element['saved_path'] = cropped_path element['img_path'] = cropped_path logger.info(f"Cropped and saved image region for {element['element_id']}") else: logger.warning(f"Failed to crop image for element {element['element_id']}") # Add any additional metadata if 'metadata' in item: element['metadata'] = item['metadata'] elements.append(element) logger.debug(f"Processed element {idx}: type={mapped_type}, bbox={bbox}") return elements def _embed_images_in_table( self, table_element: Dict[str, Any], table_bbox: List[float], layout_det_res: Dict, source_image_path: Path, output_dir: Path ) -> List[Dict[str, Any]]: """ Detect and embed images that are inside a table region. This handles the case where layout detection finds an image inside a table, similar to how pp_demo embeds images in table HTML. Args: table_element: The table element being processed table_bbox: Table bounding box [x1, y1, x2, y2] layout_det_res: Layout detection result containing all detected boxes source_image_path: Path to source image for cropping output_dir: Output directory for saving cropped images Returns: List of embedded image info dicts with 'bbox', 'saved_path', 'html_tag' """ embedded_images = [] try: boxes = layout_det_res.get('boxes', []) table_x1, table_y1, table_x2, table_y2 = table_bbox for box in boxes: label = box.get('label', '').lower() if label != 'image': continue # Get image bbox img_coord = box.get('coordinate', []) if len(img_coord) < 4: continue img_x1, img_y1, img_x2, img_y2 = img_coord[:4] # Check if image is inside table (with some tolerance) tolerance = 5 # pixels if (img_x1 >= table_x1 - tolerance and img_y1 >= table_y1 - tolerance and img_x2 <= table_x2 + tolerance and img_y2 <= table_y2 + tolerance): logger.info(f"[IMAGE-IN-TABLE] Found image at [{int(img_x1)},{int(img_y1)},{int(img_x2)},{int(img_y2)}] inside table") # Crop and save the image img_element_id = f"img_in_table_{int(img_x1)}_{int(img_y1)}_{int(img_x2)}_{int(img_y2)}" cropped_path = self._crop_and_save_image( source_image_path, [img_x1, img_y1, img_x2, img_y2], output_dir, img_element_id ) if cropped_path: # Create relative path for HTML embedding rel_path = f"imgs/{Path(cropped_path).name}" # Create img tag similar to pp_demo img_html = f'
Image
' embedded_image = { 'bbox': [img_x1, img_y1, img_x2, img_y2], 'saved_path': str(cropped_path), 'relative_path': rel_path, 'html_tag': img_html, 'element_id': img_element_id } embedded_images.append(embedded_image) # Try to insert image into HTML content if 'html' in table_element and table_element['html']: # Insert image reference at the end of HTML before original_html = table_element['html'] if '' in original_html: # Insert before in a new row new_html = original_html.replace( '', f'Embedded Image' ) table_element['html'] = new_html logger.info(f"[IMAGE-IN-TABLE] Embedded image into table HTML") except Exception as e: logger.error(f"[IMAGE-IN-TABLE] Error processing images in table: {e}") return embedded_images def _extract_standalone_images( self, layout_det_res: Dict, table_bboxes: List[List[float]], source_image_path: Path, output_dir: Path, current_page: int, start_index: int, scaling_info: Optional['ScalingInfo'] = None ) -> List[Dict[str, Any]]: """ Extract standalone images from layout_det_res that are NOT inside tables. This handles images that PP-StructureV3 detects in layout_det_res but doesn't include in parsing_res_list (non-table images). Args: layout_det_res: Layout detection result containing all detected boxes table_bboxes: List of table bounding boxes to exclude images inside tables source_image_path: Path to source image for cropping output_dir: Output directory for saving cropped images current_page: Current page number start_index: Starting index for element IDs scaling_info: Optional scaling info for coordinate restoration Returns: List of standalone image elements """ standalone_images = [] try: boxes = layout_det_res.get('boxes', []) logger.info(f"[STANDALONE-IMAGE] Checking {len(boxes)} boxes for standalone images") for box_idx, box in enumerate(boxes): label = box.get('label', '').lower() if label != 'image': continue # Get image bbox img_coord = box.get('coordinate', []) if len(img_coord) < 4: continue img_x1, img_y1, img_x2, img_y2 = img_coord[:4] # Check if image is inside any table (skip if so) is_inside_table = False for table_bbox in table_bboxes: if len(table_bbox) < 4: continue tx1, ty1, tx2, ty2 = table_bbox[:4] tolerance = 5 # pixels if (img_x1 >= tx1 - tolerance and img_y1 >= ty1 - tolerance and img_x2 <= tx2 + tolerance and img_y2 <= ty2 + tolerance): is_inside_table = True logger.debug(f"[STANDALONE-IMAGE] Image at [{int(img_x1)},{int(img_y1)}] is inside table, skipping") break if is_inside_table: continue # Scale bbox back to original coordinates if needed if scaling_info and scaling_info.was_scaled: scale_factor = scaling_info.scale_factor img_x1 *= scale_factor img_y1 *= scale_factor img_x2 *= scale_factor img_y2 *= scale_factor logger.debug(f"[STANDALONE-IMAGE] Scaled bbox by {scale_factor:.3f}") logger.info(f"[STANDALONE-IMAGE] Found standalone image at [{int(img_x1)},{int(img_y1)},{int(img_x2)},{int(img_y2)}]") # Crop and save the image element_idx = start_index + len(standalone_images) img_element_id = f"standalone_img_{current_page}_{element_idx}" cropped_path = self._crop_and_save_image( source_image_path, [img_x1, img_y1, img_x2, img_y2], output_dir, img_element_id ) if cropped_path: element = { 'element_id': img_element_id, 'type': ElementType.IMAGE, 'original_type': 'image', 'content': '', 'page': current_page, 'bbox': [img_x1, img_y1, img_x2, img_y2], 'index': element_idx, 'confidence': box.get('score', 1.0), 'saved_path': cropped_path, 'img_path': cropped_path, 'source': 'layout_det_res' } standalone_images.append(element) logger.info(f"[STANDALONE-IMAGE] Extracted and saved: {cropped_path}") except Exception as e: logger.error(f"[STANDALONE-IMAGE] Error extracting standalone images: {e}") import traceback traceback.print_exc() return standalone_images def _process_markdown_fallback( self, page_result: Any, current_page: int, output_dir: Optional[Path] ) -> List[Dict[str, Any]]: """ Fallback to markdown processing if parsing_res_list not available. Args: page_result: PP-StructureV3 page result current_page: Current page number output_dir: Optional output directory Returns: List of elements extracted from markdown """ elements = [] # Extract from markdown if available if hasattr(page_result, 'markdown'): markdown_dict = page_result.markdown if isinstance(markdown_dict, dict): # Extract markdown texts markdown_texts = markdown_dict.get('markdown_texts', '') if markdown_texts: # Detect if it's a table is_table = ' List[int]: """ Determine reading order based on element positions. Args: elements: List of elements with bbox Returns: List of indices representing reading order """ if not elements: return [] # If elements have original indices, use them if all('index' in elem for elem in elements): # Sort by original index indexed_elements = [(i, elem['index']) for i, elem in enumerate(elements)] indexed_elements.sort(key=lambda x: x[1]) return [i for i, _ in indexed_elements] # Otherwise, sort by position (top to bottom, left to right) indexed_elements = [] for i, elem in enumerate(elements): bbox = elem.get('bbox', [0, 0, 0, 0]) if len(bbox) >= 2: # Use top-left corner for sorting indexed_elements.append((i, bbox[1], bbox[0])) # (index, y, x) else: indexed_elements.append((i, 0, 0)) # Sort by y first (top to bottom), then x (left to right) indexed_elements.sort(key=lambda x: (x[1], x[2])) return [i for i, _, _ in indexed_elements] def _count_element_types(self, elements: List[Dict]) -> Dict[str, int]: """ Count occurrences of each element type. Args: elements: List of elements Returns: Dictionary with element type counts """ type_counts = {} for elem in elements: elem_type = elem.get('type', ElementType.TEXT) type_counts[elem_type] = type_counts.get(elem_type, 0) + 1 return type_counts def _extract_text_from_html(self, html: str) -> str: """Extract plain text from HTML content.""" try: from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'html.parser') return soup.get_text(separator=' ', strip=True) except: # Fallback: just remove HTML tags import re text = re.sub(r'<[^>]+>', ' ', html) text = re.sub(r'\s+', ' ', text) return text.strip() def _extract_bbox_from_filename(self, filename: str) -> List[int]: """Extract bbox from filename if it contains coordinate information.""" import re match = re.search(r'box_(\d+)_(\d+)_(\d+)_(\d+)', filename) if match: return list(map(int, match.groups())) return [0, 0, 0, 0] def _save_image(self, img_path: str, output_dir: Path, element_id: str) -> Optional[str]: """Save image file to output directory and return relative path. Args: img_path: Path to image file or image data output_dir: Base output directory for results element_id: Unique identifier for the element Returns: Relative path to saved image, or None if save failed """ import shutil import numpy as np from PIL import Image try: # Create imgs subdirectory img_dir = output_dir / "imgs" img_dir.mkdir(parents=True, exist_ok=True) # Determine output file path dst_path = img_dir / f"{element_id}.png" relative_path = f"imgs/{element_id}.png" # Handle different input types if isinstance(img_path, str): src_path = Path(img_path) if src_path.exists() and src_path.is_file(): # Copy existing file shutil.copy2(src_path, dst_path) logger.info(f"Copied image from {src_path} to {dst_path}") else: logger.warning(f"Image file not found: {img_path}") return None elif isinstance(img_path, np.ndarray): # Save numpy array as image Image.fromarray(img_path).save(dst_path) logger.info(f"Saved numpy array image to {dst_path}") else: logger.warning(f"Unknown image type: {type(img_path)}") return None # Return relative path for reference return relative_path except Exception as e: logger.error(f"Failed to save image for element {element_id}: {e}") return None def _save_pil_image(self, img_obj, output_dir: Path, element_id: str): """Save PIL image object to output directory.""" try: img_dir = output_dir / "imgs" img_dir.mkdir(parents=True, exist_ok=True) img_path = img_dir / f"{element_id}.png" img_obj.save(str(img_path)) logger.info(f"Saved image to {img_path}") except Exception as e: logger.warning(f"Failed to save PIL image: {e}") def _crop_and_save_image( self, source_image_path: Path, bbox: List[float], output_dir: Path, element_id: str ) -> Optional[str]: """ Crop image region from source image and save to output directory. Args: source_image_path: Path to the source image bbox: Bounding box [x1, y1, x2, y2] output_dir: Output directory for saving cropped image element_id: Element ID for naming Returns: Relative filename (not full path) to saved image, consistent with Direct Track which stores "filename.png" that gets joined with result_dir by pdf_generator_service. """ try: from PIL import Image # Open source image with Image.open(source_image_path) as img: # Ensure bbox values are integers x1, y1, x2, y2 = [int(v) for v in bbox[:4]] # Validate bbox img_width, img_height = img.size x1 = max(0, min(x1, img_width)) x2 = max(0, min(x2, img_width)) y1 = max(0, min(y1, img_height)) y2 = max(0, min(y2, img_height)) if x2 <= x1 or y2 <= y1: logger.warning(f"Invalid bbox for cropping: {bbox}") return None # Crop the region cropped = img.crop((x1, y1, x2, y2)) # Save directly to output directory (no subdirectory) # Consistent with Direct Track which saves to output_dir directly image_filename = f"{element_id}.png" img_path = output_dir / image_filename cropped.save(str(img_path), "PNG") # Return just the filename (relative to result_dir) # PDF generator will join with result_dir to get full path logger.info(f"Cropped image saved: {img_path} ({x2-x1}x{y2-y1} pixels)") return image_filename except Exception as e: logger.error(f"Failed to crop and save image for {element_id}: {e}") return None