Files
OCR/backend/app/services/pp_structure_enhanced.py
egg b997f9355a fix: make torch import optional and add PaddlePaddle GPU memory management
Problem:
- Backend failed to start with ModuleNotFoundError for torch module
- torch was imported as hard dependency but not in requirements.txt
- Project uses PaddlePaddle which has its own CUDA implementation

Changes:
- Make torch import optional with try/except in ocr_service.py
- Make torch import optional in pp_structure_enhanced.py
- Add cleanup_gpu_memory() method using PaddlePaddle's memory management
- Add check_gpu_memory() method to monitor available GPU memory
- Use paddle.device.cuda.empty_cache() for GPU cleanup
- Use torch.cuda only if TORCH_AVAILABLE flag is True
- Add cleanup calls after OCR processing to prevent OOM errors
- Add memory checks before GPU-intensive operations

Benefits:
- Backend can start without torch installed
- GPU memory is properly managed using PaddlePaddle
- Optional torch support provides additional memory monitoring
- Prevents GPU OOM errors during document processing

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-20 16:40:44 +08:00

431 lines
16 KiB
Python

"""
Enhanced PP-StructureV3 processing with full element extraction
This module provides enhanced PP-StructureV3 processing that extracts all
23 element types with their bbox coordinates and reading order.
"""
import logging
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
import json
import gc
# Optional torch import for additional GPU memory management.
# torch is not a required dependency of this module: when it cannot be
# imported, TORCH_AVAILABLE is False and every torch.cuda call must be
# guarded by that flag.
try:
    import torch
    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False
import paddle
from paddleocr import PPStructureV3
from app.models.unified_document import ElementType
logger = logging.getLogger(__name__)
class PPStructureEnhanced:
    """
    Enhanced PP-StructureV3 processor that extracts all available element types
    and structure information from parsing_res_list.

    The wrapped engine is only used through its ``predict`` method. Every page
    result is normalised into plain dictionaries; when no ``parsing_res_list``
    can be located on a page result, the page's ``markdown`` attribute is used
    as a coarse fallback.
    """

    # Mapping from PP-StructureV3 types to our ElementType
    ELEMENT_TYPE_MAPPING = {
        'title': ElementType.TITLE,
        'text': ElementType.TEXT,
        'paragraph': ElementType.PARAGRAPH,
        'figure': ElementType.FIGURE,
        'figure_caption': ElementType.CAPTION,
        'table': ElementType.TABLE,
        'table_caption': ElementType.TABLE_CAPTION,
        'header': ElementType.HEADER,
        'footer': ElementType.FOOTER,
        'reference': ElementType.REFERENCE,
        'equation': ElementType.EQUATION,
        'formula': ElementType.FORMULA,
        'list-item': ElementType.LIST_ITEM,
        'list': ElementType.LIST,
        'code': ElementType.CODE,
        'footnote': ElementType.FOOTNOTE,
        'page-number': ElementType.PAGE_NUMBER,
        'watermark': ElementType.WATERMARK,
        'signature': ElementType.SIGNATURE,
        'stamp': ElementType.STAMP,
        'logo': ElementType.LOGO,
        'barcode': ElementType.BARCODE,
        'qr-code': ElementType.QR_CODE,
        # Additional visual types. Labels missing from this table are mapped
        # to ElementType.TEXT at lookup time (see _process_parsing_res_list).
        'image': ElementType.IMAGE,
        'chart': ElementType.CHART,
        'diagram': ElementType.DIAGRAM,
    }

    def __init__(self, structure_engine: PPStructureV3):
        """
        Initialize with existing PP-StructureV3 engine.

        Args:
            structure_engine: Initialized PPStructureV3 instance
        """
        self.structure_engine = structure_engine

    def analyze_with_full_structure(
        self,
        image_path: Path,
        output_dir: Optional[Path] = None,
        current_page: int = 0
    ) -> Dict[str, Any]:
        """
        Analyze document with full PP-StructureV3 capabilities.

        Args:
            image_path: Path to image file
            output_dir: Optional output directory for saving extracted content
            current_page: Current page number (0-based)

        Returns:
            Dictionary with complete structure information including:
            - elements: List of all detected elements with types and bbox
            - total_elements: Number of entries in ``elements``
            - reading_order: Reading order indices into ``elements``
            - tables: Table elements found via parsing_res_list
            - images: Image/figure elements found via parsing_res_list
            - element_types: Count of elements per element type
            - has_parsing_res_list: Whether the last processed page result
              exposed a parsing_res_list
            - error: Only present on failure; the exception message. Failures
              are reported through this key rather than raised.
        """
        try:
            # Accept plain strings as well as Path objects.
            image_path = Path(image_path)
            logger.info(f"Enhanced PP-StructureV3 analysis on {image_path.name}")

            # Perform structure analysis
            results = self.structure_engine.predict(str(image_path))

            all_elements = []
            all_images = []
            all_tables = []
            # Bound before the loop so an empty result set yields an empty,
            # successful analysis instead of an unbound-variable error.
            parsing_res_list = None

            # Process each page result.
            # NOTE(review): every page result is tagged with the same
            # current_page, so element ids repeat if predict() ever returns
            # more than one result - confirm callers pass one page per call.
            for page_result in results:
                parsing_res_list = self._find_parsing_res_list(page_result)

                if parsing_res_list:
                    elements = self._process_parsing_res_list(
                        parsing_res_list, current_page, output_dir
                    )
                    all_elements.extend(elements)

                    # Extract tables and images from elements
                    for elem in elements:
                        if elem['type'] == ElementType.TABLE:
                            all_tables.append(elem)
                        elif elem['type'] in [ElementType.IMAGE, ElementType.FIGURE]:
                            all_images.append(elem)
                else:
                    # Fallback to markdown if parsing_res_list not available.
                    # NOTE(review): fallback elements are not added to the
                    # tables/images lists - confirm this is intended.
                    logger.warning("parsing_res_list not found, falling back to markdown")
                    elements = self._process_markdown_fallback(
                        page_result, current_page, output_dir
                    )
                    all_elements.extend(elements)

            # Create reading order based on element positions
            reading_order = self._determine_reading_order(all_elements)

            return {
                'elements': all_elements,
                'total_elements': len(all_elements),
                'reading_order': reading_order,
                'tables': all_tables,
                'images': all_images,
                'element_types': self._count_element_types(all_elements),
                'has_parsing_res_list': parsing_res_list is not None
            }

        except Exception as e:
            # exc_info routes the traceback through the logging configuration
            # instead of printing it straight to stderr.
            logger.error(f"Enhanced PP-StructureV3 analysis error: {e}", exc_info=True)

            # Clean up GPU memory on error
            self._cleanup_gpu_memory()

            return {
                'elements': [],
                'total_elements': 0,
                'reading_order': [],
                'tables': [],
                'images': [],
                'element_types': {},
                'has_parsing_res_list': False,
                'error': str(e)
            }

    @staticmethod
    def _safe_len(value: Any) -> int:
        """Return len(value), or 0 when the value has no length (e.g. None)."""
        try:
            return len(value)
        except TypeError:
            return 0

    def _find_parsing_res_list(self, page_result: Any) -> Optional[Any]:
        """
        Locate parsing_res_list on a single page result.

        The three lookups are mutually exclusive, in this priority order:
        the ``json`` attribute, a ``parsing_res_list`` attribute, then
        ``to_dict()``. Only the first one that exists on the object is
        consulted.
        NOTE(review): a ``json`` attribute without a top-level
        'parsing_res_list' key therefore prevents the other two lookups -
        confirm that this is the intended behavior.

        Args:
            page_result: One item yielded by the engine's ``predict`` call

        Returns:
            The parsing_res_list value, or None when it could not be found
        """
        # Method 1: Direct access to json attribute
        if hasattr(page_result, 'json'):
            result_json = page_result.json
            if isinstance(result_json, dict) and 'parsing_res_list' in result_json:
                found = result_json['parsing_res_list']
                logger.info(f"Found parsing_res_list with {self._safe_len(found)} elements")
                return found
            return None

        # Method 2: Try to access as attribute
        if hasattr(page_result, 'parsing_res_list'):
            found = page_result.parsing_res_list
            logger.info(f"Found parsing_res_list attribute with {self._safe_len(found)} elements")
            return found

        # Method 3: Check if result has to_dict method
        if hasattr(page_result, 'to_dict'):
            result_dict = page_result.to_dict()
            if 'parsing_res_list' in result_dict:
                found = result_dict['parsing_res_list']
                logger.info(f"Found parsing_res_list in to_dict with {self._safe_len(found)} elements")
                return found

        return None

    def _cleanup_gpu_memory(self) -> None:
        """
        Best-effort release of cached GPU memory.

        Each step is attempted independently so that a failure in one backend
        does not skip the others. Failures are logged at debug level and never
        propagated.
        """
        try:
            if TORCH_AVAILABLE and torch.cuda.is_available():
                torch.cuda.empty_cache()
                torch.cuda.synchronize()
        except Exception as cleanup_error:
            logger.debug(f"torch GPU cleanup failed: {cleanup_error}")

        try:
            if paddle.device.is_compiled_with_cuda():
                paddle.device.cuda.empty_cache()
        except Exception as cleanup_error:
            logger.debug(f"paddle GPU cleanup failed: {cleanup_error}")

        gc.collect()

    def _process_parsing_res_list(
        self,
        parsing_res_list: List[Dict],
        current_page: int,
        output_dir: Optional[Path]
    ) -> List[Dict[str, Any]]:
        """
        Process parsing_res_list to extract all elements.

        Args:
            parsing_res_list: List of parsed elements from PP-StructureV3
            current_page: Current page number
            output_dir: Optional output directory

        Returns:
            List of processed elements with normalized structure. Each element
            carries element_id, type, original_type, content, page, bbox,
            index and confidence; tables may add html/extracted_text, images
            may add img_path, and any item 'metadata' is copied through.
        """
        elements = []

        for idx, item in enumerate(parsing_res_list):
            # Extract element type; a missing or None type is treated as text
            raw_type = item.get('type', 'text')
            element_type = 'text' if raw_type is None else str(raw_type).lower()
            mapped_type = self.ELEMENT_TYPE_MAPPING.get(
                element_type, ElementType.TEXT
            )

            # Extract bbox (layout_bbox is preferred, 'bbox' is the fallback).
            # len() is used instead of truthiness so array-like values work.
            layout_bbox = item.get('layout_bbox')
            if layout_bbox is None or len(layout_bbox) == 0:
                layout_bbox = item.get('bbox')

            # Ensure bbox has 4 values
            if layout_bbox is not None and len(layout_bbox) >= 4:
                bbox = list(layout_bbox[:4])  # [x1, y1, x2, y2]
            else:
                bbox = [0, 0, 0, 0]  # Default if bbox missing

            # Extract content
            content = item.get('content', '')
            if not content and 'res' in item:
                # Some elements have content in 'res' field
                res = item.get('res', {})
                if isinstance(res, dict):
                    content = res.get('content', '') or res.get('text', '')
                elif isinstance(res, str):
                    content = res

            # Create element
            element = {
                'element_id': f"pp3_{current_page}_{idx}",
                'type': mapped_type,
                'original_type': element_type,
                'content': content,
                'page': current_page,
                'bbox': bbox,  # [x1, y1, x2, y2]
                'index': idx,  # Original index in reading order
                'confidence': item.get('score', 1.0)
            }

            # Special handling for tables
            if mapped_type == ElementType.TABLE:
                # Extract table structure if available
                if 'res' in item and isinstance(item['res'], dict):
                    html_content = item['res'].get('html', '')
                    if html_content:
                        element['html'] = html_content
                        element['extracted_text'] = self._extract_text_from_html(html_content)

            # Special handling for images/figures
            elif mapped_type in [ElementType.IMAGE, ElementType.FIGURE]:
                # Save image if path provided
                if 'img_path' in item and output_dir:
                    self._save_image(item['img_path'], output_dir, element['element_id'])
                    element['img_path'] = item['img_path']

            # Add any additional metadata
            if 'metadata' in item:
                element['metadata'] = item['metadata']

            elements.append(element)
            logger.debug(f"Processed element {idx}: type={mapped_type}, bbox={bbox}")

        return elements

    def _process_markdown_fallback(
        self,
        page_result: Any,
        current_page: int,
        output_dir: Optional[Path]
    ) -> List[Dict[str, Any]]:
        """
        Fallback to markdown processing if parsing_res_list not available.

        Args:
            page_result: PP-StructureV3 page result
            current_page: Current page number
            output_dir: Optional output directory

        Returns:
            List of elements extracted from markdown: at most one text/table
            element for 'markdown_texts' followed by one image element per
            entry of 'markdown_images'. Empty when the page result has no
            dict-valued ``markdown`` attribute.
        """
        elements = []

        # Extract from markdown if available
        markdown_dict = getattr(page_result, 'markdown', None)
        if not isinstance(markdown_dict, dict):
            return elements

        # Extract markdown texts
        markdown_texts = markdown_dict.get('markdown_texts', '')
        if markdown_texts:
            # Detect if it's a table
            is_table = '<table' in markdown_texts.lower()

            element = {
                'element_id': f"md_{current_page}_0",
                'type': ElementType.TABLE if is_table else ElementType.TEXT,
                'content': markdown_texts,
                'page': current_page,
                'bbox': [0, 0, 0, 0],  # No bbox in markdown
                'index': 0,
                'from_markdown': True
            }

            if is_table:
                element['extracted_text'] = self._extract_text_from_html(markdown_texts)

            elements.append(element)

        # Process images; a None value is treated like "no images"
        markdown_images = markdown_dict.get('markdown_images') or {}
        for img_idx, (img_path, img_obj) in enumerate(markdown_images.items()):
            # Save image (only objects exposing a save() method are written)
            if output_dir and hasattr(img_obj, 'save'):
                self._save_pil_image(img_obj, output_dir, f"md_img_{current_page}_{img_idx}")

            # Try to extract bbox from filename
            bbox = self._extract_bbox_from_filename(img_path)

            element = {
                'element_id': f"md_img_{current_page}_{img_idx}",
                'type': ElementType.IMAGE,
                'content': img_path,
                'page': current_page,
                'bbox': bbox,
                'index': img_idx + 1,  # Index 0 is reserved for the text element
                'from_markdown': True
            }
            elements.append(element)

        return elements

    def _determine_reading_order(self, elements: List[Dict]) -> List[int]:
        """
        Determine reading order based on element positions.

        Args:
            elements: List of elements with bbox

        Returns:
            List of positions into ``elements`` representing reading order.
            When every element has an 'index' key the order follows that
            value; otherwise elements are ordered top to bottom, then left to
            right, using the first two bbox values. Ties keep input order.
        """
        if not elements:
            return []

        positions = range(len(elements))

        # If elements have original indices, use them
        if all('index' in elem for elem in elements):
            return sorted(positions, key=lambda pos: elements[pos]['index'])

        # Otherwise, sort by position (top to bottom, left to right)
        def top_left_key(pos: int) -> Tuple[Any, Any]:
            bbox = elements[pos].get('bbox', [0, 0, 0, 0])
            if bbox is not None and len(bbox) >= 2:
                return (bbox[1], bbox[0])  # (y, x) of the top-left corner
            return (0, 0)

        return sorted(positions, key=top_left_key)

    def _count_element_types(self, elements: List[Dict]) -> Dict[str, int]:
        """
        Count occurrences of each element type.

        Args:
            elements: List of elements

        Returns:
            Dictionary with element type counts, keyed by each element's
            'type' value (ElementType.TEXT when the key is missing)
        """
        type_counts = {}
        for elem in elements:
            elem_type = elem.get('type', ElementType.TEXT)
            type_counts[elem_type] = type_counts.get(elem_type, 0) + 1
        return type_counts

    def _extract_text_from_html(self, html: str) -> str:
        """
        Extract plain text from HTML content.

        BeautifulSoup is used when it can be imported and parses the input;
        otherwise tags are stripped with a regular expression and whitespace
        is collapsed.

        Args:
            html: HTML markup

        Returns:
            The text content with tags removed
        """
        if not html:
            return ''

        try:
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(html, 'html.parser')
            return soup.get_text(separator=' ', strip=True)
        except Exception:
            # Fallback: just remove HTML tags
            import re
            text = re.sub(r'<[^>]+>', ' ', html)
            text = re.sub(r'\s+', ' ', text)
            return text.strip()

    def _extract_bbox_from_filename(self, filename: str) -> List[int]:
        """
        Extract bbox from filename if it contains coordinate information.

        Args:
            filename: File name or path, expected to contain
                ``box_<n>_<n>_<n>_<n>``

        Returns:
            The four integers in the order they appear in the name, or
            [0, 0, 0, 0] when the pattern is not present
        """
        import re
        match = re.search(r'box_(\d+)_(\d+)_(\d+)_(\d+)', str(filename))
        if match:
            return list(map(int, match.groups()))
        return [0, 0, 0, 0]

    def _save_image(self, img_path: str, output_dir: Path, element_id: str) -> None:
        """
        Save image file to output directory.

        NOTE: placeholder - nothing is written yet. The implementation depends
        on how images are provided by the engine.
        """
        try:
            # Implementation depends on how images are provided
            pass
        except Exception as e:
            logger.warning(f"Failed to save image {img_path}: {e}")

    def _save_pil_image(self, img_obj, output_dir: Path, element_id: str) -> None:
        """
        Save PIL image object to output directory.

        The image is written to ``<output_dir>/imgs/<element_id>.png``; the
        directory is created when missing. Failures are logged as warnings and
        not raised.

        Args:
            img_obj: Object exposing a ``save(path)`` method
            output_dir: Base output directory
            element_id: Used as the file name (without extension)
        """
        try:
            img_dir = output_dir / "imgs"
            img_dir.mkdir(parents=True, exist_ok=True)
            img_path = img_dir / f"{element_id}.png"
            img_obj.save(str(img_path))
            logger.info(f"Saved image to {img_path}")
        except Exception as e:
            logger.warning(f"Failed to save PIL image: {e}")