- Enable PP-StructureV3's use_doc_orientation_classify feature
- Detect rotation angle from doc_preprocessor_res.angle
- Swap page dimensions (width <-> height) for 90°/270° rotations
- Output PDF now correctly displays landscape-scanned content

Also includes:
- Archive completed openspec proposals
- Add simplify-frontend-ocr-config proposal (pending)
- Code cleanup and frontend simplification

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1323 lines
61 KiB
Python
1323 lines
61 KiB
Python
"""
|
||
Enhanced PP-StructureV3 processing with full element extraction
|
||
|
||
This module provides enhanced PP-StructureV3 processing that extracts all
|
||
23 element types with their bbox coordinates and reading order.
|
||
"""
|
||
|
||
import logging
|
||
from pathlib import Path
|
||
from typing import Dict, List, Optional, Tuple, Any, TYPE_CHECKING
|
||
import json
|
||
import gc
|
||
|
||
# Import ScalingInfo for type checking (avoid circular imports at runtime)
|
||
if TYPE_CHECKING:
|
||
from app.services.layout_preprocessing_service import ScalingInfo
|
||
|
||
# Optional torch import for additional GPU memory management
|
||
try:
|
||
import torch
|
||
TORCH_AVAILABLE = True
|
||
except ImportError:
|
||
TORCH_AVAILABLE = False
|
||
|
||
import paddle
|
||
from paddleocr import PPStructureV3
|
||
from PIL import Image
|
||
import numpy as np
|
||
import cv2
|
||
from app.models.unified_document import ElementType
|
||
from app.core.config import settings
|
||
from app.services.memory_manager import prediction_context
|
||
from app.services.cv_table_detector import CVTableDetector
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class PPStructureEnhanced:
|
||
"""
|
||
Enhanced PP-StructureV3 processor that extracts all available element types
|
||
and structure information from parsing_res_list.
|
||
"""
|
||
|
||
# Mapping from PP-StructureV3 layout labels to our ElementType.
# Lookups are done with the lower-cased label; labels that are not listed
# here are resolved by the caller's .get() default (ElementType.TEXT in
# _process_parsing_res_list).
ELEMENT_TYPE_MAPPING = {
    'title': ElementType.TITLE,
    'paragraph_title': ElementType.TITLE,  # PP-StructureV3 block_label
    'text': ElementType.TEXT,
    'paragraph': ElementType.PARAGRAPH,
    'figure': ElementType.FIGURE,
    'figure_caption': ElementType.CAPTION,
    'table': ElementType.TABLE,
    'table_caption': ElementType.TABLE_CAPTION,
    'header': ElementType.HEADER,
    'footer': ElementType.FOOTER,
    'reference': ElementType.REFERENCE,
    'equation': ElementType.EQUATION,
    'formula': ElementType.FORMULA,
    'list-item': ElementType.LIST_ITEM,
    'list': ElementType.LIST,
    'code': ElementType.CODE,
    'footnote': ElementType.FOOTNOTE,
    'page-number': ElementType.PAGE_NUMBER,
    'watermark': ElementType.WATERMARK,
    'signature': ElementType.SIGNATURE,
    'stamp': ElementType.STAMP,
    'seal': ElementType.STAMP,  # PP-StructureV3 may use 'seal' label
    'logo': ElementType.LOGO,
    'barcode': ElementType.BARCODE,
    'qr-code': ElementType.QR_CODE,
    # Generic visual element labels (these are explicit entries, not the
    # fallback -- the fallback is the .get() default at the lookup site)
    'image': ElementType.IMAGE,
    'chart': ElementType.CHART,
    'diagram': ElementType.DIAGRAM,
}
|
||
|
||
def __init__(self, structure_engine: PPStructureV3) -> None:
    """
    Initialize with existing PP-StructureV3 engine.

    The engine is not created or configured here; it is stored as-is and
    its predict() method is called by analyze_with_full_structure().

    Args:
        structure_engine: Initialized PPStructureV3 instance
    """
    self.structure_engine = structure_engine
|
||
|
||
def analyze_with_full_structure(
    self,
    image_path: Path,
    output_dir: Optional[Path] = None,
    current_page: int = 0,
    preprocessed_image: Optional[Image.Image] = None,
    scaling_info: Optional['ScalingInfo'] = None,
    save_visualization: bool = False,
    use_cv_table_detection: bool = False,
    raw_ocr_regions: Optional[List[Dict[str, Any]]] = None
) -> Dict[str, Any]:
    """
    Analyze document with full PP-StructureV3 capabilities.

    Runs the structure engine (guarded by the prediction semaphore), locates
    parsing_res_list / table_res_list / layout_det_res / overall_ocr_res /
    doc_preprocessor_res in each page result, converts them to element dicts
    and returns an aggregated result. Any exception is logged and converted
    into an "empty" result that carries an 'error' message, so this method
    does not raise.

    Args:
        image_path: Path to original image file (used for cropping extracted images)
        output_dir: Optional output directory for saving extracted content
        current_page: Current page number (0-based)
        preprocessed_image: Optional preprocessed PIL Image for layout detection.
                           If provided, this is used for PP-Structure prediction,
                           but original image_path is still used for cropping images.
        scaling_info: Optional ScalingInfo from preprocessing. If image was scaled
                     for layout detection, all bbox coordinates will be scaled back
                     to original image coordinates for proper cropping.
        save_visualization: If True, save detection visualization images
                           (layout_det_res, layout_order_res, overall_ocr_res, etc.)
        use_cv_table_detection: If True, use CV-based line detection for wired tables
                               instead of ML-based cell detection (RT-DETR-L)
        raw_ocr_regions: Optional list of raw OCR text regions for table content
                        rebuilding. Used when PP-StructureV3's table HTML is incorrect.

    Returns:
        Dictionary with complete structure information including:
        - elements: List of all detected elements with types and bbox (in original coords)
        - total_elements: Number of entries in elements
        - reading_order: Reading order indices
        - images: Extracted images with metadata
        - tables: Extracted tables with structure
        - element_types: Per-type element counts
        - has_parsing_res_list: Whether parsing_res_list was found (last page processed)
        - detected_rotation: Rotation reported by doc_preprocessor_res as a string
          (only on success; value of the last page processed)
        - visualization_dir: Path to visualization images (if save_visualization=True)
        - overall_ocr_res: OCR regions converted from PP-StructureV3's internal OCR
          (only when present in the result)
        - error: Error message (only on timeout or failure)
    """
    try:
        logger.info(f"Enhanced PP-StructureV3 analysis on {image_path.name}")
        if preprocessed_image:
            logger.info("Using preprocessed image for layout detection")

        # Perform structure analysis with semaphore control
        # This prevents OOM errors from multiple simultaneous predictions
        # NOTE(review): the slot is released as soon as predict() returns; this
        # assumes predict() hands back fully computed results -- confirm.
        with prediction_context(timeout=settings.service_acquire_timeout_seconds) as acquired:
            if not acquired:
                logger.error("Failed to acquire prediction slot (timeout), returning empty result")
                return {
                    'has_parsing_res_list': False,
                    'elements': [],
                    'total_elements': 0,
                    # Same shape as the other failure result
                    'reading_order': [],
                    'images': [],
                    'tables': [],
                    'element_types': {},
                    'error': 'Prediction slot timeout'
                }

            # Use preprocessed image if provided, otherwise use original path
            if preprocessed_image is not None:
                # Convert PIL to numpy array (BGR format for PP-Structure)
                predict_input = np.array(preprocessed_image)
                if len(predict_input.shape) == 3 and predict_input.shape[2] == 3:
                    # Convert RGB to BGR. The reversed slice is a negative-stride
                    # view, so copy it into a contiguous buffer before handing
                    # it to the engine.
                    predict_input = np.ascontiguousarray(predict_input[:, :, ::-1])
                results = self.structure_engine.predict(predict_input)
            else:
                results = self.structure_engine.predict(str(image_path))

        all_elements = []
        all_images = []
        all_tables = []
        visualization_dir = None
        detected_rotation = "0"  # Default: no rotation
        # Bound up front so building the result below cannot hit an unbound
        # local when the engine returns no page results at all.
        parsing_res_list = None
        overall_ocr_res = None

        # Process each page result
        for page_idx, page_result in enumerate(results):
            # Save visualization images if requested
            if save_visualization and output_dir and hasattr(page_result, 'save_to_img'):
                try:
                    vis_dir = output_dir / 'visualization'
                    vis_dir.mkdir(parents=True, exist_ok=True)
                    page_result.save_to_img(str(vis_dir))
                    visualization_dir = vis_dir
                    logger.info(f"Saved visualization images to {vis_dir}")
                except Exception as e:
                    logger.warning(f"Failed to save visualization images: {e}")

            # Try to access parsing_res_list and table_res_list (the complete structure)
            parsing_res_list = None
            table_res_list = None
            result_dict = None

            # Method 1: Direct access to json attribute (check both top-level and res)
            if hasattr(page_result, 'json'):
                result_json = page_result.json
                if isinstance(result_json, dict):
                    result_dict = result_json
                    # Check top-level
                    if 'parsing_res_list' in result_json:
                        parsing_res_list = result_json['parsing_res_list']
                        logger.info(f"Found parsing_res_list at top level with {len(parsing_res_list)} elements")
                    # Check inside 'res' (new structure in paddlex)
                    elif 'res' in result_json and isinstance(result_json['res'], dict):
                        result_dict = result_json['res']
                        if 'parsing_res_list' in result_json['res']:
                            parsing_res_list = result_json['res']['parsing_res_list']
                            logger.info(f"Found parsing_res_list inside 'res' with {len(parsing_res_list)} elements")

            # Method 2: Try direct dict access (LayoutParsingResultV2 inherits from dict)
            elif isinstance(page_result, dict):
                result_dict = page_result
                if 'parsing_res_list' in page_result:
                    parsing_res_list = page_result['parsing_res_list']
                    logger.info(f"Found parsing_res_list via dict access with {len(parsing_res_list)} elements")
                elif 'res' in page_result and isinstance(page_result['res'], dict):
                    result_dict = page_result['res']
                    if 'parsing_res_list' in page_result['res']:
                        parsing_res_list = page_result['res']['parsing_res_list']
                        logger.info(f"Found parsing_res_list inside page_result['res'] with {len(parsing_res_list)} elements")

            # Method 3: Try to access as attribute
            elif hasattr(page_result, 'parsing_res_list'):
                parsing_res_list = page_result.parsing_res_list
                logger.info(f"Found parsing_res_list attribute with {len(parsing_res_list)} elements")
                if hasattr(page_result, '__dict__'):
                    result_dict = page_result.__dict__

            # Method 4: Check if result has to_dict method
            elif hasattr(page_result, 'to_dict'):
                result_dict = page_result.to_dict()
                if 'parsing_res_list' in result_dict:
                    parsing_res_list = result_dict['parsing_res_list']
                    logger.info(f"Found parsing_res_list in to_dict with {len(parsing_res_list)} elements")
                elif 'res' in result_dict and isinstance(result_dict['res'], dict):
                    result_dict = result_dict['res']
                    if 'parsing_res_list' in result_dict:
                        parsing_res_list = result_dict['parsing_res_list']
                        logger.info(f"Found parsing_res_list in to_dict['res'] with {len(parsing_res_list)} elements")

            # Extract table_res_list which contains cell_box_list
            layout_det_res = None
            overall_ocr_res = None
            if result_dict:
                if 'table_res_list' in result_dict:
                    table_res_list = result_dict['table_res_list']
                    logger.info(f"Found table_res_list with {len(table_res_list)} tables")
                    for i, tbl in enumerate(table_res_list):
                        if 'cell_box_list' in tbl:
                            logger.info(f"  Table {i}: {len(tbl['cell_box_list'])} cell boxes")

                # Extract layout_det_res for Image-in-Table processing
                if 'layout_det_res' in result_dict:
                    layout_det_res = result_dict['layout_det_res']
                    logger.info(f"Found layout_det_res with {len(layout_det_res.get('boxes', []))} boxes")

                # Extract overall_ocr_res for gap filling (avoid separate Raw OCR inference)
                if 'overall_ocr_res' in result_dict:
                    overall_ocr_res = result_dict['overall_ocr_res']
                    ocr_count = len(overall_ocr_res.get('rec_texts', []))
                    logger.info(f"Found overall_ocr_res with {ocr_count} text regions")

            # Extract doc_preprocessor_res for orientation detection
            # When use_doc_orientation_classify=True, this contains the detected rotation angle
            # Note: doc_preprocessor_res may be at top-level result_json OR inside 'res'
            doc_preprocessor_res = None

            # First, check result_dict (might be result_json['res']).
            # result_dict can still be None here (e.g. Method 3 without __dict__),
            # so it must be checked before the membership test.
            if result_dict and 'doc_preprocessor_res' in result_dict:
                doc_preprocessor_res = result_dict['doc_preprocessor_res']
                logger.info("Found doc_preprocessor_res in result_dict")
            # Also check top-level result_json if it exists and differs from result_dict
            elif hasattr(page_result, 'json') and isinstance(page_result.json, dict):
                if 'doc_preprocessor_res' in page_result.json:
                    doc_preprocessor_res = page_result.json['doc_preprocessor_res']
                    logger.info("Found doc_preprocessor_res at top-level result_json")

            # Debug: Log available keys to help diagnose structure issues
            if doc_preprocessor_res is None:
                logger.warning(f"doc_preprocessor_res NOT found. result_dict keys: {list(result_dict.keys()) if result_dict else 'None'}")
                if hasattr(page_result, 'json') and isinstance(page_result.json, dict):
                    logger.warning(f"result_json keys: {list(page_result.json.keys())}")

            if doc_preprocessor_res:
                # Debug: Log the complete structure of doc_preprocessor_res
                logger.info(f"doc_preprocessor_res keys: {list(doc_preprocessor_res.keys()) if isinstance(doc_preprocessor_res, dict) else type(doc_preprocessor_res)}")
                logger.info(f"doc_preprocessor_res content: {doc_preprocessor_res}")

                # Determine rotation from available data
                detected_rotation = "0"
                if isinstance(doc_preprocessor_res, dict):
                    # Try multiple possible key names for rotation info
                    # PaddleOCR may use different structures depending on version
                    label_names = doc_preprocessor_res.get('label_names', [])
                    class_ids = doc_preprocessor_res.get('class_ids', [])
                    labels = doc_preprocessor_res.get('labels', [])
                    angle = doc_preprocessor_res.get('angle', None)

                    if label_names:
                        detected_rotation = str(label_names[0])
                    elif class_ids:
                        # class_ids: 0=0°, 1=90°, 2=180°, 3=270°
                        rotation_map = {0: "0", 1: "90", 2: "180", 3: "270"}
                        detected_rotation = rotation_map.get(class_ids[0], "0")
                    elif labels:
                        detected_rotation = str(labels[0])
                    elif angle is not None:
                        # NOTE(review): the value is passed through unchanged, so
                        # anything other than 0/90/180/270 reaches the caller
                        # as-is -- confirm callers tolerate that.
                        detected_rotation = str(angle)

                    logger.info(f"Document orientation detected: {detected_rotation}° (label_names={label_names}, class_ids={class_ids}, labels={labels}, angle={angle})")
                else:
                    # Without a mapping there is nothing to read the angle from
                    logger.warning(f"doc_preprocessor_res is not a dict ({type(doc_preprocessor_res)}), assuming no rotation")
            else:
                detected_rotation = "0"  # Default: no rotation

            # Process parsing_res_list if found
            if parsing_res_list:
                elements = self._process_parsing_res_list(
                    parsing_res_list, current_page, output_dir, image_path, scaling_info,
                    table_res_list=table_res_list,  # Pass table_res_list for cell_box_list
                    layout_det_res=layout_det_res,  # Pass layout_det_res for Image-in-Table
                    use_cv_table_detection=use_cv_table_detection,  # Use CV for wired tables
                    raw_ocr_regions=raw_ocr_regions  # Pass raw OCR for table content rebuilding
                )
                all_elements.extend(elements)

                # Extract tables and images from elements
                table_bboxes = []  # Collect table bboxes for standalone image filtering
                for elem in elements:
                    if elem['type'] == ElementType.TABLE:
                        all_tables.append(elem)
                        table_bboxes.append(elem.get('bbox', [0, 0, 0, 0]))
                    elif elem['type'] in [ElementType.IMAGE, ElementType.FIGURE]:
                        all_images.append(elem)

                # Extract standalone images from layout_det_res (images NOT inside tables)
                if layout_det_res and image_path and output_dir:
                    standalone_images = self._extract_standalone_images(
                        layout_det_res, table_bboxes, image_path, output_dir,
                        current_page, len(elements), scaling_info
                    )
                    if standalone_images:
                        all_elements.extend(standalone_images)
                        all_images.extend(standalone_images)
                        logger.info(f"Extracted {len(standalone_images)} standalone images from layout_det_res")
            else:
                # Fallback to markdown if parsing_res_list not available
                logger.warning("parsing_res_list not found, falling back to markdown")
                elements = self._process_markdown_fallback(
                    page_result, current_page, output_dir
                )
                all_elements.extend(elements)

        # Create reading order based on element positions
        reading_order = self._determine_reading_order(all_elements)

        result = {
            'elements': all_elements,
            'total_elements': len(all_elements),
            'reading_order': reading_order,
            'tables': all_tables,
            'images': all_images,
            'element_types': self._count_element_types(all_elements),
            'has_parsing_res_list': parsing_res_list is not None,
            'detected_rotation': detected_rotation  # Document orientation: "0", "90", "180", "270"
        }

        # Add visualization directory if available
        if visualization_dir:
            result['visualization_dir'] = str(visualization_dir)

        # Add overall_ocr_res for gap filling (converted to standard format)
        # This allows gap_filling_service to use PP-StructureV3's internal OCR
        # instead of running a separate Raw OCR inference
        if overall_ocr_res:
            result['overall_ocr_res'] = self._convert_overall_ocr_to_regions(
                overall_ocr_res, scaling_info
            )
            logger.info(f"Converted {len(result['overall_ocr_res'])} OCR regions from overall_ocr_res")

        return result

    except Exception as e:
        # logger.exception records the traceback through the logging system
        # instead of printing it to stderr.
        logger.exception(f"Enhanced PP-StructureV3 analysis error: {e}")

        # Clean up GPU memory on error (best effort)
        try:
            if TORCH_AVAILABLE and torch.cuda.is_available():
                torch.cuda.empty_cache()
                torch.cuda.synchronize()
            if paddle.device.is_compiled_with_cuda():
                paddle.device.cuda.empty_cache()
            gc.collect()
        except Exception as cleanup_error:
            # Cleanup failures must not mask the original error, but they
            # should not swallow KeyboardInterrupt/SystemExit either.
            logger.debug(f"GPU memory cleanup failed: {cleanup_error}")

        return {
            'elements': [],
            'total_elements': 0,
            'reading_order': [],
            'tables': [],
            'images': [],
            'element_types': {},
            'has_parsing_res_list': False,
            'error': str(e)
        }
|
||
|
||
def _process_parsing_res_list(
    self,
    parsing_res_list: List[Dict],
    current_page: int,
    output_dir: Optional[Path],
    source_image_path: Optional[Path] = None,
    scaling_info: Optional['ScalingInfo'] = None,
    table_res_list: Optional[List[Dict]] = None,
    layout_det_res: Optional[Dict] = None,
    use_cv_table_detection: bool = False,
    raw_ocr_regions: Optional[List[Dict[str, Any]]] = None
) -> List[Dict[str, Any]]:
    """
    Process parsing_res_list to extract all elements.

    Each item is normalized into an element dict with the keys
    'element_id', 'type', 'original_type', 'content', 'page', 'bbox',
    'index' and 'confidence'. Tables additionally get 'html',
    'extracted_text', 'cell_boxes', 'cell_boxes_source' and
    'embedded_images' when available; visual elements get 'saved_path'
    and 'img_path' when an image could be saved or cropped.

    Args:
        parsing_res_list: List of parsed elements from PP-StructureV3
        current_page: Current page number
        output_dir: Optional output directory
        source_image_path: Path to source image for cropping image regions
        scaling_info: Scaling information for bbox coordinate restoration
        table_res_list: Optional list of table results containing cell_box_list.
            Entries that get assigned to a table are removed from this
            list in place so they are not reused for another table.
        layout_det_res: Optional layout detection result for Image-in-Table processing
        use_cv_table_detection: If True, use CV line detection for wired tables
        raw_ocr_regions: Optional list of raw OCR text regions for table content
            rebuilding (accepted for interface compatibility; not referenced
            in this method's body)

    Returns:
        List of processed elements with normalized structure
    """
    elements = []

    for idx, item in enumerate(parsing_res_list):
        # Debug: log the structure of the first item
        if idx == 0:
            logger.info(f"First parsing_res_list item structure: {list(item.keys()) if isinstance(item, dict) else type(item)}")
            logger.info(f"First parsing_res_list item sample: {str(item)[:500]}")

        # Extract element type (check both 'type' and 'block_label')
        element_type = item.get('type', '') or item.get('block_label', 'text')
        # A label that is present but None must not crash .lower()
        element_type = (element_type or '').lower()
        mapped_type = self.ELEMENT_TYPE_MAPPING.get(
            element_type, ElementType.TEXT
        )

        # Extract bbox (check multiple possible keys)
        layout_bbox = (
            item.get('layout_bbox', []) or
            item.get('block_bbox', []) or
            item.get('bbox', [])
        )

        # Ensure bbox has 4 values
        if len(layout_bbox) >= 4:
            bbox = list(layout_bbox[:4])  # [x1, y1, x2, y2]
        else:
            bbox = [0, 0, 0, 0]  # Default if bbox missing
            logger.warning(f"Element {idx} has invalid bbox: {layout_bbox}")

        # Keep the bbox in the coordinate space of the detection input.
        # table_res_list cell boxes are stored without rescaling below, so
        # table matching has to compare against this unscaled bbox.
        detection_bbox = list(bbox)

        # Scale bbox back to original image coordinates if image was scaled
        # This is critical for proper cropping from original high-resolution image
        if scaling_info and scaling_info.was_scaled and bbox != [0, 0, 0, 0]:
            scale_factor = scaling_info.scale_factor
            bbox = [
                bbox[0] * scale_factor,  # x1
                bbox[1] * scale_factor,  # y1
                bbox[2] * scale_factor,  # x2
                bbox[3] * scale_factor   # y2
            ]
            if idx == 0:  # Log only for first element to avoid spam
                logger.info(
                    f"Scaled bbox to original coords: "
                    f"{[round(x, 1) for x in layout_bbox[:4]]} -> {[round(x, 1) for x in bbox]} "
                    f"(factor={scale_factor:.3f})"
                )

        # Extract content (check multiple possible keys)
        content = (
            item.get('content', '') or
            item.get('block_content', '') or
            ''
        )

        # Additional fallback for content in 'res' field
        if not content and 'res' in item:
            res = item.get('res', {})
            if isinstance(res, dict):
                content = res.get('content', '') or res.get('text', '')
            elif isinstance(res, str):
                content = res

        # Content-based HTML table detection: PP-StructureV3 sometimes
        # classifies tables as 'text' but returns HTML table content
        html_table_content = None
        if content and '<table' in content.lower():
            if mapped_type == ElementType.TEXT or element_type == 'text':
                logger.info(f"Element {idx}: Detected HTML table content in 'text' type, reclassifying to TABLE")
                mapped_type = ElementType.TABLE
            html_table_content = content  # Store for later use

        # Strip LaTeX math formatting from text content (PP-Structure formula detection)
        if content and mapped_type in [ElementType.TEXT, ElementType.TITLE, ElementType.HEADER]:
            if '$' in content and '\\' in content:
                content = self._strip_latex_math(content)

        # Create element
        element = {
            'element_id': f"pp3_{current_page}_{idx}",
            'type': mapped_type,
            'original_type': element_type,
            'content': content,
            'page': current_page,
            'bbox': bbox,  # [x1, y1, x2, y2]
            'index': idx,  # Original index in reading order
            'confidence': item.get('score', 1.0)
        }

        # Special handling for tables
        if mapped_type == ElementType.TABLE:
            # 1. Extract HTML
            html_content = html_table_content
            res_data = {}

            # Get the 'res' dict (may contain html and boxes)
            if 'res' in item and isinstance(item['res'], dict):
                res_data = item['res']
                logger.info(f"[TABLE] Found 'res' dict with keys: {list(res_data.keys())}")
                if not html_content:
                    html_content = res_data.get('html', '')
            else:
                logger.info(f"[TABLE] No 'res' key in item. Available keys: {list(item.keys())}")

            if html_content:
                element['html'] = html_content
                element['extracted_text'] = self._extract_text_from_html(html_content)

            # 2. Extract cell coordinates (boxes)
            # Priority: table_res_list > res_data['boxes']
            cell_boxes_extracted = False

            # First, try to get cell_box_list from table_res_list (pp_demo style)
            if table_res_list:
                # Pass 1: match table by HTML prefix
                for tbl_idx, tbl_res in enumerate(table_res_list):
                    if not tbl_res.get('cell_box_list'):
                        continue
                    tbl_html = tbl_res.get('pred_html', '')
                    # Simple check: both start with the same 100 characters
                    if html_content and tbl_html and tbl_html[:100] == html_content[:100]:
                        cell_boxes = tbl_res['cell_box_list']
                        # cell_box_list is already in absolute coordinates
                        # NOTE(review): these are stored without applying
                        # scaling_info while 'bbox' above is rescaled --
                        # confirm downstream code expects that.
                        element['cell_boxes'] = [[float(c) for c in box] for box in cell_boxes]
                        element['cell_boxes_source'] = 'table_res_list'
                        cell_boxes_extracted = True
                        logger.info(f"[TABLE] Found {len(cell_boxes)} cell boxes from table_res_list (HTML match)")
                        # Consume the entry (by index, no dict comparison) so
                        # another table with the same HTML prefix cannot pick
                        # up these cells again.
                        table_res_list.pop(tbl_idx)
                        break

                # Pass 2: if no HTML match, find best matching table_res by bbox overlap
                if not cell_boxes_extracted:
                    best_index = None
                    best_overlap = 0.0

                    # Compare in detection coordinates: [x1, y1, x2, y2]
                    elem_x1, elem_y1, elem_x2, elem_y2 = detection_bbox
                    elem_area = (elem_x2 - elem_x1) * (elem_y2 - elem_y1)

                    for tbl_idx, tbl_res in enumerate(table_res_list):
                        cell_boxes_temp = tbl_res.get('cell_box_list')
                        if not cell_boxes_temp:
                            continue

                        # Bounding box of all cells of this table_res
                        tbl_x1 = min(cb[0] for cb in cell_boxes_temp)
                        tbl_y1 = min(cb[1] for cb in cell_boxes_temp)
                        tbl_x2 = max(cb[2] for cb in cell_boxes_temp)
                        tbl_y2 = max(cb[3] for cb in cell_boxes_temp)

                        # Intersection with the element bbox
                        inter_x1 = max(tbl_x1, elem_x1)
                        inter_y1 = max(tbl_y1, elem_y1)
                        inter_x2 = min(tbl_x2, elem_x2)
                        inter_y2 = min(tbl_y2, elem_y2)

                        if inter_x1 < inter_x2 and inter_y1 < inter_y2:
                            inter_area = (inter_x2 - inter_x1) * (inter_y2 - inter_y1)

                            # Use overlap ratio with element bbox (how much of element is covered)
                            overlap_ratio = inter_area / elem_area if elem_area > 0 else 0

                            if overlap_ratio > best_overlap:
                                best_overlap = overlap_ratio
                                best_index = tbl_idx

                    # Use best match if overlap is significant (>10%)
                    if best_index is not None and best_overlap > 0.1:
                        # Remove used table_res to avoid reuse
                        best_match = table_res_list.pop(best_index)
                        cell_boxes = best_match['cell_box_list']
                        element['cell_boxes'] = [[float(c) for c in box] for box in cell_boxes]
                        element['cell_boxes_source'] = 'table_res_list'
                        cell_boxes_extracted = True
                        logger.info(f"[TABLE] Found {len(cell_boxes)} cell boxes from table_res_list (bbox match, overlap={best_overlap:.2f})")

                        # Extract pred_html if not already set
                        if not html_content and 'pred_html' in best_match:
                            html_content = best_match['pred_html']
                            element['html'] = html_content
                            element['extracted_text'] = self._extract_text_from_html(html_content)
                            logger.info(f"[TABLE] Extracted HTML from table_res_list (bbox match, {len(html_content)} chars)")
                    else:
                        # Fallback to first available if no bbox match found
                        for tbl_idx, tbl_res in enumerate(table_res_list):
                            if not tbl_res.get('cell_box_list'):
                                continue
                            cell_boxes = tbl_res['cell_box_list']
                            element['cell_boxes'] = [[float(c) for c in box] for box in cell_boxes]
                            element['cell_boxes_source'] = 'table_res_list'
                            cell_boxes_extracted = True
                            logger.warning(f"[TABLE] Using first available table_res (no bbox match, {len(cell_boxes)} cells)")

                            # Extract pred_html if not already set
                            if not html_content and 'pred_html' in tbl_res:
                                html_content = tbl_res['pred_html']
                                element['html'] = html_content
                                element['extracted_text'] = self._extract_text_from_html(html_content)
                                logger.info(f"[TABLE] Extracted HTML from table_res_list (fallback, {len(html_content)} chars)")

                            table_res_list.pop(tbl_idx)
                            break

            if not cell_boxes_extracted and 'boxes' in res_data:
                # PPStructureV3 returned cell boxes in res (unlikely in PaddleX 3.x)
                cell_boxes = res_data['boxes']
                logger.info(f"[TABLE] Found {len(cell_boxes)} cell boxes in res_data")

                # Table origin, used to turn the cell coordinates (treated as
                # relative to the table) into absolute coordinates
                table_x, table_y = 0, 0
                if len(bbox) >= 2:  # bbox is [x1, y1, x2, y2]
                    table_x, table_y = bbox[0], bbox[1]

                processed_cells = []
                for cell_box in cell_boxes:
                    # Only accept well-formed boxes
                    if isinstance(cell_box, (list, tuple)) and len(cell_box) >= 4:
                        # Absolute coordinate = cell coordinate + table origin
                        abs_cell_box = [
                            cell_box[0] + table_x,
                            cell_box[1] + table_y,
                            cell_box[2] + table_x,
                            cell_box[3] + table_y
                        ]
                        processed_cells.append(abs_cell_box)

                # Store the processed cell coordinates on the element
                element['cell_boxes'] = processed_cells
                element['raw_cell_boxes'] = cell_boxes
                element['cell_boxes_source'] = 'ppstructure'
                logger.info(f"[TABLE] Processed {len(processed_cells)} cell boxes with table offset ({table_x}, {table_y})")
                cell_boxes_extracted = True

            if not cell_boxes_extracted:
                logger.info(f"[TABLE] No cell boxes available. PPStructureV3 keys: {list(res_data.keys()) if res_data else 'empty'}")

            # 2.5 CV-based table line detection for wired tables
            if use_cv_table_detection and source_image_path and source_image_path.exists():
                try:
                    # Load image for CV processing
                    cv_image = cv2.imread(str(source_image_path))
                    if cv_image is not None:
                        cv_detector = CVTableDetector()
                        ml_cell_boxes = element.get('cell_boxes', [])

                        # Detect cells using CV line detection
                        cv_cells = cv_detector.detect_and_merge_with_ml(
                            cv_image,
                            bbox,  # Table bbox
                            ml_cell_boxes
                        )

                        if cv_cells:
                            # Apply scaling if needed
                            # NOTE(review): this uses scale_x/scale_y while the
                            # bbox above uses scale_factor, and cv_image is
                            # loaded from the source path -- confirm the
                            # intended coordinate space.
                            if scaling_info and scaling_info.was_scaled:
                                cv_cells = [
                                    [
                                        c[0] * scaling_info.scale_x,
                                        c[1] * scaling_info.scale_y,
                                        c[2] * scaling_info.scale_x,
                                        c[3] * scaling_info.scale_y
                                    ]
                                    for c in cv_cells
                                ]

                            element['cell_boxes'] = cv_cells
                            element['cell_boxes_source'] = 'cv_line_detection'
                            logger.info(f"[TABLE] CV line detection found {len(cv_cells)} cells (ML had {len(ml_cell_boxes)})")
                except Exception as cv_error:
                    logger.warning(f"[TABLE] CV line detection failed: {cv_error}")

            # 3. Image-in-Table handling: detect and embed images inside the table
            if layout_det_res and source_image_path and output_dir:
                embedded_images = self._embed_images_in_table(
                    element, bbox, layout_det_res, source_image_path, output_dir
                )
                if embedded_images:
                    element['embedded_images'] = embedded_images
                    logger.info(f"[TABLE] Embedded {len(embedded_images)} images into table")

        # Special handling for images/figures/charts/stamps (visual elements that need cropping)
        elif mapped_type in [ElementType.IMAGE, ElementType.FIGURE, ElementType.CHART, ElementType.DIAGRAM, ElementType.STAMP, ElementType.LOGO]:
            # Save image if path provided
            if 'img_path' in item and output_dir:
                saved_path = self._save_image(item['img_path'], output_dir, element['element_id'])
                if saved_path:
                    element['saved_path'] = saved_path
                    element['img_path'] = item['img_path']  # Keep original for reference
                else:
                    logger.warning(f"Failed to save image for element {element['element_id']}")
            # Crop image from source if no img_path but source image is available
            elif source_image_path and output_dir and bbox != [0, 0, 0, 0]:
                cropped_path = self._crop_and_save_image(
                    source_image_path, bbox, output_dir, element['element_id']
                )
                if cropped_path:
                    element['saved_path'] = cropped_path
                    element['img_path'] = cropped_path
                    logger.info(f"Cropped and saved image region for {element['element_id']}")
                else:
                    logger.warning(f"Failed to crop image for element {element['element_id']}")

        # Add any additional metadata
        if 'metadata' in item:
            element['metadata'] = item['metadata']

        elements.append(element)
        logger.debug(f"Processed element {idx}: type={mapped_type}, bbox={bbox}")

    return elements
|
||
|
||
def _embed_images_in_table(
|
||
self,
|
||
table_element: Dict[str, Any],
|
||
table_bbox: List[float],
|
||
layout_det_res: Dict,
|
||
source_image_path: Path,
|
||
output_dir: Path
|
||
) -> List[Dict[str, Any]]:
|
||
"""
|
||
Detect and embed images that are inside a table region.
|
||
|
||
This handles the case where layout detection finds an image inside a table,
|
||
similar to how pp_demo embeds images in table HTML.
|
||
|
||
Args:
|
||
table_element: The table element being processed
|
||
table_bbox: Table bounding box [x1, y1, x2, y2]
|
||
layout_det_res: Layout detection result containing all detected boxes
|
||
source_image_path: Path to source image for cropping
|
||
output_dir: Output directory for saving cropped images
|
||
|
||
Returns:
|
||
List of embedded image info dicts with 'bbox', 'saved_path', 'html_tag'
|
||
"""
|
||
embedded_images = []
|
||
|
||
try:
|
||
boxes = layout_det_res.get('boxes', [])
|
||
table_x1, table_y1, table_x2, table_y2 = table_bbox
|
||
|
||
for box in boxes:
|
||
label = box.get('label', '').lower()
|
||
if label != 'image':
|
||
continue
|
||
|
||
# Get image bbox
|
||
img_coord = box.get('coordinate', [])
|
||
if len(img_coord) < 4:
|
||
continue
|
||
|
||
img_x1, img_y1, img_x2, img_y2 = img_coord[:4]
|
||
|
||
# Check if image is inside table (with some tolerance)
|
||
tolerance = 5 # pixels
|
||
if (img_x1 >= table_x1 - tolerance and
|
||
img_y1 >= table_y1 - tolerance and
|
||
img_x2 <= table_x2 + tolerance and
|
||
img_y2 <= table_y2 + tolerance):
|
||
|
||
logger.info(f"[IMAGE-IN-TABLE] Found image at [{int(img_x1)},{int(img_y1)},{int(img_x2)},{int(img_y2)}] inside table")
|
||
|
||
# Crop and save the image
|
||
img_element_id = f"img_in_table_{int(img_x1)}_{int(img_y1)}_{int(img_x2)}_{int(img_y2)}"
|
||
cropped_path = self._crop_and_save_image(
|
||
source_image_path,
|
||
[img_x1, img_y1, img_x2, img_y2],
|
||
output_dir,
|
||
img_element_id
|
||
)
|
||
|
||
if cropped_path:
|
||
# Create relative path for HTML embedding
|
||
rel_path = f"imgs/{Path(cropped_path).name}"
|
||
|
||
# Create img tag similar to pp_demo
|
||
img_html = f'<div style="text-align: center;"><img src="{rel_path}" alt="Image" /></div>'
|
||
|
||
embedded_image = {
|
||
'bbox': [img_x1, img_y1, img_x2, img_y2],
|
||
'saved_path': str(cropped_path),
|
||
'relative_path': rel_path,
|
||
'html_tag': img_html,
|
||
'element_id': img_element_id
|
||
}
|
||
embedded_images.append(embedded_image)
|
||
|
||
# Try to insert image into HTML content
|
||
if 'html' in table_element and table_element['html']:
|
||
# Insert image reference at the end of HTML before </table>
|
||
original_html = table_element['html']
|
||
if '</tbody>' in original_html:
|
||
# Insert before </tbody> in a new row
|
||
new_html = original_html.replace(
|
||
'</tbody>',
|
||
f'<tr><td colspan="99" style="text-align:center;"><img src="{rel_path}" alt="Embedded Image" /></td></tr></tbody>'
|
||
)
|
||
table_element['html'] = new_html
|
||
logger.info(f"[IMAGE-IN-TABLE] Embedded image into table HTML")
|
||
|
||
except Exception as e:
|
||
logger.error(f"[IMAGE-IN-TABLE] Error processing images in table: {e}")
|
||
|
||
return embedded_images
|
||
|
||
def _extract_standalone_images(
|
||
self,
|
||
layout_det_res: Dict,
|
||
table_bboxes: List[List[float]],
|
||
source_image_path: Path,
|
||
output_dir: Path,
|
||
current_page: int,
|
||
start_index: int,
|
||
scaling_info: Optional['ScalingInfo'] = None
|
||
) -> List[Dict[str, Any]]:
|
||
"""
|
||
Extract standalone images from layout_det_res that are NOT inside tables.
|
||
|
||
This handles images that PP-StructureV3 detects in layout_det_res but
|
||
doesn't include in parsing_res_list (non-table images).
|
||
|
||
Args:
|
||
layout_det_res: Layout detection result containing all detected boxes
|
||
table_bboxes: List of table bounding boxes to exclude images inside tables
|
||
source_image_path: Path to source image for cropping
|
||
output_dir: Output directory for saving cropped images
|
||
current_page: Current page number
|
||
start_index: Starting index for element IDs
|
||
scaling_info: Optional scaling info for coordinate restoration
|
||
|
||
Returns:
|
||
List of standalone image elements
|
||
"""
|
||
standalone_images = []
|
||
|
||
try:
|
||
boxes = layout_det_res.get('boxes', [])
|
||
logger.info(f"[STANDALONE-IMAGE] Checking {len(boxes)} boxes for standalone images")
|
||
|
||
for box_idx, box in enumerate(boxes):
|
||
label = box.get('label', '').lower()
|
||
if label != 'image':
|
||
continue
|
||
|
||
# Get image bbox
|
||
img_coord = box.get('coordinate', [])
|
||
if len(img_coord) < 4:
|
||
continue
|
||
|
||
img_x1, img_y1, img_x2, img_y2 = img_coord[:4]
|
||
|
||
# Check if image is inside any table (skip if so)
|
||
is_inside_table = False
|
||
for table_bbox in table_bboxes:
|
||
if len(table_bbox) < 4:
|
||
continue
|
||
tx1, ty1, tx2, ty2 = table_bbox[:4]
|
||
tolerance = 5 # pixels
|
||
if (img_x1 >= tx1 - tolerance and
|
||
img_y1 >= ty1 - tolerance and
|
||
img_x2 <= tx2 + tolerance and
|
||
img_y2 <= ty2 + tolerance):
|
||
is_inside_table = True
|
||
logger.debug(f"[STANDALONE-IMAGE] Image at [{int(img_x1)},{int(img_y1)}] is inside table, skipping")
|
||
break
|
||
|
||
if is_inside_table:
|
||
continue
|
||
|
||
# Scale bbox back to original coordinates if needed
|
||
if scaling_info and scaling_info.was_scaled:
|
||
scale_factor = scaling_info.scale_factor
|
||
img_x1 *= scale_factor
|
||
img_y1 *= scale_factor
|
||
img_x2 *= scale_factor
|
||
img_y2 *= scale_factor
|
||
logger.debug(f"[STANDALONE-IMAGE] Scaled bbox by {scale_factor:.3f}")
|
||
|
||
logger.info(f"[STANDALONE-IMAGE] Found standalone image at [{int(img_x1)},{int(img_y1)},{int(img_x2)},{int(img_y2)}]")
|
||
|
||
# Crop and save the image
|
||
element_idx = start_index + len(standalone_images)
|
||
img_element_id = f"standalone_img_{current_page}_{element_idx}"
|
||
cropped_path = self._crop_and_save_image(
|
||
source_image_path,
|
||
[img_x1, img_y1, img_x2, img_y2],
|
||
output_dir,
|
||
img_element_id
|
||
)
|
||
|
||
if cropped_path:
|
||
element = {
|
||
'element_id': img_element_id,
|
||
'type': ElementType.IMAGE,
|
||
'original_type': 'image',
|
||
'content': '',
|
||
'page': current_page,
|
||
'bbox': [img_x1, img_y1, img_x2, img_y2],
|
||
'index': element_idx,
|
||
'confidence': box.get('score', 1.0),
|
||
'saved_path': cropped_path,
|
||
'img_path': cropped_path,
|
||
'source': 'layout_det_res'
|
||
}
|
||
standalone_images.append(element)
|
||
logger.info(f"[STANDALONE-IMAGE] Extracted and saved: {cropped_path}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"[STANDALONE-IMAGE] Error extracting standalone images: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
return standalone_images
|
||
|
||
def _process_markdown_fallback(
|
||
self,
|
||
page_result: Any,
|
||
current_page: int,
|
||
output_dir: Optional[Path]
|
||
) -> List[Dict[str, Any]]:
|
||
"""
|
||
Fallback to markdown processing if parsing_res_list not available.
|
||
|
||
Args:
|
||
page_result: PP-StructureV3 page result
|
||
current_page: Current page number
|
||
output_dir: Optional output directory
|
||
|
||
Returns:
|
||
List of elements extracted from markdown
|
||
"""
|
||
elements = []
|
||
|
||
# Extract from markdown if available
|
||
if hasattr(page_result, 'markdown'):
|
||
markdown_dict = page_result.markdown
|
||
|
||
if isinstance(markdown_dict, dict):
|
||
# Extract markdown texts
|
||
markdown_texts = markdown_dict.get('markdown_texts', '')
|
||
if markdown_texts:
|
||
# Detect if it's a table
|
||
is_table = '<table' in markdown_texts.lower()
|
||
|
||
element = {
|
||
'element_id': f"md_{current_page}_0",
|
||
'type': ElementType.TABLE if is_table else ElementType.TEXT,
|
||
'content': markdown_texts,
|
||
'page': current_page,
|
||
'bbox': [0, 0, 0, 0], # No bbox in markdown
|
||
'index': 0,
|
||
'from_markdown': True
|
||
}
|
||
|
||
if is_table:
|
||
element['extracted_text'] = self._extract_text_from_html(markdown_texts)
|
||
|
||
elements.append(element)
|
||
|
||
# Process images
|
||
markdown_images = markdown_dict.get('markdown_images', {})
|
||
for img_idx, (img_path, img_obj) in enumerate(markdown_images.items()):
|
||
# Save image
|
||
if output_dir and hasattr(img_obj, 'save'):
|
||
self._save_pil_image(img_obj, output_dir, f"md_img_{current_page}_{img_idx}")
|
||
|
||
# Try to extract bbox from filename
|
||
bbox = self._extract_bbox_from_filename(img_path)
|
||
|
||
element = {
|
||
'element_id': f"md_img_{current_page}_{img_idx}",
|
||
'type': ElementType.IMAGE,
|
||
'content': img_path,
|
||
'page': current_page,
|
||
'bbox': bbox,
|
||
'index': img_idx + 1,
|
||
'from_markdown': True
|
||
}
|
||
elements.append(element)
|
||
|
||
return elements
|
||
|
||
def _determine_reading_order(self, elements: List[Dict]) -> List[int]:
|
||
"""
|
||
Determine reading order based on element positions.
|
||
|
||
Args:
|
||
elements: List of elements with bbox
|
||
|
||
Returns:
|
||
List of indices representing reading order
|
||
"""
|
||
if not elements:
|
||
return []
|
||
|
||
# If elements have original indices, use them
|
||
if all('index' in elem for elem in elements):
|
||
# Sort by original index
|
||
indexed_elements = [(i, elem['index']) for i, elem in enumerate(elements)]
|
||
indexed_elements.sort(key=lambda x: x[1])
|
||
return [i for i, _ in indexed_elements]
|
||
|
||
# Otherwise, sort by position (top to bottom, left to right)
|
||
indexed_elements = []
|
||
for i, elem in enumerate(elements):
|
||
bbox = elem.get('bbox', [0, 0, 0, 0])
|
||
if len(bbox) >= 2:
|
||
# Use top-left corner for sorting
|
||
indexed_elements.append((i, bbox[1], bbox[0])) # (index, y, x)
|
||
else:
|
||
indexed_elements.append((i, 0, 0))
|
||
|
||
# Sort by y first (top to bottom), then x (left to right)
|
||
indexed_elements.sort(key=lambda x: (x[1], x[2]))
|
||
|
||
return [i for i, _, _ in indexed_elements]
|
||
|
||
def _count_element_types(self, elements: List[Dict]) -> Dict[str, int]:
|
||
"""
|
||
Count occurrences of each element type.
|
||
|
||
Args:
|
||
elements: List of elements
|
||
|
||
Returns:
|
||
Dictionary with element type counts
|
||
"""
|
||
type_counts = {}
|
||
for elem in elements:
|
||
elem_type = elem.get('type', ElementType.TEXT)
|
||
type_counts[elem_type] = type_counts.get(elem_type, 0) + 1
|
||
return type_counts
|
||
|
||
def _convert_overall_ocr_to_regions(
|
||
self,
|
||
overall_ocr_res: Dict[str, Any],
|
||
scaling_info: Optional['ScalingInfo'] = None
|
||
) -> List[Dict[str, Any]]:
|
||
"""
|
||
Convert PP-StructureV3's overall_ocr_res to standard OCR region format.
|
||
|
||
This allows gap_filling_service to use PP-StructureV3's internal OCR results
|
||
instead of running a separate Raw OCR inference, saving approximately 50%
|
||
of total inference time.
|
||
|
||
The overall_ocr_res structure:
|
||
- dt_polys: List of polygon coordinates [[x1,y1], [x2,y2], [x3,y3], [x4,y4]]
|
||
- rec_texts: List of recognized text strings
|
||
- rec_scores: List of confidence scores
|
||
|
||
Args:
|
||
overall_ocr_res: Dictionary containing OCR results from PP-StructureV3
|
||
scaling_info: Optional scaling info for coordinate restoration
|
||
|
||
Returns:
|
||
List of OCR region dictionaries in standard format:
|
||
[{'text': str, 'bbox': [[x1,y1],...], 'confidence': float}, ...]
|
||
"""
|
||
regions = []
|
||
|
||
dt_polys = overall_ocr_res.get('dt_polys', [])
|
||
rec_texts = overall_ocr_res.get('rec_texts', [])
|
||
rec_scores = overall_ocr_res.get('rec_scores', [])
|
||
|
||
# Ensure all lists have the same length
|
||
num_regions = min(len(dt_polys), len(rec_texts))
|
||
if len(rec_scores) < num_regions:
|
||
# Pad with default confidence if scores are missing
|
||
rec_scores = list(rec_scores) + [0.9] * (num_regions - len(rec_scores))
|
||
|
||
for i in range(num_regions):
|
||
text = rec_texts[i]
|
||
if not text or not text.strip():
|
||
continue
|
||
|
||
poly = dt_polys[i]
|
||
confidence = rec_scores[i] if i < len(rec_scores) else 0.9
|
||
|
||
# Apply scaling restoration if needed
|
||
if scaling_info and hasattr(scaling_info, 'scale_factor') and scaling_info.scale_factor != 1.0:
|
||
scale = scaling_info.scale_factor
|
||
poly = [[pt[0] / scale, pt[1] / scale] for pt in poly]
|
||
|
||
regions.append({
|
||
'text': text,
|
||
'bbox': poly, # Keep polygon format for compatibility
|
||
'confidence': confidence
|
||
})
|
||
|
||
return regions
|
||
|
||
def _extract_text_from_html(self, html: str) -> str:
|
||
"""Extract plain text from HTML content."""
|
||
try:
|
||
from bs4 import BeautifulSoup
|
||
soup = BeautifulSoup(html, 'html.parser')
|
||
text = soup.get_text(separator=' ', strip=True)
|
||
except:
|
||
# Fallback: just remove HTML tags
|
||
import re
|
||
text = re.sub(r'<[^>]+>', ' ', html)
|
||
text = re.sub(r'\s+', ' ', text)
|
||
text = text.strip()
|
||
|
||
# Strip LaTeX math formatting if present
|
||
return self._strip_latex_math(text)
|
||
|
||
def _strip_latex_math(self, text: str) -> str:
|
||
"""
|
||
Convert LaTeX math notation to plain text.
|
||
|
||
PP-StructureV3 outputs formulas in LaTeX format like:
|
||
$N\\cdot m\\times8.851=|b\\cdot|$
|
||
|
||
This converts them to readable plain text.
|
||
"""
|
||
import re
|
||
|
||
if not text or '$' not in text:
|
||
return text
|
||
|
||
# Remove $...$ delimiters but keep content
|
||
text = re.sub(r'\$([^$]+)\$', r'\1', text)
|
||
|
||
# Convert common LaTeX math commands to plain text
|
||
replacements = [
|
||
(r'\\cdot', '·'), # Multiplication dot
|
||
(r'\\times', '×'), # Multiplication sign
|
||
(r'\\div', '÷'), # Division sign
|
||
(r'\\pm', '±'), # Plus-minus
|
||
(r'\\leq', '≤'), # Less than or equal
|
||
(r'\\geq', '≥'), # Greater than or equal
|
||
(r'\\neq', '≠'), # Not equal
|
||
(r'\\approx', '≈'), # Approximately equal
|
||
(r'\\circ', '°'), # Degree symbol
|
||
(r'\\degree', '°'), # Degree symbol
|
||
(r'\\alpha', 'α'),
|
||
(r'\\beta', 'β'),
|
||
(r'\\gamma', 'γ'),
|
||
(r'\\delta', 'δ'),
|
||
(r'\\mu', 'μ'),
|
||
(r'\\Omega', 'Ω'),
|
||
(r'\\infty', '∞'),
|
||
(r'\^\\{2\\}', '²'), # Superscript 2
|
||
(r'\^\\{3\\}', '³'), # Superscript 3
|
||
(r'\^2', '²'),
|
||
(r'\^3', '³'),
|
||
(r'_\\{([^}]+)\\}', r'_\1'), # Subscript
|
||
(r'\\mathrm\{([^}]+)\}', r'\1'), # Roman text
|
||
(r'\\mathsf\{([^}]+)\}', r'\1'), # Sans-serif text
|
||
(r'\\mathbf\{([^}]+)\}', r'\1'), # Bold text
|
||
(r'\\text\{([^}]+)\}', r'\1'), # Text mode
|
||
(r'\\left', ''),
|
||
(r'\\right', ''),
|
||
(r'\\[|]', '|'), # Pipe symbols
|
||
(r'\\ ', ' '), # Escaped space
|
||
(r'\\,', ' '), # Thin space
|
||
(r'\\;', ' '), # Medium space
|
||
(r'\\quad', ' '), # Quad space
|
||
(r'\\qquad', ' '), # Double quad space
|
||
]
|
||
|
||
for pattern, replacement in replacements:
|
||
text = re.sub(pattern, replacement, text)
|
||
|
||
# Clean up any remaining backslashes followed by letters (unknown commands)
|
||
text = re.sub(r'\\[a-zA-Z]+', '', text)
|
||
|
||
# Clean up multiple spaces
|
||
text = re.sub(r'\s+', ' ', text)
|
||
|
||
return text.strip()
|
||
|
||
def _extract_bbox_from_filename(self, filename: str) -> List[int]:
|
||
"""Extract bbox from filename if it contains coordinate information."""
|
||
import re
|
||
match = re.search(r'box_(\d+)_(\d+)_(\d+)_(\d+)', filename)
|
||
if match:
|
||
return list(map(int, match.groups()))
|
||
return [0, 0, 0, 0]
|
||
|
||
def _save_image(self, img_path: str, output_dir: Path, element_id: str) -> Optional[str]:
|
||
"""Save image file to output directory and return relative path.
|
||
|
||
Args:
|
||
img_path: Path to image file or image data
|
||
output_dir: Base output directory for results
|
||
element_id: Unique identifier for the element
|
||
|
||
Returns:
|
||
Relative path to saved image, or None if save failed
|
||
"""
|
||
import shutil
|
||
import numpy as np
|
||
from PIL import Image
|
||
|
||
try:
|
||
# Create imgs subdirectory
|
||
img_dir = output_dir / "imgs"
|
||
img_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
# Determine output file path
|
||
dst_path = img_dir / f"{element_id}.png"
|
||
relative_path = f"imgs/{element_id}.png"
|
||
|
||
# Handle different input types
|
||
if isinstance(img_path, str):
|
||
src_path = Path(img_path)
|
||
if src_path.exists() and src_path.is_file():
|
||
# Copy existing file
|
||
shutil.copy2(src_path, dst_path)
|
||
logger.info(f"Copied image from {src_path} to {dst_path}")
|
||
else:
|
||
logger.warning(f"Image file not found: {img_path}")
|
||
return None
|
||
elif isinstance(img_path, np.ndarray):
|
||
# Save numpy array as image
|
||
Image.fromarray(img_path).save(dst_path)
|
||
logger.info(f"Saved numpy array image to {dst_path}")
|
||
else:
|
||
logger.warning(f"Unknown image type: {type(img_path)}")
|
||
return None
|
||
|
||
# Return relative path for reference
|
||
return relative_path
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to save image for element {element_id}: {e}")
|
||
return None
|
||
|
||
def _save_pil_image(self, img_obj, output_dir: Path, element_id: str):
|
||
"""Save PIL image object to output directory."""
|
||
try:
|
||
img_dir = output_dir / "imgs"
|
||
img_dir.mkdir(parents=True, exist_ok=True)
|
||
img_path = img_dir / f"{element_id}.png"
|
||
img_obj.save(str(img_path))
|
||
logger.info(f"Saved image to {img_path}")
|
||
except Exception as e:
|
||
logger.warning(f"Failed to save PIL image: {e}")
|
||
|
||
def _crop_and_save_image(
|
||
self,
|
||
source_image_path: Path,
|
||
bbox: List[float],
|
||
output_dir: Path,
|
||
element_id: str
|
||
) -> Optional[str]:
|
||
"""
|
||
Crop image region from source image and save to output directory.
|
||
|
||
Args:
|
||
source_image_path: Path to the source image
|
||
bbox: Bounding box [x1, y1, x2, y2]
|
||
output_dir: Output directory for saving cropped image
|
||
element_id: Element ID for naming
|
||
|
||
Returns:
|
||
Relative filename (not full path) to saved image, consistent with
|
||
Direct Track which stores "filename.png" that gets joined with
|
||
result_dir by pdf_generator_service.
|
||
"""
|
||
try:
|
||
from PIL import Image
|
||
|
||
# Open source image
|
||
with Image.open(source_image_path) as img:
|
||
# Ensure bbox values are integers
|
||
x1, y1, x2, y2 = [int(v) for v in bbox[:4]]
|
||
|
||
# Validate bbox
|
||
img_width, img_height = img.size
|
||
x1 = max(0, min(x1, img_width))
|
||
x2 = max(0, min(x2, img_width))
|
||
y1 = max(0, min(y1, img_height))
|
||
y2 = max(0, min(y2, img_height))
|
||
|
||
if x2 <= x1 or y2 <= y1:
|
||
logger.warning(f"Invalid bbox for cropping: {bbox}")
|
||
return None
|
||
|
||
# Crop the region
|
||
cropped = img.crop((x1, y1, x2, y2))
|
||
|
||
# Save directly to output directory (no subdirectory)
|
||
# Consistent with Direct Track which saves to output_dir directly
|
||
image_filename = f"{element_id}.png"
|
||
img_path = output_dir / image_filename
|
||
cropped.save(str(img_path), "PNG")
|
||
|
||
# Return just the filename (relative to result_dir)
|
||
# PDF generator will join with result_dir to get full path
|
||
logger.info(f"Cropped image saved: {img_path} ({x2-x1}x{y2-y1} pixels)")
|
||
return image_filename
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to crop and save image for {element_id}: {e}")
|
||
return None |