Files
OCR/backend/app/services/pp_structure_enhanced.py
egg 95ae1f1bdb feat: add table detection options and scan artifact removal
- Add TableDetectionSelector component for wired/wireless/region detection
- Add CV-based table line detector module (disabled due to poor performance)
- Add scan artifact removal preprocessing step (removes faint horizontal lines)
- Add PreprocessingConfig schema with remove_scan_artifacts option
- Update frontend PreprocessingSettings with scan artifact toggle
- Integrate table detection config into ProcessingPage
- Archive extract-table-cell-boxes proposal

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-30 13:21:50 +08:00

1051 lines
47 KiB
Python

"""
Enhanced PP-StructureV3 processing with full element extraction
This module provides enhanced PP-StructureV3 processing that extracts all
23 element types with their bbox coordinates and reading order.
"""
import logging
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any, TYPE_CHECKING
import json
import gc
# Import ScalingInfo for type checking (avoid circular imports at runtime)
if TYPE_CHECKING:
from app.services.layout_preprocessing_service import ScalingInfo
# Optional torch import for additional GPU memory management
try:
import torch
TORCH_AVAILABLE = True
except ImportError:
TORCH_AVAILABLE = False
import paddle
from paddleocr import PPStructureV3
from PIL import Image
import numpy as np
import cv2
from app.models.unified_document import ElementType
from app.core.config import settings
from app.services.memory_manager import prediction_context
from app.services.cv_table_detector import CVTableDetector
logger = logging.getLogger(__name__)
class PPStructureEnhanced:
    """
    Enhanced PP-StructureV3 processor that extracts all available element types
    and structure information from parsing_res_list.

    The class wraps an already-initialized PPStructureV3 engine and normalizes
    its per-page results into plain dictionaries (elements, tables, images,
    reading order).
    """
    # Mapping from PP-StructureV3 layout labels to our ElementType.
    # Lookups are done on the lower-cased label; labels that are not listed
    # here are treated as ElementType.TEXT by _process_parsing_res_list.
    ELEMENT_TYPE_MAPPING = {
        'title': ElementType.TITLE,
        'paragraph_title': ElementType.TITLE,  # PP-StructureV3 block_label
        'text': ElementType.TEXT,
        'paragraph': ElementType.PARAGRAPH,
        'figure': ElementType.FIGURE,
        'figure_caption': ElementType.CAPTION,
        'table': ElementType.TABLE,
        'table_caption': ElementType.TABLE_CAPTION,
        'header': ElementType.HEADER,
        'footer': ElementType.FOOTER,
        'reference': ElementType.REFERENCE,
        'equation': ElementType.EQUATION,
        'formula': ElementType.FORMULA,
        'list-item': ElementType.LIST_ITEM,
        'list': ElementType.LIST,
        'code': ElementType.CODE,
        'footnote': ElementType.FOOTNOTE,
        'page-number': ElementType.PAGE_NUMBER,
        'watermark': ElementType.WATERMARK,
        'signature': ElementType.SIGNATURE,
        'stamp': ElementType.STAMP,
        'seal': ElementType.STAMP,  # PP-StructureV3 may use 'seal' label
        'logo': ElementType.LOGO,
        'barcode': ElementType.BARCODE,
        'qr-code': ElementType.QR_CODE,
        # Generic visual labels
        'image': ElementType.IMAGE,
        'chart': ElementType.CHART,
        'diagram': ElementType.DIAGRAM,
    }
def __init__(self, structure_engine: PPStructureV3):
"""
Initialize with existing PP-StructureV3 engine.
Args:
structure_engine: Initialized PPStructureV3 instance
"""
self.structure_engine = structure_engine
def analyze_with_full_structure(
    self,
    image_path: Path,
    output_dir: Optional[Path] = None,
    current_page: int = 0,
    preprocessed_image: Optional[Image.Image] = None,
    scaling_info: Optional['ScalingInfo'] = None,
    save_visualization: bool = False,
    use_cv_table_detection: bool = False
) -> Dict[str, Any]:
    """
    Analyze document with full PP-StructureV3 capabilities.

    Args:
        image_path: Path to original image file (used for cropping extracted images)
        output_dir: Optional output directory for saving extracted content
        current_page: Current page number (0-based)
        preprocessed_image: Optional preprocessed PIL Image for layout detection.
            If provided, this is used for PP-Structure prediction,
            but original image_path is still used for cropping images.
        scaling_info: Optional ScalingInfo from preprocessing. If image was scaled
            for layout detection, element bbox coordinates are scaled back
            to original image coordinates for proper cropping.
        save_visualization: If True, save detection visualization images
            (layout_det_res, layout_order_res, overall_ocr_res, etc.)
        use_cv_table_detection: If True, use CV-based line detection for wired tables
            instead of ML-based cell detection (RT-DETR-L)

    Returns:
        Dictionary with complete structure information including:
        - elements: List of all detected elements with types and bbox (in original coords)
        - total_elements: Number of entries in ``elements``
        - reading_order: Reading order indices
        - images: Extracted images with metadata
        - tables: Extracted tables with structure
        - element_types: Count of elements per type
        - has_parsing_res_list: True if any page result exposed parsing_res_list
        - visualization_dir: Path to visualization images (if save_visualization=True)
        - error: Present only when the prediction slot timed out or an
          exception was raised; every other list is then empty.

    This method does not raise: any exception is logged and converted into
    the empty result described above.
    """
    try:
        logger.info(f"Enhanced PP-StructureV3 analysis on {image_path.name}")
        if preprocessed_image is not None:
            logger.info("Using preprocessed image for layout detection")

        # Perform structure analysis with semaphore control.
        # This prevents OOM errors from multiple simultaneous predictions.
        with prediction_context(timeout=settings.service_acquire_timeout_seconds) as acquired:
            if not acquired:
                logger.error("Failed to acquire prediction slot (timeout), returning empty result")
                return self._empty_analysis_result('Prediction slot timeout')

            # Use preprocessed image if provided, otherwise use original path
            if preprocessed_image is not None:
                # Convert PIL to numpy array (BGR format for PP-Structure)
                predict_input = np.array(preprocessed_image)
                if predict_input.ndim == 3 and predict_input.shape[2] == 3:
                    # Reversing the channel axis yields a negative-stride view;
                    # copy it into a contiguous buffer before handing it over.
                    predict_input = np.ascontiguousarray(predict_input[:, :, ::-1])
                results = self.structure_engine.predict(predict_input)
            else:
                results = self.structure_engine.predict(str(image_path))

            # predict() may hand back a lazy iterable; materialize it while
            # the slot is still held so inference runs under the semaphore.
            results = list(results)

        all_elements = []
        all_images = []
        all_tables = []
        visualization_dir = None
        # Tracked across pages so the flag does not depend on a loop variable
        # (which is undefined when `results` is empty).
        found_parsing_res_list = False

        # Process each page result
        for page_result in results:
            # Save visualization images if requested
            if save_visualization and output_dir and hasattr(page_result, 'save_to_img'):
                try:
                    vis_dir = output_dir / 'visualization'
                    vis_dir.mkdir(parents=True, exist_ok=True)
                    page_result.save_to_img(str(vis_dir))
                    visualization_dir = vis_dir
                    logger.info(f"Saved visualization images to {vis_dir}")
                except Exception as e:
                    logger.warning(f"Failed to save visualization images: {e}")

            # Try to access parsing_res_list and the dict that carries
            # table_res_list / layout_det_res (the complete structure)
            parsing_res_list, result_dict = self._locate_parsing_results(page_result)
            if parsing_res_list is not None:
                found_parsing_res_list = True

            # Extract table_res_list which contains cell_box_list
            table_res_list = None
            layout_det_res = None
            if result_dict:
                if 'table_res_list' in result_dict:
                    table_res_list = result_dict['table_res_list']
                    logger.info(f"Found table_res_list with {len(table_res_list)} tables")
                    for i, tbl in enumerate(table_res_list):
                        if 'cell_box_list' in tbl:
                            logger.info(f" Table {i}: {len(tbl['cell_box_list'])} cell boxes")
                # Extract layout_det_res for Image-in-Table processing
                if 'layout_det_res' in result_dict:
                    layout_det_res = result_dict['layout_det_res']
                    logger.info(f"Found layout_det_res with {len(layout_det_res.get('boxes', []))} boxes")

            # Process parsing_res_list if found
            if parsing_res_list:
                elements = self._process_parsing_res_list(
                    parsing_res_list, current_page, output_dir, image_path, scaling_info,
                    table_res_list=table_res_list,  # Pass table_res_list for cell_box_list
                    layout_det_res=layout_det_res,  # Pass layout_det_res for Image-in-Table
                    use_cv_table_detection=use_cv_table_detection  # Use CV for wired tables
                )
                all_elements.extend(elements)

                # Extract tables and images from elements
                table_bboxes = []  # Collect table bboxes for standalone image filtering
                for elem in elements:
                    if elem['type'] == ElementType.TABLE:
                        all_tables.append(elem)
                        table_bboxes.append(elem.get('bbox', [0, 0, 0, 0]))
                    elif elem['type'] in [ElementType.IMAGE, ElementType.FIGURE]:
                        all_images.append(elem)

                # Extract standalone images from layout_det_res (images NOT inside tables)
                if layout_det_res and image_path and output_dir:
                    standalone_images = self._extract_standalone_images(
                        layout_det_res, table_bboxes, image_path, output_dir,
                        current_page, len(elements), scaling_info
                    )
                    if standalone_images:
                        all_elements.extend(standalone_images)
                        all_images.extend(standalone_images)
                        logger.info(f"Extracted {len(standalone_images)} standalone images from layout_det_res")
            else:
                # Fallback to markdown if parsing_res_list not available
                logger.warning("parsing_res_list not found, falling back to markdown")
                elements = self._process_markdown_fallback(
                    page_result, current_page, output_dir
                )
                all_elements.extend(elements)

        # Create reading order based on element positions
        reading_order = self._determine_reading_order(all_elements)
        result = {
            'elements': all_elements,
            'total_elements': len(all_elements),
            'reading_order': reading_order,
            'tables': all_tables,
            'images': all_images,
            'element_types': self._count_element_types(all_elements),
            'has_parsing_res_list': found_parsing_res_list
        }
        # Add visualization directory if available
        if visualization_dir:
            result['visualization_dir'] = str(visualization_dir)
        return result
    except Exception as e:
        # exc_info routes the traceback through logging instead of stderr.
        logger.error(f"Enhanced PP-StructureV3 analysis error: {e}", exc_info=True)
        # Clean up GPU memory on error (best effort)
        try:
            if TORCH_AVAILABLE and torch.cuda.is_available():
                torch.cuda.empty_cache()
                torch.cuda.synchronize()
            if paddle.device.is_compiled_with_cuda():
                paddle.device.cuda.empty_cache()
            gc.collect()
        except Exception as cleanup_error:
            # Cleanup must never mask the original failure.
            logger.debug(f"GPU memory cleanup after failure was skipped: {cleanup_error}")
        return self._empty_analysis_result(str(e))


def _empty_analysis_result(self, error: str) -> Dict[str, Any]:
    """Build the result returned when analysis could not run, tagged with *error*."""
    return {
        'elements': [],
        'total_elements': 0,
        'reading_order': [],
        'tables': [],
        'images': [],
        'element_types': {},
        'has_parsing_res_list': False,
        'error': error
    }


def _locate_parsing_results(
    self,
    page_result: Any
) -> Tuple[Optional[List[Dict]], Optional[Dict]]:
    """
    Find parsing_res_list and its surrounding dict in one page result.

    The result layout differs between library versions, so several access
    paths are probed in order; only the first applicable path is used.

    Returns:
        (parsing_res_list, result_dict). Either may be None. result_dict is
        the mapping later searched for 'table_res_list' / 'layout_det_res'.
    """
    parsing_res_list = None
    result_dict = None

    # Method 1: Direct access to json attribute (check both top-level and res)
    if hasattr(page_result, 'json'):
        result_json = page_result.json
        if isinstance(result_json, dict):
            result_dict = result_json
            # Check top-level
            if 'parsing_res_list' in result_json:
                parsing_res_list = result_json['parsing_res_list']
                logger.info(f"Found parsing_res_list at top level with {len(parsing_res_list)} elements")
            # Check inside 'res' (new structure in paddlex)
            elif 'res' in result_json and isinstance(result_json['res'], dict):
                result_dict = result_json['res']
                if 'parsing_res_list' in result_json['res']:
                    parsing_res_list = result_json['res']['parsing_res_list']
                    logger.info(f"Found parsing_res_list inside 'res' with {len(parsing_res_list)} elements")
    # Method 2: Try direct dict access (LayoutParsingResultV2 inherits from dict)
    elif isinstance(page_result, dict):
        result_dict = page_result
        if 'parsing_res_list' in page_result:
            parsing_res_list = page_result['parsing_res_list']
            logger.info(f"Found parsing_res_list via dict access with {len(parsing_res_list)} elements")
        elif 'res' in page_result and isinstance(page_result['res'], dict):
            result_dict = page_result['res']
            if 'parsing_res_list' in page_result['res']:
                parsing_res_list = page_result['res']['parsing_res_list']
                logger.info(f"Found parsing_res_list inside page_result['res'] with {len(parsing_res_list)} elements")
    # Method 3: Try to access as attribute
    elif hasattr(page_result, 'parsing_res_list'):
        parsing_res_list = page_result.parsing_res_list
        logger.info(f"Found parsing_res_list attribute with {len(parsing_res_list)} elements")
        if hasattr(page_result, '__dict__'):
            result_dict = page_result.__dict__
    # Method 4: Check if result has to_dict method
    elif hasattr(page_result, 'to_dict'):
        result_dict = page_result.to_dict()
        if 'parsing_res_list' in result_dict:
            parsing_res_list = result_dict['parsing_res_list']
            logger.info(f"Found parsing_res_list in to_dict with {len(parsing_res_list)} elements")
        elif 'res' in result_dict and isinstance(result_dict['res'], dict):
            result_dict = result_dict['res']
            if 'parsing_res_list' in result_dict:
                parsing_res_list = result_dict['parsing_res_list']
                logger.info(f"Found parsing_res_list in to_dict['res'] with {len(parsing_res_list)} elements")

    return parsing_res_list, result_dict
def _process_parsing_res_list(
self,
parsing_res_list: List[Dict],
current_page: int,
output_dir: Optional[Path],
source_image_path: Optional[Path] = None,
scaling_info: Optional['ScalingInfo'] = None,
table_res_list: Optional[List[Dict]] = None,
layout_det_res: Optional[Dict] = None,
use_cv_table_detection: bool = False
) -> List[Dict[str, Any]]:
"""
Process parsing_res_list to extract all elements.
Args:
parsing_res_list: List of parsed elements from PP-StructureV3
scaling_info: Scaling information for bbox coordinate restoration
current_page: Current page number
output_dir: Optional output directory
source_image_path: Path to source image for cropping image regions
table_res_list: Optional list of table results containing cell_box_list
layout_det_res: Optional layout detection result for Image-in-Table processing
use_cv_table_detection: If True, use CV line detection for wired tables
Returns:
List of processed elements with normalized structure
"""
elements = []
for idx, item in enumerate(parsing_res_list):
# Debug: log the structure of the first item
if idx == 0:
logger.info(f"First parsing_res_list item structure: {list(item.keys()) if isinstance(item, dict) else type(item)}")
logger.info(f"First parsing_res_list item sample: {str(item)[:500]}")
# Extract element type (check both 'type' and 'block_label')
element_type = item.get('type', '') or item.get('block_label', 'text')
element_type = element_type.lower()
mapped_type = self.ELEMENT_TYPE_MAPPING.get(
element_type, ElementType.TEXT
)
# Extract bbox (check multiple possible keys)
layout_bbox = (
item.get('layout_bbox', []) or
item.get('block_bbox', []) or
item.get('bbox', [])
)
# Ensure bbox has 4 values
if len(layout_bbox) >= 4:
bbox = list(layout_bbox[:4]) # [x1, y1, x2, y2]
else:
bbox = [0, 0, 0, 0] # Default if bbox missing
logger.warning(f"Element {idx} has invalid bbox: {layout_bbox}")
# Scale bbox back to original image coordinates if image was scaled
# This is critical for proper cropping from original high-resolution image
if scaling_info and scaling_info.was_scaled and bbox != [0, 0, 0, 0]:
scale_factor = scaling_info.scale_factor
bbox = [
bbox[0] * scale_factor, # x1
bbox[1] * scale_factor, # y1
bbox[2] * scale_factor, # x2
bbox[3] * scale_factor # y2
]
if idx == 0: # Log only for first element to avoid spam
logger.info(
f"Scaled bbox to original coords: "
f"{[round(x, 1) for x in layout_bbox[:4]]} -> {[round(x, 1) for x in bbox]} "
f"(factor={scale_factor:.3f})"
)
# Extract content (check multiple possible keys)
content = (
item.get('content', '') or
item.get('block_content', '') or
''
)
# Additional fallback for content in 'res' field
if not content and 'res' in item:
res = item.get('res', {})
if isinstance(res, dict):
content = res.get('content', '') or res.get('text', '')
elif isinstance(res, str):
content = res
# Content-based HTML table detection: PP-StructureV3 sometimes
# classifies tables as 'text' but returns HTML table content
html_table_content = None
if content and '<table' in content.lower():
if mapped_type == ElementType.TEXT or element_type == 'text':
logger.info(f"Element {idx}: Detected HTML table content in 'text' type, reclassifying to TABLE")
mapped_type = ElementType.TABLE
html_table_content = content # Store for later use
# Create element
element = {
'element_id': f"pp3_{current_page}_{idx}",
'type': mapped_type,
'original_type': element_type,
'content': content,
'page': current_page,
'bbox': bbox, # [x1, y1, x2, y2]
'index': idx, # Original index in reading order
'confidence': item.get('score', 1.0)
}
# Special handling for tables
if mapped_type == ElementType.TABLE:
# 1. 提取 HTML (原有邏輯)
html_content = html_table_content
res_data = {}
# 獲取 res 字典 (包含 html 和 boxes)
if 'res' in item and isinstance(item['res'], dict):
res_data = item['res']
logger.info(f"[TABLE] Found 'res' dict with keys: {list(res_data.keys())}")
if not html_content:
html_content = res_data.get('html', '')
else:
logger.info(f"[TABLE] No 'res' key in item. Available keys: {list(item.keys())}")
if html_content:
element['html'] = html_content
element['extracted_text'] = self._extract_text_from_html(html_content)
# 2. 提取 Cell 座標 (boxes)
# 優先順序: table_res_list > res_data['boxes'] > SLANeXt 補充
cell_boxes_extracted = False
# First, try to get cell_box_list from table_res_list (pp_demo style)
if table_res_list and not cell_boxes_extracted:
# Match table by HTML content or find closest bbox
for tbl_res in table_res_list:
if 'cell_box_list' in tbl_res and tbl_res['cell_box_list']:
# Check if HTML matches
tbl_html = tbl_res.get('pred_html', '')
if html_content and tbl_html:
# Simple check: if both have same structure
if tbl_html[:100] == html_content[:100]:
cell_boxes = tbl_res['cell_box_list']
# cell_box_list is already in absolute coordinates
element['cell_boxes'] = [[float(c) for c in box] for box in cell_boxes]
element['cell_boxes_source'] = 'table_res_list'
cell_boxes_extracted = True
logger.info(f"[TABLE] Found {len(cell_boxes)} cell boxes from table_res_list (HTML match)")
break
# If no HTML match, use first available table_res with cell_box_list
if not cell_boxes_extracted:
for tbl_res in table_res_list:
if 'cell_box_list' in tbl_res and tbl_res['cell_box_list']:
cell_boxes = tbl_res['cell_box_list']
element['cell_boxes'] = [[float(c) for c in box] for box in cell_boxes]
element['cell_boxes_source'] = 'table_res_list'
cell_boxes_extracted = True
logger.info(f"[TABLE] Found {len(cell_boxes)} cell boxes from table_res_list (first available)")
# Remove used table_res to avoid reuse
table_res_list.remove(tbl_res)
break
if not cell_boxes_extracted and 'boxes' in res_data:
# PPStructureV3 returned cell boxes in res (unlikely in PaddleX 3.x)
cell_boxes = res_data['boxes']
logger.info(f"[TABLE] Found {len(cell_boxes)} cell boxes in res_data")
# 獲取表格自身的偏移量 (用於將 Cell 的相對座標轉為絕對座標)
table_x, table_y = 0, 0
if len(bbox) >= 2: # bbox is [x1, y1, x2, y2]
table_x, table_y = bbox[0], bbox[1]
processed_cells = []
for cell_box in cell_boxes:
# 確保格式正確
if isinstance(cell_box, (list, tuple)) and len(cell_box) >= 4:
# 轉換為絕對座標: Cell x + 表格 x
abs_cell_box = [
cell_box[0] + table_x,
cell_box[1] + table_y,
cell_box[2] + table_x,
cell_box[3] + table_y
]
processed_cells.append(abs_cell_box)
# 將處理後的 Cell 座標存入 element
element['cell_boxes'] = processed_cells
element['raw_cell_boxes'] = cell_boxes
element['cell_boxes_source'] = 'ppstructure'
logger.info(f"[TABLE] Processed {len(processed_cells)} cell boxes with table offset ({table_x}, {table_y})")
cell_boxes_extracted = True
if not cell_boxes_extracted:
logger.info(f"[TABLE] No cell boxes available. PPStructureV3 keys: {list(res_data.keys()) if res_data else 'empty'}")
# 2.5 CV-based table line detection for wired tables
if use_cv_table_detection and source_image_path and source_image_path.exists():
try:
# Load image for CV processing
cv_image = cv2.imread(str(source_image_path))
if cv_image is not None:
cv_detector = CVTableDetector()
ml_cell_boxes = element.get('cell_boxes', [])
# Detect cells using CV line detection
cv_cells = cv_detector.detect_and_merge_with_ml(
cv_image,
bbox, # Table bbox
ml_cell_boxes
)
if cv_cells:
# Apply scaling if needed
if scaling_info and scaling_info.was_scaled:
cv_cells = [
[
c[0] * scaling_info.scale_x,
c[1] * scaling_info.scale_y,
c[2] * scaling_info.scale_x,
c[3] * scaling_info.scale_y
]
for c in cv_cells
]
element['cell_boxes'] = cv_cells
element['cell_boxes_source'] = 'cv_line_detection'
logger.info(f"[TABLE] CV line detection found {len(cv_cells)} cells (ML had {len(ml_cell_boxes)})")
except Exception as cv_error:
logger.warning(f"[TABLE] CV line detection failed: {cv_error}")
# 3. Image-in-Table 處理:檢測並嵌入表格內的圖片
if layout_det_res and source_image_path and output_dir:
embedded_images = self._embed_images_in_table(
element, bbox, layout_det_res, source_image_path, output_dir
)
if embedded_images:
element['embedded_images'] = embedded_images
logger.info(f"[TABLE] Embedded {len(embedded_images)} images into table")
# Special handling for images/figures/stamps (visual elements that need cropping)
elif mapped_type in [ElementType.IMAGE, ElementType.FIGURE, ElementType.STAMP, ElementType.LOGO]:
# Save image if path provided
if 'img_path' in item and output_dir:
saved_path = self._save_image(item['img_path'], output_dir, element['element_id'])
if saved_path:
element['saved_path'] = saved_path
element['img_path'] = item['img_path'] # Keep original for reference
else:
logger.warning(f"Failed to save image for element {element['element_id']}")
# Crop image from source if no img_path but source image is available
elif source_image_path and output_dir and bbox != [0, 0, 0, 0]:
cropped_path = self._crop_and_save_image(
source_image_path, bbox, output_dir, element['element_id']
)
if cropped_path:
element['saved_path'] = cropped_path
element['img_path'] = cropped_path
logger.info(f"Cropped and saved image region for {element['element_id']}")
else:
logger.warning(f"Failed to crop image for element {element['element_id']}")
# Add any additional metadata
if 'metadata' in item:
element['metadata'] = item['metadata']
elements.append(element)
logger.debug(f"Processed element {idx}: type={mapped_type}, bbox={bbox}")
return elements
def _embed_images_in_table(
self,
table_element: Dict[str, Any],
table_bbox: List[float],
layout_det_res: Dict,
source_image_path: Path,
output_dir: Path
) -> List[Dict[str, Any]]:
"""
Detect and embed images that are inside a table region.
This handles the case where layout detection finds an image inside a table,
similar to how pp_demo embeds images in table HTML.
Args:
table_element: The table element being processed
table_bbox: Table bounding box [x1, y1, x2, y2]
layout_det_res: Layout detection result containing all detected boxes
source_image_path: Path to source image for cropping
output_dir: Output directory for saving cropped images
Returns:
List of embedded image info dicts with 'bbox', 'saved_path', 'html_tag'
"""
embedded_images = []
try:
boxes = layout_det_res.get('boxes', [])
table_x1, table_y1, table_x2, table_y2 = table_bbox
for box in boxes:
label = box.get('label', '').lower()
if label != 'image':
continue
# Get image bbox
img_coord = box.get('coordinate', [])
if len(img_coord) < 4:
continue
img_x1, img_y1, img_x2, img_y2 = img_coord[:4]
# Check if image is inside table (with some tolerance)
tolerance = 5 # pixels
if (img_x1 >= table_x1 - tolerance and
img_y1 >= table_y1 - tolerance and
img_x2 <= table_x2 + tolerance and
img_y2 <= table_y2 + tolerance):
logger.info(f"[IMAGE-IN-TABLE] Found image at [{int(img_x1)},{int(img_y1)},{int(img_x2)},{int(img_y2)}] inside table")
# Crop and save the image
img_element_id = f"img_in_table_{int(img_x1)}_{int(img_y1)}_{int(img_x2)}_{int(img_y2)}"
cropped_path = self._crop_and_save_image(
source_image_path,
[img_x1, img_y1, img_x2, img_y2],
output_dir,
img_element_id
)
if cropped_path:
# Create relative path for HTML embedding
rel_path = f"imgs/{Path(cropped_path).name}"
# Create img tag similar to pp_demo
img_html = f'<div style="text-align: center;"><img src="{rel_path}" alt="Image" /></div>'
embedded_image = {
'bbox': [img_x1, img_y1, img_x2, img_y2],
'saved_path': str(cropped_path),
'relative_path': rel_path,
'html_tag': img_html,
'element_id': img_element_id
}
embedded_images.append(embedded_image)
# Try to insert image into HTML content
if 'html' in table_element and table_element['html']:
# Insert image reference at the end of HTML before </table>
original_html = table_element['html']
if '</tbody>' in original_html:
# Insert before </tbody> in a new row
new_html = original_html.replace(
'</tbody>',
f'<tr><td colspan="99" style="text-align:center;"><img src="{rel_path}" alt="Embedded Image" /></td></tr></tbody>'
)
table_element['html'] = new_html
logger.info(f"[IMAGE-IN-TABLE] Embedded image into table HTML")
except Exception as e:
logger.error(f"[IMAGE-IN-TABLE] Error processing images in table: {e}")
return embedded_images
def _extract_standalone_images(
    self,
    layout_det_res: Dict,
    table_bboxes: List[List[float]],
    source_image_path: Path,
    output_dir: Path,
    current_page: int,
    start_index: int,
    scaling_info: Optional['ScalingInfo'] = None
) -> List[Dict[str, Any]]:
    """
    Extract standalone images from layout_det_res that are NOT inside tables.

    This handles images that PP-StructureV3 detects in layout_det_res but
    doesn't include in parsing_res_list (non-table images).

    Args:
        layout_det_res: Layout detection result containing all detected boxes
        table_bboxes: List of table bounding boxes (already in original image
            coordinates) used to exclude images inside tables
        source_image_path: Path to source image for cropping
        output_dir: Output directory for saving cropped images
        current_page: Current page number
        start_index: Starting index for element IDs
        scaling_info: Optional scaling info for coordinate restoration

    Returns:
        List of standalone image elements. Errors are logged and yield the
        images collected so far.
    """
    standalone_images = []
    try:
        boxes = layout_det_res.get('boxes', [])
        logger.info(f"[STANDALONE-IMAGE] Checking {len(boxes)} boxes for standalone images")

        needs_rescale = bool(scaling_info and scaling_info.was_scaled)
        tolerance = 5  # pixels

        for box in boxes:
            label = str(box.get('label') or '').lower()
            if label != 'image':
                continue

            # Get image bbox
            img_coord = box.get('coordinate')
            if img_coord is None or len(img_coord) < 4:
                continue
            img_x1, img_y1, img_x2, img_y2 = img_coord[:4]

            # Scale bbox back to original coordinates BEFORE comparing with
            # the table boxes: those were already scaled by the element
            # processing step, so both sides must share one coordinate space.
            if needs_rescale:
                scale_factor = scaling_info.scale_factor
                img_x1 *= scale_factor
                img_y1 *= scale_factor
                img_x2 *= scale_factor
                img_y2 *= scale_factor
                logger.debug(f"[STANDALONE-IMAGE] Scaled bbox by {scale_factor:.3f}")

            # Check if image is inside any table (skip if so)
            is_inside_table = False
            for table_bbox in table_bboxes:
                if len(table_bbox) < 4:
                    continue
                tx1, ty1, tx2, ty2 = table_bbox[:4]
                if (img_x1 >= tx1 - tolerance and
                        img_y1 >= ty1 - tolerance and
                        img_x2 <= tx2 + tolerance and
                        img_y2 <= ty2 + tolerance):
                    is_inside_table = True
                    logger.debug(f"[STANDALONE-IMAGE] Image at [{int(img_x1)},{int(img_y1)}] is inside table, skipping")
                    break
            if is_inside_table:
                continue

            logger.info(f"[STANDALONE-IMAGE] Found standalone image at [{int(img_x1)},{int(img_y1)},{int(img_x2)},{int(img_y2)}]")

            # Crop and save the image
            element_idx = start_index + len(standalone_images)
            img_element_id = f"standalone_img_{current_page}_{element_idx}"
            cropped_path = self._crop_and_save_image(
                source_image_path,
                [img_x1, img_y1, img_x2, img_y2],
                output_dir,
                img_element_id
            )
            if cropped_path:
                element = {
                    'element_id': img_element_id,
                    'type': ElementType.IMAGE,
                    'original_type': 'image',
                    'content': '',
                    'page': current_page,
                    'bbox': [img_x1, img_y1, img_x2, img_y2],
                    'index': element_idx,
                    'confidence': box.get('score', 1.0),
                    'saved_path': cropped_path,
                    'img_path': cropped_path,
                    'source': 'layout_det_res'
                }
                standalone_images.append(element)
                logger.info(f"[STANDALONE-IMAGE] Extracted and saved: {cropped_path}")
    except Exception as e:
        # exc_info routes the traceback through logging instead of stderr.
        logger.error(f"[STANDALONE-IMAGE] Error extracting standalone images: {e}", exc_info=True)
    return standalone_images
def _process_markdown_fallback(
self,
page_result: Any,
current_page: int,
output_dir: Optional[Path]
) -> List[Dict[str, Any]]:
"""
Fallback to markdown processing if parsing_res_list not available.
Args:
page_result: PP-StructureV3 page result
current_page: Current page number
output_dir: Optional output directory
Returns:
List of elements extracted from markdown
"""
elements = []
# Extract from markdown if available
if hasattr(page_result, 'markdown'):
markdown_dict = page_result.markdown
if isinstance(markdown_dict, dict):
# Extract markdown texts
markdown_texts = markdown_dict.get('markdown_texts', '')
if markdown_texts:
# Detect if it's a table
is_table = '<table' in markdown_texts.lower()
element = {
'element_id': f"md_{current_page}_0",
'type': ElementType.TABLE if is_table else ElementType.TEXT,
'content': markdown_texts,
'page': current_page,
'bbox': [0, 0, 0, 0], # No bbox in markdown
'index': 0,
'from_markdown': True
}
if is_table:
element['extracted_text'] = self._extract_text_from_html(markdown_texts)
elements.append(element)
# Process images
markdown_images = markdown_dict.get('markdown_images', {})
for img_idx, (img_path, img_obj) in enumerate(markdown_images.items()):
# Save image
if output_dir and hasattr(img_obj, 'save'):
self._save_pil_image(img_obj, output_dir, f"md_img_{current_page}_{img_idx}")
# Try to extract bbox from filename
bbox = self._extract_bbox_from_filename(img_path)
element = {
'element_id': f"md_img_{current_page}_{img_idx}",
'type': ElementType.IMAGE,
'content': img_path,
'page': current_page,
'bbox': bbox,
'index': img_idx + 1,
'from_markdown': True
}
elements.append(element)
return elements
def _determine_reading_order(self, elements: List[Dict]) -> List[int]:
"""
Determine reading order based on element positions.
Args:
elements: List of elements with bbox
Returns:
List of indices representing reading order
"""
if not elements:
return []
# If elements have original indices, use them
if all('index' in elem for elem in elements):
# Sort by original index
indexed_elements = [(i, elem['index']) for i, elem in enumerate(elements)]
indexed_elements.sort(key=lambda x: x[1])
return [i for i, _ in indexed_elements]
# Otherwise, sort by position (top to bottom, left to right)
indexed_elements = []
for i, elem in enumerate(elements):
bbox = elem.get('bbox', [0, 0, 0, 0])
if len(bbox) >= 2:
# Use top-left corner for sorting
indexed_elements.append((i, bbox[1], bbox[0])) # (index, y, x)
else:
indexed_elements.append((i, 0, 0))
# Sort by y first (top to bottom), then x (left to right)
indexed_elements.sort(key=lambda x: (x[1], x[2]))
return [i for i, _, _ in indexed_elements]
def _count_element_types(self, elements: List[Dict]) -> Dict[str, int]:
    """
    Tally how many elements there are of each type.

    Args:
        elements: List of element dicts; an element without a 'type' entry
            is counted as ElementType.TEXT

    Returns:
        Dictionary mapping each encountered type to its number of occurrences
    """
    tally = {}
    for entry in elements:
        kind = entry.get('type', ElementType.TEXT)
        if kind in tally:
            tally[kind] += 1
        else:
            tally[kind] = 1
    return tally
def _extract_text_from_html(self, html: str) -> str:
"""Extract plain text from HTML content."""
try:
from bs4 import BeautifulSoup
soup = BeautifulSoup(html, 'html.parser')
return soup.get_text(separator=' ', strip=True)
except:
# Fallback: just remove HTML tags
import re
text = re.sub(r'<[^>]+>', ' ', html)
text = re.sub(r'\s+', ' ', text)
return text.strip()
def _extract_bbox_from_filename(self, filename: str) -> List[int]:
"""Extract bbox from filename if it contains coordinate information."""
import re
match = re.search(r'box_(\d+)_(\d+)_(\d+)_(\d+)', filename)
if match:
return list(map(int, match.groups()))
return [0, 0, 0, 0]
def _save_image(self, img_path: str, output_dir: Path, element_id: str) -> Optional[str]:
    """Save image file to output directory and return relative path.

    Args:
        img_path: Path to an image file (str or Path), a numpy array holding
            image data, or an object exposing ``save()`` such as a PIL image
        output_dir: Base output directory for results; the file is written
            to its ``imgs`` subdirectory as ``<element_id>.png``
        element_id: Unique identifier for the element

    Returns:
        Relative path to saved image (``imgs/<element_id>.png``), or None if
        the source is missing, of an unsupported type, or saving failed
    """
    import shutil
    try:
        # Create imgs subdirectory
        img_dir = output_dir / "imgs"
        img_dir.mkdir(parents=True, exist_ok=True)

        # Determine output file path
        dst_path = img_dir / f"{element_id}.png"
        relative_path = f"imgs/{element_id}.png"

        # Handle different input types
        if isinstance(img_path, (str, Path)):
            src_path = Path(img_path)
            if src_path.exists() and src_path.is_file():
                # Copy existing file.
                # NOTE(review): the bytes are copied verbatim, so a non-PNG
                # source ends up under a .png name — confirm consumers cope.
                shutil.copy2(src_path, dst_path)
                logger.info(f"Copied image from {src_path} to {dst_path}")
            else:
                logger.warning(f"Image file not found: {img_path}")
                return None
        elif isinstance(img_path, np.ndarray):
            # Save numpy array as image
            Image.fromarray(img_path).save(dst_path)
            logger.info(f"Saved numpy array image to {dst_path}")
        elif hasattr(img_path, 'save'):
            # Image objects (e.g. PIL) know how to write themselves
            img_path.save(str(dst_path))
            logger.info(f"Saved image object to {dst_path}")
        else:
            logger.warning(f"Unknown image type: {type(img_path)}")
            return None

        # Return relative path for reference
        return relative_path
    except Exception as e:
        logger.error(f"Failed to save image for element {element_id}: {e}")
        return None
def _save_pil_image(self, img_obj, output_dir: Path, element_id: str):
    """
    Write an image object to ``<output_dir>/imgs/<element_id>.png``.

    The ``imgs`` directory is created when missing. Failures are logged as
    warnings and otherwise ignored; nothing is returned.
    """
    try:
        target_dir = output_dir / "imgs"
        target_dir.mkdir(parents=True, exist_ok=True)
        target_file = target_dir / f"{element_id}.png"
        img_obj.save(str(target_file))
        logger.info(f"Saved image to {target_file}")
    except Exception as e:
        logger.warning(f"Failed to save PIL image: {e}")
def _crop_and_save_image(
    self,
    source_image_path: Path,
    bbox: List[float],
    output_dir: Path,
    element_id: str
) -> Optional[str]:
    """
    Crop image region from source image and save to output directory.

    Args:
        source_image_path: Path to the source image
        bbox: Bounding box [x1, y1, x2, y2]; values are truncated to integers
            and clamped to the image bounds
        output_dir: Output directory for saving cropped image (created when
            missing)
        element_id: Element ID for naming

    Returns:
        Relative filename (not full path) to saved image, consistent with
        Direct Track which stores "filename.png" that gets joined with
        result_dir by pdf_generator_service. None when the clamped box is
        empty or any step fails (the failure is logged).
    """
    try:
        from PIL import Image
        # Open source image
        with Image.open(source_image_path) as img:
            # Ensure bbox values are integers
            x1, y1, x2, y2 = [int(v) for v in bbox[:4]]

            # Clamp bbox to the image area
            img_width, img_height = img.size
            x1 = max(0, min(x1, img_width))
            x2 = max(0, min(x2, img_width))
            y1 = max(0, min(y1, img_height))
            y2 = max(0, min(y2, img_height))
            if x2 <= x1 or y2 <= y1:
                logger.warning(f"Invalid bbox for cropping: {bbox}")
                return None

            # Crop the region
            cropped = img.crop((x1, y1, x2, y2))
            # PNG cannot store every pixel mode (e.g. CMYK from some JPEGs);
            # fall back to RGB for anything outside the writable set.
            if cropped.mode not in ("1", "L", "LA", "I", "I;16", "P", "RGB", "RGBA"):
                cropped = cropped.convert("RGB")

            # Save directly to output directory (no subdirectory)
            # Consistent with Direct Track which saves to output_dir directly
            output_dir.mkdir(parents=True, exist_ok=True)
            image_filename = f"{element_id}.png"
            img_path = output_dir / image_filename
            cropped.save(str(img_path), "PNG")

            # Return just the filename (relative to result_dir)
            # PDF generator will join with result_dir to get full path
            logger.info(f"Cropped image saved: {img_path} ({x2-x1}x{y2-y1} pixels)")
            return image_filename
    except Exception as e:
        logger.error(f"Failed to crop and save image for {element_id}: {e}")
        return None