Files
OCR/backend/app/services/pp_structure_enhanced.py
egg 0aff468c51 feat: implement Phase 1 of PDF layout restoration
Implement critical fixes for image and table rendering in PDF generation.

**Image Handling Fixes**:
- Implemented _save_image() in pp_structure_enhanced.py
  - Creates imgs/ subdirectory for saved images
  - Handles both file paths and numpy arrays
  - Returns relative path for reference
  - Adds proper error handling and logging
- Added saved_path field to image elements for path tracking
- Created _get_image_path() helper with fallback logic
  - Checks saved_path, path, image_path in content
  - Falls back to metadata fields
  - Logs warnings for missing paths

**Table Rendering Fixes**:
- Fixed table rendering to use element's own bbox directly
  - No longer depends on fake table_*.png references
  - Supports both bbox and bbox_polygon formats
  - Inline conversion for different bbox formats
- Maintains backward compatibility with legacy approach
- Improved error handling for missing bbox data

**Status**:
- Phase 1 tasks 1.1 and 1.2: Completed
- Phase 1 tasks 2.1, 2.2, and 2.3: Completed
- Testing is still pending because the backend was not available

These fixes resolve the critical issues where images never appeared
and tables never rendered in generated PDFs.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-24 07:16:31 +08:00

476 lines
18 KiB
Python

"""
Enhanced PP-StructureV3 processing with full element extraction.

This module provides enhanced PP-StructureV3 processing that extracts all
23 element types with their bbox coordinates and reading order.
"""
import logging
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
import json
import gc
# Optional torch import for additional GPU memory management.
# torch is only used to release cached CUDA memory after a failed analysis,
# so the module must keep working when it is not installed.
try:
    import torch
    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False
import paddle
from paddleocr import PPStructureV3
from app.models.unified_document import ElementType
logger = logging.getLogger(__name__)
class PPStructureEnhanced:
    """
    Enhanced PP-StructureV3 processor that extracts all available element types
    and structure information from parsing_res_list.

    Each detected layout element is normalized into a plain dict carrying its
    mapped type, bbox, content, page number and original index.
    """

    # Mapping from PP-StructureV3 type labels to our ElementType.
    # Lookups use the lower-cased label; labels that are not listed here are
    # mapped to ElementType.TEXT by _process_parsing_res_list.
    ELEMENT_TYPE_MAPPING = {
        'title': ElementType.TITLE,
        'text': ElementType.TEXT,
        'paragraph': ElementType.PARAGRAPH,
        'figure': ElementType.FIGURE,
        'figure_caption': ElementType.CAPTION,
        'table': ElementType.TABLE,
        'table_caption': ElementType.TABLE_CAPTION,
        'header': ElementType.HEADER,
        'footer': ElementType.FOOTER,
        'reference': ElementType.REFERENCE,
        'equation': ElementType.EQUATION,
        'formula': ElementType.FORMULA,
        'list-item': ElementType.LIST_ITEM,
        'list': ElementType.LIST,
        'code': ElementType.CODE,
        'footnote': ElementType.FOOTNOTE,
        'page-number': ElementType.PAGE_NUMBER,
        'watermark': ElementType.WATERMARK,
        'signature': ElementType.SIGNATURE,
        'stamp': ElementType.STAMP,
        'logo': ElementType.LOGO,
        'barcode': ElementType.BARCODE,
        'qr-code': ElementType.QR_CODE,
        # Additional generic visual labels (beyond the 23 entries above)
        'image': ElementType.IMAGE,
        'chart': ElementType.CHART,
        'diagram': ElementType.DIAGRAM,
    }
def __init__(self, structure_engine: PPStructureV3):
    """
    Initialize with existing PP-StructureV3 engine.

    The engine is stored as-is and later used through its ``predict`` method;
    nothing else is set up here.

    Args:
        structure_engine: Initialized PPStructureV3 instance
    """
    self.structure_engine = structure_engine
def analyze_with_full_structure(
    self,
    image_path: Path,
    output_dir: Optional[Path] = None,
    current_page: int = 0
) -> Dict[str, Any]:
    """
    Analyze document with full PP-StructureV3 capabilities.

    Runs the wrapped engine on one image and normalizes every page result it
    returns. Failures are not raised: any exception is logged and reported
    through the ``error`` key of an otherwise empty result.

    Args:
        image_path: Path to image file
        output_dir: Optional output directory for saving extracted content
        current_page: Current page number (0-based)

    Returns:
        Dictionary with complete structure information including:
        - elements: List of all detected elements with types and bbox
        - total_elements: Number of detected elements
        - reading_order: Reading order indices
        - tables: Table elements found via parsing_res_list
        - images: Image/figure elements found via parsing_res_list
        - element_types: Number of elements per element type
        - has_parsing_res_list: True if any page result exposed parsing_res_list
        - error: Error message (only present when the analysis failed)
    """
    try:
        logger.info(f"Enhanced PP-StructureV3 analysis on {Path(image_path).name}")

        # Perform structure analysis
        results = self.structure_engine.predict(str(image_path))

        all_elements = []
        all_images = []
        all_tables = []
        # Tracked outside the loop so the flag is defined even when the
        # engine returns no page results at all.
        has_parsing_res_list = False

        # Process each page result
        for page_result in results:
            parsing_res_list = self._find_parsing_res_list(page_result)
            if parsing_res_list is not None:
                has_parsing_res_list = True

            if parsing_res_list:
                elements = self._process_parsing_res_list(
                    parsing_res_list, current_page, output_dir
                )
                all_elements.extend(elements)

                # Extract tables and images from elements
                for elem in elements:
                    if elem['type'] == ElementType.TABLE:
                        all_tables.append(elem)
                    elif elem['type'] in [ElementType.IMAGE, ElementType.FIGURE]:
                        all_images.append(elem)
            else:
                # Fallback to markdown if parsing_res_list not available
                # (an empty list is treated the same way).
                logger.warning("parsing_res_list not found, falling back to markdown")
                elements = self._process_markdown_fallback(
                    page_result, current_page, output_dir
                )
                all_elements.extend(elements)

        # Create reading order based on element positions
        reading_order = self._determine_reading_order(all_elements)

        return {
            'elements': all_elements,
            'total_elements': len(all_elements),
            'reading_order': reading_order,
            'tables': all_tables,
            'images': all_images,
            'element_types': self._count_element_types(all_elements),
            'has_parsing_res_list': has_parsing_res_list
        }

    except Exception as e:
        # logger.exception records the traceback through the logging system
        # instead of printing it straight to stderr.
        logger.exception(f"Enhanced PP-StructureV3 analysis error: {e}")

        # Clean up GPU memory on error (best effort: never mask the real error)
        try:
            if TORCH_AVAILABLE and torch.cuda.is_available():
                torch.cuda.empty_cache()
                torch.cuda.synchronize()
            if paddle.device.is_compiled_with_cuda():
                paddle.device.cuda.empty_cache()
        except Exception as cleanup_error:
            logger.debug(f"GPU memory cleanup skipped: {cleanup_error}")
        finally:
            gc.collect()

        return {
            'elements': [],
            'total_elements': 0,
            'reading_order': [],
            'tables': [],
            'images': [],
            'element_types': {},
            'has_parsing_res_list': False,
            'error': str(e)
        }


def _find_parsing_res_list(self, page_result: Any) -> Optional[List[Any]]:
    """
    Locate parsing_res_list on a page result.

    The access paths are tried one after another, so a result that has a
    ``json`` attribute without the list can still be served by the
    attribute or ``to_dict`` paths.

    Args:
        page_result: PP-StructureV3 page result

    Returns:
        The parsing_res_list if one was found, otherwise None
    """
    # Method 1: Direct access to json attribute
    result_json = getattr(page_result, 'json', None)
    if isinstance(result_json, dict):
        # NOTE(review): some PaddleX releases appear to nest the payload
        # under a 'res' key - confirm against the installed version.
        for container in (result_json, result_json.get('res')):
            if isinstance(container, dict):
                found = container.get('parsing_res_list')
                if found is not None:
                    logger.info(f"Found parsing_res_list with {len(found)} elements")
                    return found

    # Method 2: Try to access as attribute
    found = getattr(page_result, 'parsing_res_list', None)
    if found is not None:
        logger.info(f"Found parsing_res_list attribute with {len(found)} elements")
        return found

    # Method 3: Check if result has to_dict method
    if hasattr(page_result, 'to_dict'):
        result_dict = page_result.to_dict()
        if isinstance(result_dict, dict):
            found = result_dict.get('parsing_res_list')
            if found is not None:
                logger.info(f"Found parsing_res_list in to_dict with {len(found)} elements")
                return found

    return None
def _process_parsing_res_list(
self,
parsing_res_list: List[Dict],
current_page: int,
output_dir: Optional[Path]
) -> List[Dict[str, Any]]:
"""
Process parsing_res_list to extract all elements.
Args:
parsing_res_list: List of parsed elements from PP-StructureV3
current_page: Current page number
output_dir: Optional output directory
Returns:
List of processed elements with normalized structure
"""
elements = []
for idx, item in enumerate(parsing_res_list):
# Extract element type
element_type = item.get('type', 'text').lower()
mapped_type = self.ELEMENT_TYPE_MAPPING.get(
element_type, ElementType.TEXT
)
# Extract bbox (layout_bbox has the precise coordinates)
layout_bbox = item.get('layout_bbox', [])
if not layout_bbox and 'bbox' in item:
layout_bbox = item['bbox']
# Ensure bbox has 4 values
if len(layout_bbox) >= 4:
bbox = layout_bbox[:4] # [x1, y1, x2, y2]
else:
bbox = [0, 0, 0, 0] # Default if bbox missing
# Extract content
content = item.get('content', '')
if not content and 'res' in item:
# Some elements have content in 'res' field
res = item.get('res', {})
if isinstance(res, dict):
content = res.get('content', '') or res.get('text', '')
elif isinstance(res, str):
content = res
# Create element
element = {
'element_id': f"pp3_{current_page}_{idx}",
'type': mapped_type,
'original_type': element_type,
'content': content,
'page': current_page,
'bbox': bbox, # [x1, y1, x2, y2]
'index': idx, # Original index in reading order
'confidence': item.get('score', 1.0)
}
# Special handling for tables
if mapped_type == ElementType.TABLE:
# Extract table structure if available
if 'res' in item and isinstance(item['res'], dict):
html_content = item['res'].get('html', '')
if html_content:
element['html'] = html_content
element['extracted_text'] = self._extract_text_from_html(html_content)
# Special handling for images/figures
elif mapped_type in [ElementType.IMAGE, ElementType.FIGURE]:
# Save image if path provided
if 'img_path' in item and output_dir:
saved_path = self._save_image(item['img_path'], output_dir, element['element_id'])
if saved_path:
element['saved_path'] = saved_path
element['img_path'] = item['img_path'] # Keep original for reference
else:
logger.warning(f"Failed to save image for element {element['element_id']}")
# Add any additional metadata
if 'metadata' in item:
element['metadata'] = item['metadata']
elements.append(element)
logger.debug(f"Processed element {idx}: type={mapped_type}, bbox={bbox}")
return elements
def _process_markdown_fallback(
self,
page_result: Any,
current_page: int,
output_dir: Optional[Path]
) -> List[Dict[str, Any]]:
"""
Fallback to markdown processing if parsing_res_list not available.
Args:
page_result: PP-StructureV3 page result
current_page: Current page number
output_dir: Optional output directory
Returns:
List of elements extracted from markdown
"""
elements = []
# Extract from markdown if available
if hasattr(page_result, 'markdown'):
markdown_dict = page_result.markdown
if isinstance(markdown_dict, dict):
# Extract markdown texts
markdown_texts = markdown_dict.get('markdown_texts', '')
if markdown_texts:
# Detect if it's a table
is_table = '<table' in markdown_texts.lower()
element = {
'element_id': f"md_{current_page}_0",
'type': ElementType.TABLE if is_table else ElementType.TEXT,
'content': markdown_texts,
'page': current_page,
'bbox': [0, 0, 0, 0], # No bbox in markdown
'index': 0,
'from_markdown': True
}
if is_table:
element['extracted_text'] = self._extract_text_from_html(markdown_texts)
elements.append(element)
# Process images
markdown_images = markdown_dict.get('markdown_images', {})
for img_idx, (img_path, img_obj) in enumerate(markdown_images.items()):
# Save image
if output_dir and hasattr(img_obj, 'save'):
self._save_pil_image(img_obj, output_dir, f"md_img_{current_page}_{img_idx}")
# Try to extract bbox from filename
bbox = self._extract_bbox_from_filename(img_path)
element = {
'element_id': f"md_img_{current_page}_{img_idx}",
'type': ElementType.IMAGE,
'content': img_path,
'page': current_page,
'bbox': bbox,
'index': img_idx + 1,
'from_markdown': True
}
elements.append(element)
return elements
def _determine_reading_order(self, elements: List[Dict]) -> List[int]:
"""
Determine reading order based on element positions.
Args:
elements: List of elements with bbox
Returns:
List of indices representing reading order
"""
if not elements:
return []
# If elements have original indices, use them
if all('index' in elem for elem in elements):
# Sort by original index
indexed_elements = [(i, elem['index']) for i, elem in enumerate(elements)]
indexed_elements.sort(key=lambda x: x[1])
return [i for i, _ in indexed_elements]
# Otherwise, sort by position (top to bottom, left to right)
indexed_elements = []
for i, elem in enumerate(elements):
bbox = elem.get('bbox', [0, 0, 0, 0])
if len(bbox) >= 2:
# Use top-left corner for sorting
indexed_elements.append((i, bbox[1], bbox[0])) # (index, y, x)
else:
indexed_elements.append((i, 0, 0))
# Sort by y first (top to bottom), then x (left to right)
indexed_elements.sort(key=lambda x: (x[1], x[2]))
return [i for i, _, _ in indexed_elements]
def _count_element_types(self, elements: List[Dict]) -> Dict[str, int]:
"""
Count occurrences of each element type.
Args:
elements: List of elements
Returns:
Dictionary with element type counts
"""
type_counts = {}
for elem in elements:
elem_type = elem.get('type', ElementType.TEXT)
type_counts[elem_type] = type_counts.get(elem_type, 0) + 1
return type_counts
def _extract_text_from_html(self, html: str) -> str:
"""Extract plain text from HTML content."""
try:
from bs4 import BeautifulSoup
soup = BeautifulSoup(html, 'html.parser')
return soup.get_text(separator=' ', strip=True)
except:
# Fallback: just remove HTML tags
import re
text = re.sub(r'<[^>]+>', ' ', html)
text = re.sub(r'\s+', ' ', text)
return text.strip()
def _extract_bbox_from_filename(self, filename: str) -> List[int]:
"""Extract bbox from filename if it contains coordinate information."""
import re
match = re.search(r'box_(\d+)_(\d+)_(\d+)_(\d+)', filename)
if match:
return list(map(int, match.groups()))
return [0, 0, 0, 0]
def _save_image(self, img_path: str, output_dir: Path, element_id: str) -> Optional[str]:
"""Save image file to output directory and return relative path.
Args:
img_path: Path to image file or image data
output_dir: Base output directory for results
element_id: Unique identifier for the element
Returns:
Relative path to saved image, or None if save failed
"""
import shutil
import numpy as np
from PIL import Image
try:
# Create imgs subdirectory
img_dir = output_dir / "imgs"
img_dir.mkdir(parents=True, exist_ok=True)
# Determine output file path
dst_path = img_dir / f"{element_id}.png"
relative_path = f"imgs/{element_id}.png"
# Handle different input types
if isinstance(img_path, str):
src_path = Path(img_path)
if src_path.exists() and src_path.is_file():
# Copy existing file
shutil.copy2(src_path, dst_path)
logger.info(f"Copied image from {src_path} to {dst_path}")
else:
logger.warning(f"Image file not found: {img_path}")
return None
elif isinstance(img_path, np.ndarray):
# Save numpy array as image
Image.fromarray(img_path).save(dst_path)
logger.info(f"Saved numpy array image to {dst_path}")
else:
logger.warning(f"Unknown image type: {type(img_path)}")
return None
# Return relative path for reference
return relative_path
except Exception as e:
logger.error(f"Failed to save image for element {element_id}: {e}")
return None
def _save_pil_image(self, img_obj, output_dir: Path, element_id: str):
"""Save PIL image object to output directory."""
try:
img_dir = output_dir / "imgs"
img_dir.mkdir(parents=True, exist_ok=True)
img_path = img_dir / f"{element_id}.png"
img_obj.save(str(img_path))
logger.info(f"Saved image to {img_path}")
except Exception as e:
logger.warning(f"Failed to save PIL image: {e}")