feat: add table detection options and scan artifact removal

- Add TableDetectionSelector component for wired/wireless/region detection
- Add CV-based table line detector module (disabled due to poor performance)
- Add scan artifact removal preprocessing step (removes faint horizontal lines)
- Add PreprocessingConfig schema with remove_scan_artifacts option
- Update frontend PreprocessingSettings with scan artifact toggle
- Integrate table detection config into ProcessingPage
- Archive extract-table-cell-boxes proposal

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Author: egg
Date: 2025-11-30 13:21:50 +08:00
Parent: f5a2c8a750
Commit: 95ae1f1bdb
17 changed files with 1906 additions and 344 deletions

View File

@@ -0,0 +1,362 @@
"""
CV-based Table Line Detection Module
Uses OpenCV morphological operations to detect table lines and extract cell boundaries.
Intended to be more reliable for wired/bordered tables than ML-based cell detection;
currently disabled in ocr_service due to poor performance on complex tables.
"""
import cv2
import numpy as np
from typing import List, Tuple, Optional
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
class CVTableDetector:
"""
Detects table cell boundaries using computer vision techniques.
Works by detecting horizontal and vertical lines in the image.
"""
def __init__(
self,
min_line_length: int = 30,
line_thickness: int = 2,
min_cell_width: int = 20,
min_cell_height: int = 15
):
"""
Initialize the CV table detector.
Args:
min_line_length: Minimum length of lines to detect (in pixels)
line_thickness: Expected thickness of table lines
min_cell_width: Minimum width of a valid cell
min_cell_height: Minimum height of a valid cell
"""
self.min_line_length = min_line_length
self.line_thickness = line_thickness
self.min_cell_width = min_cell_width
self.min_cell_height = min_cell_height
def detect_cells(
self,
image: np.ndarray,
table_bbox: Optional[List[float]] = None
) -> List[List[float]]:
"""
Detect cell boundaries in a table image.
Args:
image: Input image (BGR format)
table_bbox: Optional [x1, y1, x2, y2] to crop table region first
Returns:
List of cell bounding boxes [[x1, y1, x2, y2], ...]
"""
# Crop to table region if bbox provided
offset_x, offset_y = 0, 0
if table_bbox:
x1, y1, x2, y2 = [int(v) for v in table_bbox]
offset_x, offset_y = x1, y1
image = image[y1:y2, x1:x2]
if image.size == 0:
logger.warning("Empty image after cropping")
return []
# Convert to grayscale
if len(image.shape) == 3:
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
else:
gray = image
# Detect lines
horizontal_lines, vertical_lines = self._detect_lines(gray)
if horizontal_lines is None or vertical_lines is None:
logger.warning("Failed to detect table lines")
return []
# Find intersections to build grid
cells = self._build_cell_grid(horizontal_lines, vertical_lines, gray.shape)
# Convert to absolute coordinates
absolute_cells = []
for cell in cells:
abs_cell = [
cell[0] + offset_x,
cell[1] + offset_y,
cell[2] + offset_x,
cell[3] + offset_y
]
absolute_cells.append(abs_cell)
logger.info(f"[CV] Detected {len(absolute_cells)} cells from table lines")
return absolute_cells
def _detect_lines(
self,
gray: np.ndarray
) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
"""
Detect horizontal and vertical lines using morphological operations.
Args:
gray: Grayscale image
Returns:
Tuple of (horizontal_lines_mask, vertical_lines_mask)
"""
# Adaptive threshold for better line detection
binary = cv2.adaptiveThreshold(
gray, 255,
cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY_INV,
11, 2
)
# Detect horizontal lines
h_kernel_length = max(self.min_line_length, gray.shape[1] // 30)
horizontal_kernel = cv2.getStructuringElement(
cv2.MORPH_RECT, (h_kernel_length, 1)
)
horizontal_lines = cv2.morphologyEx(
binary, cv2.MORPH_OPEN, horizontal_kernel, iterations=2
)
# Detect vertical lines
v_kernel_length = max(self.min_line_length, gray.shape[0] // 30)
vertical_kernel = cv2.getStructuringElement(
cv2.MORPH_RECT, (1, v_kernel_length)
)
vertical_lines = cv2.morphologyEx(
binary, cv2.MORPH_OPEN, vertical_kernel, iterations=2
)
return horizontal_lines, vertical_lines
def _build_cell_grid(
self,
horizontal_mask: np.ndarray,
vertical_mask: np.ndarray,
image_shape: Tuple[int, int]
) -> List[List[float]]:
"""
Build cell grid from detected line masks.
Args:
horizontal_mask: Binary mask of horizontal lines
vertical_mask: Binary mask of vertical lines
image_shape: (height, width) of the image
Returns:
List of cell bounding boxes
"""
height, width = image_shape[:2]
# Combine masks to find table structure
table_mask = cv2.add(horizontal_mask, vertical_mask)
# Find contours (cells are enclosed regions)
contours, hierarchy = cv2.findContours(
table_mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE
)
# Method 1: Use contours to find cells
cells_from_contours = self._cells_from_contours(contours, hierarchy)
# Method 2: Use line intersections to build grid
cells_from_grid = self._cells_from_line_intersections(
horizontal_mask, vertical_mask, height, width
)
# Use whichever method found more valid cells
if len(cells_from_grid) >= len(cells_from_contours):
return cells_from_grid
return cells_from_contours
def _cells_from_contours(
self,
contours,
hierarchy
) -> List[List[float]]:
"""Extract cell bounding boxes from contours."""
cells = []
for i, contour in enumerate(contours):
x, y, w, h = cv2.boundingRect(contour)
# Filter by minimum size
if w >= self.min_cell_width and h >= self.min_cell_height:
# Check if this is an inner contour (cell) not the outer table
if hierarchy is not None and hierarchy[0][i][3] != -1:
cells.append([float(x), float(y), float(x + w), float(y + h)])
return cells
def _cells_from_line_intersections(
self,
horizontal_mask: np.ndarray,
vertical_mask: np.ndarray,
height: int,
width: int
) -> List[List[float]]:
"""Build cells from line intersections (grid-based approach)."""
# Find horizontal line y-coordinates
h_projection = np.sum(horizontal_mask, axis=1)
h_lines = self._find_line_positions(h_projection, min_gap=self.min_cell_height)
# Find vertical line x-coordinates
v_projection = np.sum(vertical_mask, axis=0)
v_lines = self._find_line_positions(v_projection, min_gap=self.min_cell_width)
if len(h_lines) < 2 or len(v_lines) < 2:
logger.debug(f"Insufficient lines: {len(h_lines)} horizontal, {len(v_lines)} vertical")
return []
# Build cells from grid
cells = []
for i in range(len(h_lines) - 1):
for j in range(len(v_lines) - 1):
y1, y2 = h_lines[i], h_lines[i + 1]
x1, x2 = v_lines[j], v_lines[j + 1]
# Validate cell size
if (x2 - x1) >= self.min_cell_width and (y2 - y1) >= self.min_cell_height:
cells.append([float(x1), float(y1), float(x2), float(y2)])
return cells
def _find_line_positions(
self,
projection: np.ndarray,
min_gap: int
) -> List[int]:
"""
Find line positions from projection profile.
Args:
projection: 1D array of pixel sums
min_gap: Minimum gap between lines
Returns:
List of line positions
"""
# Threshold to find peaks (lines)
threshold = np.max(projection) * 0.3
peaks = projection > threshold
# Find transitions (line positions)
positions = []
in_peak = False
peak_start = 0
for i, is_peak in enumerate(peaks):
if is_peak and not in_peak:
peak_start = i
in_peak = True
elif not is_peak and in_peak:
# End of peak - use center
peak_center = (peak_start + i) // 2
if not positions or (peak_center - positions[-1]) >= min_gap:
positions.append(peak_center)
in_peak = False
return positions
def detect_and_merge_with_ml(
self,
image: np.ndarray,
table_bbox: List[float],
ml_cell_boxes: List[List[float]]
) -> List[List[float]]:
"""
Detect cells using CV and merge/validate with ML-detected boxes.
CV detection is used as the primary source for wired tables,
with ML boxes used to fill gaps or validate.
Args:
image: Input image
table_bbox: Table bounding box [x1, y1, x2, y2]
ml_cell_boxes: Cell boxes from ML model (RT-DETR-L)
Returns:
Merged/validated cell boxes
"""
cv_cells = self.detect_cells(image, table_bbox)
if not cv_cells:
# CV detection failed, fall back to ML
logger.info("[CV] No cells detected by CV, using ML cells")
return ml_cell_boxes
if not ml_cell_boxes:
# Only CV cells available
return cv_cells
# Validate: CV should find structured grid
# If CV found significantly fewer cells, there might be merged cells
cv_count = len(cv_cells)
ml_count = len(ml_cell_boxes)
logger.info(f"[CV] CV detected {cv_count} cells, ML detected {ml_count} cells")
# For wired tables, prefer CV detection (cleaner grid)
if cv_count >= ml_count * 0.5:
# CV found reasonable number of cells
return cv_cells
else:
# CV might have missed cells (possibly due to merged cells)
# Try to use ML boxes that don't overlap with CV cells
merged = list(cv_cells)
for ml_box in ml_cell_boxes:
if not self._has_significant_overlap(ml_box, cv_cells):
merged.append(ml_box)
return merged
def _has_significant_overlap(
self,
box: List[float],
boxes: List[List[float]],
threshold: float = 0.5
) -> bool:
"""Check if box significantly overlaps with any box in the list."""
for other in boxes:
iou = self._calculate_iou(box, other)
if iou > threshold:
return True
return False
def _calculate_iou(
self,
box1: List[float],
box2: List[float]
) -> float:
"""Calculate Intersection over Union of two boxes."""
x1 = max(box1[0], box2[0])
y1 = max(box1[1], box2[1])
x2 = min(box1[2], box2[2])
y2 = min(box1[3], box2[3])
if x2 <= x1 or y2 <= y1:
return 0.0
intersection = (x2 - x1) * (y2 - y1)
area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])
union = area1 + area2 - intersection
return intersection / union if union > 0 else 0.0
def load_image(image_path: str) -> Optional[np.ndarray]:
"""Load image from path."""
path = Path(image_path)
if not path.exists():
logger.error(f"Image not found: {image_path}")
return None
return cv2.imread(str(path))
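
A minimal usage sketch for this new module, assuming it lives at app/services/cv_table_detector.py as imported elsewhere in this commit; the image path and table bbox below are placeholders:

from app.services.cv_table_detector import CVTableDetector, load_image

detector = CVTableDetector(min_line_length=30, min_cell_width=20, min_cell_height=15)
image = load_image("page_001.png")  # placeholder path
if image is not None:
    # Absolute page coordinates of a previously detected table region (placeholder values)
    cells = detector.detect_cells(image, table_bbox=[100.0, 200.0, 900.0, 600.0])
    print(f"{len(cells)} cells detected")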

View File

@@ -212,7 +212,8 @@ class GapFillingService:
def _is_region_covered(
self,
region: TextRegion,
pp_structure_elements: List[DocumentElement]
pp_structure_elements: List[DocumentElement],
skip_table_coverage: bool = True
) -> bool:
"""
Check if a raw OCR region is covered by any PP-StructureV3 element.
@@ -220,6 +221,9 @@ class GapFillingService:
Args:
region: Raw OCR text region
pp_structure_elements: List of PP-StructureV3 elements
skip_table_coverage: If True, don't consider TABLE elements as covering
(allows raw OCR text inside tables to pass through
for layered rendering)
Returns:
True if the region is covered
@@ -228,6 +232,12 @@ class GapFillingService:
region_bbox = region.normalized_bbox
for element in pp_structure_elements:
# Skip TABLE elements when checking coverage
# This allows raw OCR text inside tables to be preserved
# PDF generator will render: table borders + raw text positions
if skip_table_coverage and element.type == ElementType.TABLE:
continue
elem_bbox = (
element.bbox.x0, element.bbox.y0,
element.bbox.x1, element.bbox.y1

View File

@@ -184,6 +184,99 @@ class LayoutPreprocessingService:
return normalized
def remove_scan_artifacts(
self,
image: np.ndarray,
line_thickness: int = 5,
min_line_length_ratio: float = 0.3,
faint_threshold: int = 30
) -> np.ndarray:
"""
Remove horizontal scan line artifacts from scanned documents.
Scanner light bar artifacts appear as FAINT horizontal lines across the image.
Key distinction from table borders:
- Scan artifacts are LIGHT/FAINT (close to background color)
- Table borders are DARK/BOLD (high contrast)
Method:
1. Detect horizontal edges using Sobel filter
2. Filter to keep only FAINT edges (low contrast)
3. Find continuous horizontal segments
4. Remove only faint horizontal lines while preserving bold table borders
Args:
image: Input image (BGR)
line_thickness: Maximum thickness of lines to remove (pixels)
min_line_length_ratio: Minimum line length as ratio of image width (0.0-1.0)
faint_threshold: Maximum edge strength for "faint" lines (0-255)
Returns:
Image with scan artifacts removed (BGR)
"""
h, w = image.shape[:2]
min_line_length = int(w * min_line_length_ratio)
# Convert to grayscale for detection
if len(image.shape) == 3:
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
else:
gray = image.copy()
# Step 1: Detect horizontal edges using Sobel (vertical gradient)
# Scan artifacts will have weak gradients, table borders will have strong gradients
sobel_y = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3)
sobel_abs = np.clip(np.abs(sobel_y), 0, 255).astype(np.uint8)  # clip before the uint8 cast so strong edges don't wrap to small values
# Step 2: Find FAINT horizontal edges only (low gradient magnitude)
# Strong edges (table borders) have high sobel values
# Faint edges (scan artifacts) have low sobel values
faint_edges = (sobel_abs > 5) & (sobel_abs < faint_threshold)
faint_edges = faint_edges.astype(np.uint8) * 255
# Step 3: Use horizontal morphological operations to find continuous lines
horizontal_kernel = cv2.getStructuringElement(
cv2.MORPH_RECT,
(min_line_length, 1)
)
# Opening removes short segments, keeping only long horizontal lines
horizontal_lines = cv2.morphologyEx(
faint_edges, cv2.MORPH_OPEN, horizontal_kernel, iterations=1
)
# Dilate slightly to cover the full artifact width
dilate_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, line_thickness))
line_mask = cv2.dilate(horizontal_lines, dilate_kernel, iterations=1)
# Check if any artifacts were detected
artifact_pixels = np.sum(line_mask > 0)
if artifact_pixels < 100:
logger.debug("No faint scan artifacts detected")
return image
# Calculate artifact coverage
total_pixels = h * w
coverage_ratio = artifact_pixels / total_pixels
# Faint artifacts should cover a small portion of the image
if coverage_ratio > 0.05: # More than 5% is suspicious
logger.debug(f"Faint artifact detection: coverage={coverage_ratio:.2%} (processing anyway)")
# Only process if coverage is not excessive
if coverage_ratio > 0.15: # More than 15% is definitely too much
logger.debug(f"Artifact detection rejected: coverage too high ({coverage_ratio:.2%})")
return image
# Use inpainting to remove artifacts
result = cv2.inpaint(image, line_mask, inpaintRadius=3, flags=cv2.INPAINT_TELEA)
logger.info(
f"Scan artifacts removed: {artifact_pixels} pixels ({coverage_ratio:.2%}), faint_threshold={faint_threshold}"
)
return result
def scale_for_layout_detection(
self,
image: np.ndarray,
@@ -346,9 +439,13 @@ class LayoutPreprocessingService:
# Only enable for extremely low contrast (< 15) which indicates a scan quality issue
binarize = False # Disabled by default
# Scan artifact removal is always enabled in auto mode for scanned documents
remove_scan_artifacts = True
logger.debug(
f"Auto config: contrast={contrast} strength={contrast_strength:.2f}, "
f"sharpen={sharpen} strength={sharpen_strength:.2f}, binarize={binarize}"
f"sharpen={sharpen} strength={sharpen_strength:.2f}, binarize={binarize}, "
f"remove_scan_artifacts={remove_scan_artifacts}"
)
return PreprocessingConfig(
@@ -356,7 +453,8 @@ class LayoutPreprocessingService:
contrast_strength=round(contrast_strength, 2),
sharpen=sharpen,
sharpen_strength=round(sharpen_strength, 2),
binarize=binarize
binarize=binarize,
remove_scan_artifacts=remove_scan_artifacts
)
def apply_contrast_enhancement(
@@ -550,7 +648,8 @@ class LayoutPreprocessingService:
config_used=PreprocessingConfig(
contrast=PreprocessingContrastEnum.NONE,
sharpen=False,
binarize=False
binarize=False,
remove_scan_artifacts=False
),
quality_metrics=metrics,
was_processed=scaling_info.was_scaled, # True if scaling was applied
@@ -568,6 +667,13 @@ class LayoutPreprocessingService:
processed = scaled_image.copy()
was_processed = scaling_info.was_scaled # Start with True if already scaled
# Step 0: Remove scan artifacts BEFORE any enhancement
# This prevents scanner light bar lines from being enhanced and misdetected as table borders
if getattr(config, 'remove_scan_artifacts', True): # Default True for backwards compatibility
processed = self.remove_scan_artifacts(processed)
was_processed = True
logger.debug("Applied scan artifact removal")
# Step 1: Contrast enhancement
if config.contrast != PreprocessingContrastEnum.NONE:
processed = self.apply_contrast_enhancement(
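
If preprocessing runs in manual mode, the new remove_scan_artifacts flag sits alongside the existing fields; a minimal sketch using the field names visible in this diff (the PreprocessingContrastEnum import path is an assumption):

from app.schemas.task import PreprocessingConfig, PreprocessingContrastEnum  # enum location assumed

manual_config = PreprocessingConfig(
    contrast=PreprocessingContrastEnum.NONE,
    contrast_strength=1.0,
    sharpen=False,
    sharpen_strength=1.0,
    binarize=False,
    remove_scan_artifacts=True,  # new: strip faint scanner light-bar lines before any enhancement
)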

View File

@@ -30,7 +30,7 @@ from app.services.layout_preprocessing_service import (
get_layout_preprocessing_service,
LayoutPreprocessingService,
)
from app.schemas.task import PreprocessingModeEnum, PreprocessingConfig
from app.schemas.task import PreprocessingModeEnum, PreprocessingConfig, TableDetectionConfig
# Import dual-track components
try:
@@ -454,7 +454,11 @@ class OCRService:
return self.ocr_engines[lang]
def _ensure_structure_engine(self, layout_model: Optional[str] = None) -> PPStructureV3:
def _ensure_structure_engine(
self,
layout_model: Optional[str] = None,
table_detection_config: Optional[TableDetectionConfig] = None
) -> PPStructureV3:
"""
Get or create PP-Structure engine for layout analysis with GPU support.
Supports layout model selection for different document types.
@@ -465,6 +469,10 @@ class OCRService:
- "default": PubLayNet-based (best for English documents)
- "cdla": CDLA model (alternative for Chinese layout)
- None: Use config default
table_detection_config: Table detection configuration
- enable_wired_table: Enable bordered table detection
- enable_wireless_table: Enable borderless table detection
- enable_region_detection: Enable region detection
Returns:
PPStructure engine instance
@@ -492,6 +500,19 @@ class OCRService:
logger.info(f"Layout model changed from {current_model} to {layout_model}, recreating engine")
self.structure_engine = None # Force recreation
# Check if we need to recreate the engine due to different table detection config
current_table_config = getattr(self, '_current_table_detection_config', None)
if self.structure_engine is not None and table_detection_config:
# Compare table detection settings
new_config_tuple = (
table_detection_config.enable_wired_table,
table_detection_config.enable_wireless_table,
table_detection_config.enable_region_detection
)
if current_table_config != new_config_tuple:
logger.info(f"Table detection config changed from {current_table_config} to {new_config_tuple}, recreating engine")
self.structure_engine = None # Force recreation
# Use cached engine or create new one
if self.structure_engine is None:
logger.info(f"Initializing PP-StructureV3 engine (GPU: {self.use_gpu})")
@@ -504,6 +525,15 @@ class OCRService:
use_table = settings.enable_table_recognition
use_seal = settings.enable_seal_recognition
use_region = settings.enable_region_detection
# Apply table detection config overrides if provided
if table_detection_config:
# If both wired and wireless are disabled, disable table recognition entirely
if not table_detection_config.enable_wired_table and not table_detection_config.enable_wireless_table:
use_table = False
use_region = table_detection_config.enable_region_detection
logger.info(f"Table detection config applied: wired={table_detection_config.enable_wired_table}, "
f"wireless={table_detection_config.enable_wireless_table}, region={use_region}")
layout_threshold = settings.layout_detection_threshold
layout_nms = settings.layout_nms_threshold
layout_merge = settings.layout_merge_mode
@@ -538,6 +568,17 @@ class OCRService:
formula_model = settings.formula_recognition_model_name
chart_model = settings.chart_recognition_model_name
# Apply table detection config overrides for individual table types
if table_detection_config:
if not table_detection_config.enable_wired_table:
wired_table_model = None
wired_cell_det_model = None
logger.info("Wired table detection disabled by config")
if not table_detection_config.enable_wireless_table:
wireless_table_model = None
wireless_cell_det_model = None
logger.info("Wireless table detection disabled by config")
# Text detection/recognition model configuration
text_det_model = settings.text_detection_model_name
text_rec_model = settings.text_recognition_model_name
@@ -641,6 +682,15 @@ class OCRService:
# Track model loading for cache management
self._model_last_used['structure'] = datetime.now()
self._current_layout_model = layout_model # Track current model for recreation check
# Track table detection config for recreation check
if table_detection_config:
self._current_table_detection_config = (
table_detection_config.enable_wired_table,
table_detection_config.enable_wireless_table,
table_detection_config.enable_region_detection
)
else:
self._current_table_detection_config = None
logger.info(f"PP-StructureV3 engine ready (PaddlePaddle {paddle.__version__}, {'GPU' if self.use_gpu else 'CPU'} mode)")
@@ -712,6 +762,15 @@ class OCRService:
self.structure_engine = PPStructureV3(**cpu_kwargs)
self._current_layout_model = layout_model # Track current model for recreation check
# Track table detection config for recreation check
if table_detection_config:
self._current_table_detection_config = (
table_detection_config.enable_wired_table,
table_detection_config.enable_wireless_table,
table_detection_config.enable_region_detection
)
else:
self._current_table_detection_config = None
logger.info(f"PP-StructureV3 engine ready (CPU mode - fallback, layout_model={settings.layout_detection_model_name})")
else:
raise
@@ -956,7 +1015,8 @@ class OCRService:
current_page: int = 0,
layout_model: Optional[str] = None,
preprocessing_mode: Optional[PreprocessingModeEnum] = None,
preprocessing_config: Optional[PreprocessingConfig] = None
preprocessing_config: Optional[PreprocessingConfig] = None,
table_detection_config: Optional[TableDetectionConfig] = None
) -> Dict:
"""
Process single image with OCR and layout analysis
@@ -971,6 +1031,7 @@ class OCRService:
layout_model: Layout detection model ('chinese', 'default', 'cdla')
preprocessing_mode: Layout preprocessing mode ('auto', 'manual', 'disabled')
preprocessing_config: Manual preprocessing config (used when mode='manual')
table_detection_config: Table detection config (wired/wireless/region options)
Returns:
Dictionary with OCR results and metadata
@@ -1041,7 +1102,8 @@ class OCRService:
current_page=page_num - 1, # Convert to 0-based page number for layout data
layout_model=layout_model,
preprocessing_mode=preprocessing_mode,
preprocessing_config=preprocessing_config
preprocessing_config=preprocessing_config,
table_detection_config=table_detection_config
)
# Accumulate results
@@ -1189,7 +1251,8 @@ class OCRService:
current_page=current_page,
layout_model=layout_model,
preprocessing_mode=preprocessing_mode,
preprocessing_config=preprocessing_config
preprocessing_config=preprocessing_config,
table_detection_config=table_detection_config
)
# Generate Markdown
@@ -1347,7 +1410,8 @@ class OCRService:
current_page: int = 0,
layout_model: Optional[str] = None,
preprocessing_mode: Optional[PreprocessingModeEnum] = None,
preprocessing_config: Optional[PreprocessingConfig] = None
preprocessing_config: Optional[PreprocessingConfig] = None,
table_detection_config: Optional[TableDetectionConfig] = None
) -> Tuple[Optional[Dict], List[Dict]]:
"""
Analyze document layout using PP-StructureV3 with enhanced element extraction
@@ -1359,6 +1423,7 @@ class OCRService:
layout_model: Layout detection model ('chinese', 'default', 'cdla')
preprocessing_mode: Preprocessing mode ('auto', 'manual', 'disabled')
preprocessing_config: Manual preprocessing config (used when mode='manual')
table_detection_config: Table detection config (wired/wireless/region options)
Returns:
Tuple of (layout_data, images_metadata)
@@ -1376,7 +1441,7 @@ class OCRService:
f"Mode: {'CPU fallback' if self._cpu_fallback_active else 'GPU'}"
)
structure_engine = self._ensure_structure_engine(layout_model)
structure_engine = self._ensure_structure_engine(layout_model, table_detection_config)
# Apply image preprocessing for layout detection
# Preprocessing includes:
@@ -1432,10 +1497,19 @@ class OCRService:
# Get scaling info for bbox coordinate restoration
scaling_info = preprocessing_result.scaling_info if preprocessing_result else None
# CV table detection is disabled due to poor performance on complex tables
# Issues: 1) Detected boundaries smaller than content
# 2) Incorrectly splits merged cells
# The ML-based RT-DETR-L detection is currently more reliable.
# TODO: Improve CV algorithm with better line detection and grid alignment
use_cv_table_detection = False
result = enhanced_processor.analyze_with_full_structure(
image_path, output_dir, current_page,
preprocessed_image=preprocessed_image,
scaling_info=scaling_info
scaling_info=scaling_info,
save_visualization=True, # Save layout detection visualization images
use_cv_table_detection=use_cv_table_detection
)
if result.get('has_parsing_res_list'):
@@ -1673,7 +1747,8 @@ class OCRService:
force_track: Optional[str] = None,
layout_model: Optional[str] = None,
preprocessing_mode: Optional[PreprocessingModeEnum] = None,
preprocessing_config: Optional[PreprocessingConfig] = None
preprocessing_config: Optional[PreprocessingConfig] = None,
table_detection_config: Optional[TableDetectionConfig] = None
) -> Union[UnifiedDocument, Dict]:
"""
Process document using dual-track approach.
@@ -1688,6 +1763,7 @@ class OCRService:
layout_model: Layout detection model ('chinese', 'default', 'cdla') (used for OCR track only)
preprocessing_mode: Layout preprocessing mode ('auto', 'manual', 'disabled')
preprocessing_config: Manual preprocessing config (used when mode='manual')
table_detection_config: Table detection config (wired/wireless/region options)
Returns:
UnifiedDocument if dual-track is enabled, Dict otherwise
@@ -1696,7 +1772,7 @@ class OCRService:
# Fallback to traditional OCR processing
return self.process_file_traditional(
file_path, lang, detect_layout, confidence_threshold, output_dir, layout_model,
preprocessing_mode, preprocessing_config
preprocessing_mode, preprocessing_config, table_detection_config
)
start_time = datetime.now()
@@ -1770,7 +1846,8 @@ class OCRService:
confidence_threshold=confidence_threshold,
output_dir=output_dir, layout_model=layout_model,
preprocessing_mode=preprocessing_mode,
preprocessing_config=preprocessing_config
preprocessing_config=preprocessing_config,
table_detection_config=table_detection_config
)
# Convert OCR result to extract images
@@ -1804,7 +1881,7 @@ class OCRService:
logger.info("Using OCR track (PaddleOCR)")
ocr_result = self.process_file_traditional(
file_path, lang, detect_layout, confidence_threshold, output_dir, layout_model,
preprocessing_mode, preprocessing_config
preprocessing_mode, preprocessing_config, table_detection_config
)
# Convert OCR result to UnifiedDocument using the converter
@@ -1835,7 +1912,7 @@ class OCRService:
# Fallback to traditional OCR
return self.process_file_traditional(
file_path, lang, detect_layout, confidence_threshold, output_dir, layout_model,
preprocessing_mode, preprocessing_config
preprocessing_mode, preprocessing_config, table_detection_config
)
def _merge_ocr_images_into_direct(
@@ -1916,7 +1993,8 @@ class OCRService:
output_dir: Optional[Path] = None,
layout_model: Optional[str] = None,
preprocessing_mode: Optional[PreprocessingModeEnum] = None,
preprocessing_config: Optional[PreprocessingConfig] = None
preprocessing_config: Optional[PreprocessingConfig] = None,
table_detection_config: Optional[TableDetectionConfig] = None
) -> Dict:
"""
Traditional OCR processing (legacy method).
@@ -1930,6 +2008,7 @@ class OCRService:
layout_model: Layout detection model ('chinese', 'default', 'cdla')
preprocessing_mode: Layout preprocessing mode ('auto', 'manual', 'disabled')
preprocessing_config: Manual preprocessing config (used when mode='manual')
table_detection_config: Table detection config (wired/wireless/region options)
Returns:
Dictionary with OCR results in legacy format
@@ -1943,7 +2022,7 @@ class OCRService:
for i, image_path in enumerate(image_paths):
result = self.process_image(
image_path, lang, detect_layout, confidence_threshold, output_dir, i, layout_model,
preprocessing_mode, preprocessing_config
preprocessing_mode, preprocessing_config, table_detection_config
)
all_results.append(result)
@@ -1960,7 +2039,7 @@ class OCRService:
# Single image or other file
return self.process_image(
file_path, lang, detect_layout, confidence_threshold, output_dir, 0, layout_model,
preprocessing_mode, preprocessing_config
preprocessing_mode, preprocessing_config, table_detection_config
)
def _combine_results(self, results: List[Dict]) -> Dict:
@@ -2047,7 +2126,8 @@ class OCRService:
force_track: Optional[str] = None,
layout_model: Optional[str] = None,
preprocessing_mode: Optional[PreprocessingModeEnum] = None,
preprocessing_config: Optional[PreprocessingConfig] = None
preprocessing_config: Optional[PreprocessingConfig] = None,
table_detection_config: Optional[TableDetectionConfig] = None
) -> Union[UnifiedDocument, Dict]:
"""
Main processing method with dual-track support.
@@ -2063,6 +2143,7 @@ class OCRService:
layout_model: Layout detection model ('chinese', 'default', 'cdla') (used for OCR track only)
preprocessing_mode: Layout preprocessing mode ('auto', 'manual', 'disabled')
preprocessing_config: Manual preprocessing config (used when mode='manual')
table_detection_config: Table detection config (wired/wireless/region options)
Returns:
UnifiedDocument if dual-track is enabled and use_dual_track=True,
@@ -2075,13 +2156,13 @@ class OCRService:
# Use dual-track processing (or forced track)
return self.process_with_dual_track(
file_path, lang, detect_layout, confidence_threshold, output_dir, force_track, layout_model,
preprocessing_mode, preprocessing_config
preprocessing_mode, preprocessing_config, table_detection_config
)
else:
# Use traditional OCR processing (no force_track support)
return self.process_file_traditional(
file_path, lang, detect_layout, confidence_threshold, output_dir, layout_model,
preprocessing_mode, preprocessing_config
preprocessing_mode, preprocessing_config, table_detection_config
)
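
The new TableDetectionConfig travels from the public entry points down to _ensure_structure_engine; a minimal construction sketch using the three flags shown in this diff:

from app.schemas.task import TableDetectionConfig

# Keep bordered-table and region detection, skip borderless tables.
table_cfg = TableDetectionConfig(
    enable_wired_table=True,
    enable_wireless_table=False,
    enable_region_detection=True,
)

# Passed as the trailing argument of process_image / process_file_traditional /
# process_with_dual_track; _ensure_structure_engine compares the (wired, wireless,
# region) tuple against the cached one and recreates the PP-StructureV3 engine when it changes.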
def process_legacy(

View File

@@ -590,8 +590,17 @@ class OCRToUnifiedConverter:
# Prepare content based on element type
if element_type == ElementType.TABLE:
# For tables, use TableData as content
# Pass cell_boxes for accurate cell positioning
table_data = self._extract_table_data(elem_data)
content = table_data if table_data else elem_data.get('content', '')
# Preserve cell_boxes and embedded_images in metadata for PDF generation
# These are extracted by PP-StructureV3 and provide accurate cell positioning
if 'cell_boxes' in elem_data:
elem_data.setdefault('metadata', {})['cell_boxes'] = elem_data['cell_boxes']
elem_data['metadata']['cell_boxes_source'] = elem_data.get('cell_boxes_source', 'table_res_list')
if 'embedded_images' in elem_data:
elem_data.setdefault('metadata', {})['embedded_images'] = elem_data['embedded_images']
elif element_type in [ElementType.IMAGE, ElementType.FIGURE]:
# For images, use metadata dict as content
content = {

View File

@@ -447,7 +447,8 @@ class PDFGeneratorService:
'text': text_content,
'bbox': bbox_polygon,
'confidence': element.confidence or 1.0,
'page': page_num
'page': page_num,
'element_type': element.type.value # Include element type for styling
}
# Include style information if available (for Direct track)
@@ -466,13 +467,24 @@ class PDFGeneratorService:
else:
html_content = str(element.content)
layout_elements.append({
table_element = {
'type': 'table',
'content': html_content,
'bbox': [element.bbox.x0, element.bbox.y0,
element.bbox.x1, element.bbox.y1],
'page': page_num - 1 # layout uses 0-based
})
}
# Preserve cell_boxes and embedded_images from metadata
# These are extracted by PP-StructureV3 and used for accurate table rendering
if element.metadata:
if 'cell_boxes' in element.metadata:
table_element['cell_boxes'] = element.metadata['cell_boxes']
table_element['cell_boxes_source'] = element.metadata.get('cell_boxes_source', 'metadata')
if 'embedded_images' in element.metadata:
table_element['embedded_images'] = element.metadata['embedded_images']
layout_elements.append(table_element)
# Add bbox to images_metadata for text overlap filtering
# (no actual image file, just bbox for filtering)
@@ -484,10 +496,10 @@ class PDFGeneratorService:
'element_id': element.element_id
})
# Handle image/visual elements
# Handle image/visual elements (including stamps/seals)
elif element.is_visual or element.type in [
ElementType.IMAGE, ElementType.FIGURE, ElementType.CHART,
ElementType.DIAGRAM, ElementType.LOGO
ElementType.DIAGRAM, ElementType.LOGO, ElementType.STAMP
]:
# Get image path using fallback logic
image_path = self._get_image_path(element)
@@ -729,13 +741,13 @@ class PDFGeneratorService:
regions_to_avoid.append(element) # Tables are exclusion regions
elif element.is_visual or element.type in [
ElementType.IMAGE, ElementType.FIGURE,
ElementType.CHART, ElementType.DIAGRAM, ElementType.LOGO
ElementType.CHART, ElementType.DIAGRAM, ElementType.LOGO, ElementType.STAMP
]:
image_elements.append(element)
# Only add real images to exclusion regions, NOT charts/diagrams
# Charts often have large bounding boxes that include text labels
# which should be rendered as selectable text on top
if element.type in [ElementType.IMAGE, ElementType.FIGURE, ElementType.LOGO]:
if element.type in [ElementType.IMAGE, ElementType.FIGURE, ElementType.LOGO, ElementType.STAMP]:
regions_to_avoid.append(element)
elif element.type == ElementType.LIST_ITEM:
list_elements.append(element)
@@ -934,11 +946,14 @@ class PDFGeneratorService:
# Create PDF canvas with initial page size (will be updated per page)
pdf_canvas = canvas.Canvas(str(output_path), pagesize=(target_width, target_height))
# Filter text regions to avoid overlap with tables/images
regions_to_avoid = images_metadata
# LAYERED RENDERING: Exclude tables from regions_to_avoid
# Text inside tables will be rendered at raw OCR positions (via GapFillingService)
# while table borders are drawn separately using cell_boxes
# Only avoid overlap with actual images/figures/charts
regions_to_avoid = [img for img in images_metadata if img.get('type') != 'table']
table_count = len([img for img in images_metadata if img.get('type') == 'table'])
logger.info(f"過濾文字區域: {len(regions_to_avoid)} 個區域需要避免 ( {table_count} 個表格)")
logger.info(f"過濾文字區域: {len(regions_to_avoid)} 個區域需要避免 (不含表格), {table_count} 個表格使用分層渲染")
filtered_text_regions = self._filter_text_in_regions(text_regions, regions_to_avoid)
@@ -1042,7 +1057,8 @@ class PDFGeneratorService:
for table_elem in page_table_regions:
self.draw_table_region(
pdf_canvas, table_elem, images_metadata,
current_target_h, current_scale_w, current_scale_h
current_target_h, current_scale_w, current_scale_h,
result_dir=json_parent_dir
)
# 3. Draw text (top layer)
@@ -1542,8 +1558,8 @@ class PDFGeneratorService:
logger.info(f"[文字] '{text[:30]}' → PDF位置: ({pdf_x:.1f}, {pdf_y:.1f}), 字體:{font_size:.1f}pt, 寬x高:{bbox_width:.0f}x{bbox_height:.0f}, 行數:{num_lines}")
# Set font with track-specific styling
# Note: OCR track has no StyleInfo (extracted from images), so no advanced formatting
style_info = region.get('style')
element_type = region.get('element_type', 'text')
is_direct_track = (self.current_processing_track == ProcessingTrack.DIRECT or
self.current_processing_track == ProcessingTrack.HYBRID)
@@ -1555,9 +1571,25 @@ class PDFGeneratorService:
font_size = pdf_canvas._fontsize
logger.debug(f"Applied Direct track style: font={font_name}, size={font_size}")
else:
# OCR track or no style: Use simple font selection
# OCR track or no style: Use simple font selection with element-type based styling
font_name = self.font_name if self.font_registered else 'Helvetica'
pdf_canvas.setFont(font_name, font_size)
# Apply element-type specific styling (for OCR track)
if element_type == 'title':
# Titles: use larger, bold font
font_size = min(font_size * 1.3, 36) # 30% larger, max 36pt
pdf_canvas.setFont(font_name, font_size)
logger.debug(f"Applied title style: size={font_size:.1f}")
elif element_type == 'header':
# Headers: slightly larger
font_size = min(font_size * 1.15, 24) # 15% larger, max 24pt
pdf_canvas.setFont(font_name, font_size)
elif element_type == 'caption':
# Captions: slightly smaller, italic if available
font_size = max(font_size * 0.9, 6) # 10% smaller, min 6pt
pdf_canvas.setFont(font_name, font_size)
else:
pdf_canvas.setFont(font_name, font_size)
# Handle line breaks (split text by newlines)
# OCR track: simple left-aligned rendering
@@ -1726,7 +1758,8 @@ class PDFGeneratorService:
images_metadata: List[Dict],
page_height: float,
scale_w: float = 1.0,
scale_h: float = 1.0
scale_h: float = 1.0,
result_dir: Optional[Path] = None
):
"""
Draw a table region by parsing HTML and rebuilding with ReportLab Table
@@ -1738,13 +1771,27 @@ class PDFGeneratorService:
page_height: Height of page
scale_w: Scale factor for X coordinates (PDF width / OCR width)
scale_h: Scale factor for Y coordinates (PDF height / OCR height)
result_dir: Directory containing result files (for embedded images)
"""
try:
html_content = table_element.get('content', '')
if not html_content:
return
# Parse HTML to extract table structure
# Try to use cell_boxes for direct rendering first (more accurate)
cell_boxes = table_element.get('cell_boxes', [])
if cell_boxes:
logger.info(f"[TABLE] Using cell_boxes direct rendering ({len(cell_boxes)} cells)")
success = self._draw_table_with_cell_boxes(
pdf_canvas, table_element, page_height,
scale_w, scale_h, result_dir
)
if success:
return # Successfully rendered with cell_boxes
logger.info("[TABLE] Falling back to ReportLab Table")
# Fallback: Parse HTML to extract table structure and use ReportLab Table
parser = HTMLTableParser()
parser.feed(html_content)
@@ -1901,14 +1948,18 @@ class PDFGeneratorService:
logger.info(f"[TABLE] Using cell_boxes col widths (scaled)")
else:
col_widths = [table_width / max_cols] * max_cols
logger.info(f"[TABLE] Using equal distribution col widths")
logger.info(f"[TABLE] Using equal distribution col widths: {table_width/max_cols:.1f} each")
# Row heights are used optionally (ReportLab can auto-size)
row_heights = None
# Row heights - ALWAYS use to ensure table fits bbox properly
# Use computed heights from cell_boxes, or uniform distribution as fallback
if computed_row_heights:
# Scale row_heights to PDF coordinates
row_heights = [h * scale_h for h in computed_row_heights]
logger.debug(f"[TABLE] Cell_boxes row heights available (scaled)")
logger.info(f"[TABLE] Using cell_boxes row heights (scaled)")
else:
# Uniform distribution based on table bbox - ensures table fills its allocated space
row_heights = [table_height / num_rows] * num_rows
logger.info(f"[TABLE] Using uniform row heights: {table_height/num_rows:.1f} each")
# Create ReportLab Table
# Use smaller font to fit content with auto-wrap
@@ -1932,12 +1983,10 @@ class PDFGeneratorService:
escaped_text = cell_text.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
reportlab_data[row_idx][col_idx] = Paragraph(escaped_text, cell_style)
# Create table with computed col widths
# Note: We don't use row_heights even when available from cell_boxes because:
# 1. ReportLab's auto-sizing handles content overflow better
# 2. Fixed heights can cause text clipping when content exceeds cell size
# 3. The col_widths from cell_boxes provide the main layout benefit
table = Table(reportlab_data, colWidths=col_widths)
# Create table with col widths and row heights
# Always use row_heights to ensure table fits bbox properly
table = Table(reportlab_data, colWidths=col_widths, rowHeights=row_heights)
logger.info(f"[TABLE] Created with {len(col_widths)} cols, {len(row_heights)} rows")
# Apply table style
style = TableStyle([
@@ -1974,26 +2023,303 @@ class PDFGeneratorService:
scale_y = table_height / actual_height if actual_height > table_height else 1.0
scale_factor = min(scale_x, scale_y) # Use smaller scale to fit both dimensions
# Calculate the table top position in PDF coordinates
# ReportLab uses bottom-left origin, so we need to position from TOP
pdf_y_top = page_height - ocr_y_top # Top of table in PDF coords
# Calculate the actual bottom position based on scaled height
# Table should be positioned so its TOP aligns with the bbox top
scaled_height = actual_height * scale_factor
pdf_y_bottom = pdf_y_top - scaled_height # Bottom of scaled table
logger.info(f"[表格] PDF座標: top={pdf_y_top:.0f}, bottom={pdf_y_bottom:.0f}, scaled_height={scaled_height:.0f}")
if scale_factor < 1.0:
logger.info(f"[表格] 縮放比例: {scale_factor:.2f} (需要縮小以適應 bbox)")
# Apply scaling transformation
pdf_canvas.saveState()
pdf_canvas.translate(pdf_x, pdf_y)
pdf_canvas.translate(pdf_x, pdf_y_bottom)
pdf_canvas.scale(scale_factor, scale_factor)
# Draw at origin since we've already translated
table.drawOn(pdf_canvas, 0, 0)
pdf_canvas.restoreState()
else:
# Draw table at position without scaling
table.drawOn(pdf_canvas, pdf_x, pdf_y)
# pdf_y should be the bottom of the table
table.drawOn(pdf_canvas, pdf_x, pdf_y_bottom)
logger.info(f"Drew table at ({pdf_x:.0f}, {pdf_y:.0f}) size {table_width:.0f}x{table_height:.0f} with {len(rows)} rows")
logger.info(f"Drew table at ({pdf_x:.0f}, {pdf_y_bottom:.0f}) size {table_width:.0f}x{scaled_height:.0f} with {len(rows)} rows")
# Draw embedded images (images detected inside the table region)
embedded_images = table_element.get('embedded_images', [])
if embedded_images and result_dir:
logger.info(f"[TABLE] Drawing {len(embedded_images)} embedded images")
for emb_img in embedded_images:
self._draw_embedded_image(
pdf_canvas, emb_img, page_height, result_dir, scale_w, scale_h
)
except Exception as e:
logger.warning(f"Failed to draw table region: {e}")
import traceback
traceback.print_exc()
def _draw_embedded_image(
self,
pdf_canvas: canvas.Canvas,
emb_img: Dict,
page_height: float,
result_dir: Path,
scale_w: float = 1.0,
scale_h: float = 1.0
):
"""Draw an embedded image inside a table region."""
try:
# Get image path
saved_path = emb_img.get('saved_path', '')
if not saved_path:
return
# Construct full path
image_path = result_dir / saved_path
if not image_path.exists():
image_path = result_dir / Path(saved_path).name
if not image_path.exists():
logger.warning(f"Embedded image not found: {saved_path}")
return
# Get bbox from embedded image data
bbox = emb_img.get('bbox', [])
if not bbox or len(bbox) < 4:
logger.warning(f"No bbox for embedded image: {saved_path}")
return
# Calculate position (bbox is [x0, y0, x1, y1])
x0, y0, x1, y1 = bbox[0], bbox[1], bbox[2], bbox[3]
# Apply scaling
x0_scaled = x0 * scale_w
y0_scaled = y0 * scale_h
x1_scaled = x1 * scale_w
y1_scaled = y1 * scale_h
width = x1_scaled - x0_scaled
height = y1_scaled - y0_scaled
# Transform Y coordinate (ReportLab uses bottom-left origin)
pdf_x = x0_scaled
pdf_y = page_height - y1_scaled
# Draw the image
from reportlab.lib.utils import ImageReader
img_reader = ImageReader(str(image_path))
pdf_canvas.drawImage(
img_reader, pdf_x, pdf_y, width, height,
preserveAspectRatio=True, mask='auto'
)
logger.info(f"Drew embedded image at ({pdf_x:.0f}, {pdf_y:.0f}) size {width:.0f}x{height:.0f}")
except Exception as e:
logger.warning(f"Failed to draw embedded image: {e}")
def _normalize_cell_boxes_to_grid(
self,
cell_boxes: List[List[float]],
threshold: float = 10.0
) -> List[List[float]]:
"""
Normalize cell boxes to create a proper aligned grid.
Groups nearby coordinates and snaps them to a common value,
eliminating the 2-11 pixel variations that cause skewed tables.
Args:
cell_boxes: List of cell bboxes [[x1,y1,x2,y2], ...]
threshold: Maximum distance to consider coordinates as "same line"
Returns:
Normalized cell_boxes with aligned coordinates
"""
if not cell_boxes or len(cell_boxes) < 2:
return cell_boxes
# Collect all X and Y coordinates
x_coords = [] # (value, box_idx, is_x1)
y_coords = [] # (value, box_idx, is_y1)
for i, box in enumerate(cell_boxes):
x1, y1, x2, y2 = box[0], box[1], box[2], box[3]
x_coords.append((x1, i, True)) # x1 (left)
x_coords.append((x2, i, False)) # x2 (right)
y_coords.append((y1, i, True)) # y1 (top)
y_coords.append((y2, i, False)) # y2 (bottom)
def cluster_and_normalize(coords, threshold):
"""Cluster nearby coordinates and return mapping to normalized values."""
if not coords:
return {}
# Sort by value
sorted_coords = sorted(coords, key=lambda x: x[0])
# Cluster nearby values
clusters = []
current_cluster = [sorted_coords[0]]
for coord in sorted_coords[1:]:
if coord[0] - current_cluster[-1][0] <= threshold:
current_cluster.append(coord)
else:
clusters.append(current_cluster)
current_cluster = [coord]
clusters.append(current_cluster)
# Create mapping: (box_idx, is_first) -> normalized value
mapping = {}
for cluster in clusters:
# Use average of cluster as normalized value
avg_value = sum(c[0] for c in cluster) / len(cluster)
for _, box_idx, is_first in cluster:
mapping[(box_idx, is_first)] = avg_value
return mapping
x_mapping = cluster_and_normalize(x_coords, threshold)
y_mapping = cluster_and_normalize(y_coords, threshold)
# Create normalized cell boxes
normalized_boxes = []
for i, box in enumerate(cell_boxes):
x1_norm = x_mapping.get((i, True), box[0])
x2_norm = x_mapping.get((i, False), box[2])
y1_norm = y_mapping.get((i, True), box[1])
y2_norm = y_mapping.get((i, False), box[3])
normalized_boxes.append([x1_norm, y1_norm, x2_norm, y2_norm])
logger.debug(f"[TABLE] Normalized {len(cell_boxes)} cell boxes to grid")
return normalized_boxes
def _draw_table_with_cell_boxes(
self,
pdf_canvas: canvas.Canvas,
table_element: Dict,
page_height: float,
scale_w: float = 1.0,
scale_h: float = 1.0,
result_dir: Optional[Path] = None
):
"""
Draw table borders using cell_boxes for accurate positioning.
LAYERED RENDERING APPROACH:
- This method ONLY draws cell borders and embedded images
- Text is rendered separately using raw OCR positions (via GapFillingService)
- This decouples visual structure (borders) from content (text)
FALLBACK: If cell_boxes are incomplete, always draws the outer table
border using the table's bbox to ensure table boundaries are visible.
Args:
pdf_canvas: ReportLab canvas object
table_element: Table element dict with cell_boxes
page_height: Height of page in PDF coordinates
scale_w: Scale factor for X coordinates
scale_h: Scale factor for Y coordinates
result_dir: Directory containing result files (for embedded images)
"""
try:
cell_boxes = table_element.get('cell_boxes', [])
# Always draw outer table border first (fallback for incomplete cell_boxes)
table_bbox = table_element.get('bbox', [])
if table_bbox and len(table_bbox) >= 4:
# Handle different bbox formats (list or dict)
if isinstance(table_bbox, dict):
tx1 = float(table_bbox.get('x0', 0))
ty1 = float(table_bbox.get('y0', 0))
tx2 = float(table_bbox.get('x1', 0))
ty2 = float(table_bbox.get('y1', 0))
else:
tx1, ty1, tx2, ty2 = table_bbox[:4]
# Apply scaling
tx1_scaled = tx1 * scale_w
ty1_scaled = ty1 * scale_h
tx2_scaled = tx2 * scale_w
ty2_scaled = ty2 * scale_h
table_width = tx2_scaled - tx1_scaled
table_height = ty2_scaled - ty1_scaled
# Transform Y coordinate (PDF uses bottom-left origin)
pdf_x = tx1_scaled
pdf_y = page_height - ty2_scaled # Bottom of table in PDF coords
# Draw outer table border (slightly thicker for visibility)
pdf_canvas.setStrokeColor(colors.black)
pdf_canvas.setLineWidth(1.0)
pdf_canvas.rect(pdf_x, pdf_y, table_width, table_height, stroke=1, fill=0)
logger.info(f"[TABLE] Drew outer table border at [{int(tx1)},{int(ty1)},{int(tx2)},{int(ty2)}]")
if not cell_boxes:
logger.warning("[TABLE] No cell_boxes available, only outer border drawn")
# Still draw embedded images even without cell borders
embedded_images = table_element.get('embedded_images', [])
if embedded_images and result_dir:
for emb_img in embedded_images:
self._draw_embedded_image(
pdf_canvas, emb_img, page_height, result_dir, scale_w, scale_h
)
return True # Outer border drawn successfully
# Normalize cell boxes to create aligned grid
cell_boxes = self._normalize_cell_boxes_to_grid(cell_boxes)
logger.info(f"[TABLE] Drawing {len(cell_boxes)} cell borders (layered mode, grid-aligned)")
# Draw each cell border
for box in cell_boxes:
x1, y1, x2, y2 = box[0], box[1], box[2], box[3]
# Apply scaling
x1_scaled = x1 * scale_w
y1_scaled = y1 * scale_h
x2_scaled = x2 * scale_w
y2_scaled = y2 * scale_h
cell_width = x2_scaled - x1_scaled
cell_height = y2_scaled - y1_scaled
# Transform Y coordinate (PDF uses bottom-left origin)
pdf_x = x1_scaled
pdf_y = page_height - y2_scaled # Bottom of cell in PDF coords
# Draw cell border only (no fill, no text)
pdf_canvas.setStrokeColor(colors.black)
pdf_canvas.setLineWidth(0.5)
pdf_canvas.rect(pdf_x, pdf_y, cell_width, cell_height, stroke=1, fill=0)
logger.info(f"[TABLE] Drew {len(cell_boxes)} cell borders")
# Draw embedded images
embedded_images = table_element.get('embedded_images', [])
if embedded_images and result_dir:
logger.info(f"[TABLE] Drawing {len(embedded_images)} embedded images")
for emb_img in embedded_images:
self._draw_embedded_image(
pdf_canvas, emb_img, page_height, result_dir, scale_w, scale_h
)
return True
except Exception as e:
logger.warning(f"[TABLE] Failed to draw cell borders: {e}")
import traceback
traceback.print_exc()
return False
def draw_image_region(
self,
pdf_canvas: canvas.Canvas,
@@ -2923,12 +3249,29 @@ class PDFGeneratorService:
from reportlab.platypus import Table, TableStyle
from reportlab.lib import colors
# Determine number of rows and columns for cell_boxes calculation
num_rows = len(rows)
max_cols = max(len(row['cells']) for row in rows) if rows else 0
# Use original column widths from extraction if available
# Otherwise let ReportLab auto-calculate
# Otherwise try to compute from cell_boxes (from PP-StructureV3)
col_widths = None
if element.metadata and 'column_widths' in element.metadata:
col_widths = element.metadata['column_widths']
logger.debug(f"Using extracted column widths: {col_widths}")
elif element.metadata and 'cell_boxes' in element.metadata:
# Use cell_boxes from PP-StructureV3 for accurate column/row sizing
cell_boxes = element.metadata['cell_boxes']
cell_boxes_source = element.metadata.get('cell_boxes_source', 'unknown')
table_bbox_list = [bbox.x0, bbox.y0, bbox.x1, bbox.y1]
logger.info(f"[TABLE] Using {len(cell_boxes)} cell boxes from {cell_boxes_source}")
computed_col_widths, computed_row_heights = self._compute_table_grid_from_cell_boxes(
cell_boxes, table_bbox_list, num_rows, max_cols
)
if computed_col_widths:
col_widths = computed_col_widths
logger.info(f"[TABLE] Computed {len(col_widths)} column widths from cell_boxes")
# NOTE: Don't use rowHeights from extraction - it causes content overlap
# The extracted row heights are based on cell boundaries, not text content height.

View File

@@ -26,9 +26,11 @@ import paddle
from paddleocr import PPStructureV3
from PIL import Image
import numpy as np
import cv2
from app.models.unified_document import ElementType
from app.core.config import settings
from app.services.memory_manager import prediction_context
from app.services.cv_table_detector import CVTableDetector
logger = logging.getLogger(__name__)
@@ -62,6 +64,7 @@ class PPStructureEnhanced:
'watermark': ElementType.WATERMARK,
'signature': ElementType.SIGNATURE,
'stamp': ElementType.STAMP,
'seal': ElementType.STAMP, # PP-StructureV3 may use 'seal' label
'logo': ElementType.LOGO,
'barcode': ElementType.BARCODE,
'qr-code': ElementType.QR_CODE,
@@ -80,183 +83,15 @@ class PPStructureEnhanced:
"""
self.structure_engine = structure_engine
# Lazy-loaded SLANeXt models for cell boxes extraction
# These are loaded on-demand when enable_table_cell_boxes_extraction is True
self._slanet_wired_model = None
self._slanet_wireless_model = None
self._table_cls_model = None
def _get_slanet_model(self, is_wired: bool = True):
"""
Get or create SLANeXt model for cell boxes extraction (lazy loading).
Args:
is_wired: True for wired (bordered) tables, False for wireless
Returns:
SLANeXt model instance or None if loading fails
"""
if not settings.enable_table_cell_boxes_extraction:
return None
try:
from paddlex import create_model
if is_wired:
if self._slanet_wired_model is None:
model_name = settings.wired_table_model_name or "SLANeXt_wired"
logger.info(f"Loading SLANeXt wired model: {model_name}")
self._slanet_wired_model = create_model(model_name)
return self._slanet_wired_model
else:
if self._slanet_wireless_model is None:
model_name = settings.wireless_table_model_name or "SLANeXt_wireless"
logger.info(f"Loading SLANeXt wireless model: {model_name}")
self._slanet_wireless_model = create_model(model_name)
return self._slanet_wireless_model
except Exception as e:
logger.error(f"Failed to load SLANeXt model: {e}")
return None
def _get_table_classifier(self):
"""
Get or create table classification model (lazy loading).
Returns:
Table classifier model instance or None if loading fails
"""
if not settings.enable_table_cell_boxes_extraction:
return None
try:
from paddlex import create_model
if self._table_cls_model is None:
model_name = settings.table_classification_model_name or "PP-LCNet_x1_0_table_cls"
logger.info(f"Loading table classification model: {model_name}")
self._table_cls_model = create_model(model_name)
return self._table_cls_model
except Exception as e:
logger.error(f"Failed to load table classifier: {e}")
return None
def _extract_cell_boxes_with_slanet(
self,
table_image: np.ndarray,
table_bbox: List[float],
is_wired: Optional[bool] = None
) -> Optional[List[List[float]]]:
"""
Extract cell bounding boxes using direct SLANeXt model call.
This supplements PPStructureV3 which doesn't expose cell boxes in its output.
Args:
table_image: Cropped table image as numpy array (BGR format)
table_bbox: Table bounding box in page coordinates [x1, y1, x2, y2]
is_wired: If None, auto-detect using classifier. True for bordered tables.
Returns:
List of cell bounding boxes in page coordinates [[x1,y1,x2,y2], ...],
or None if extraction fails
"""
if not settings.enable_table_cell_boxes_extraction:
return None
try:
# Auto-detect table type if not specified
if is_wired is None:
classifier = self._get_table_classifier()
if classifier:
try:
cls_result = classifier.predict(table_image)
# PP-LCNet returns classification result
for res in cls_result:
label_names = res.get('label_names', [])
if label_names:
is_wired = 'wired' in str(label_names[0]).lower()
logger.debug(f"Table classified as: {'wired' if is_wired else 'wireless'}")
break
except Exception as e:
logger.warning(f"Table classification failed, defaulting to wired: {e}")
is_wired = True
else:
is_wired = True # Default to wired if classifier unavailable
# Get appropriate SLANeXt model
model = self._get_slanet_model(is_wired=is_wired)
if model is None:
return None
# Run SLANeXt prediction
results = model.predict(table_image)
# Extract cell boxes from result
cell_boxes = []
table_x, table_y = table_bbox[0], table_bbox[1]
for result in results:
# SLANeXt returns 'bbox' with 8-point polygon format
# [[x1,y1,x2,y2,x3,y3,x4,y4], ...]
boxes = result.get('bbox', [])
for box in boxes:
if isinstance(box, (list, tuple)):
if len(box) >= 8:
# 8-point polygon: convert to 4-point rectangle
xs = [box[i] for i in range(0, 8, 2)]
ys = [box[i] for i in range(1, 8, 2)]
x1, y1 = min(xs), min(ys)
x2, y2 = max(xs), max(ys)
elif len(box) >= 4:
# Already 4-point rectangle
x1, y1, x2, y2 = box[:4]
else:
continue
# Convert to absolute page coordinates
abs_box = [
float(x1 + table_x),
float(y1 + table_y),
float(x2 + table_x),
float(y2 + table_y)
]
cell_boxes.append(abs_box)
logger.info(f"SLANeXt extracted {len(cell_boxes)} cell boxes (is_wired={is_wired})")
return cell_boxes if cell_boxes else None
except Exception as e:
logger.error(f"Cell boxes extraction with SLANeXt failed: {e}")
return None
def release_slanet_models(self):
"""Release SLANeXt models to free GPU memory."""
if self._slanet_wired_model is not None:
del self._slanet_wired_model
self._slanet_wired_model = None
logger.info("Released SLANeXt wired model")
if self._slanet_wireless_model is not None:
del self._slanet_wireless_model
self._slanet_wireless_model = None
logger.info("Released SLANeXt wireless model")
if self._table_cls_model is not None:
del self._table_cls_model
self._table_cls_model = None
logger.info("Released table classifier model")
gc.collect()
if TORCH_AVAILABLE:
torch.cuda.empty_cache()
def analyze_with_full_structure(
self,
image_path: Path,
output_dir: Optional[Path] = None,
current_page: int = 0,
preprocessed_image: Optional[Image.Image] = None,
scaling_info: Optional['ScalingInfo'] = None
scaling_info: Optional['ScalingInfo'] = None,
save_visualization: bool = False,
use_cv_table_detection: bool = False
) -> Dict[str, Any]:
"""
Analyze document with full PP-StructureV3 capabilities.
@@ -271,6 +106,10 @@ class PPStructureEnhanced:
scaling_info: Optional ScalingInfo from preprocessing. If image was scaled
for layout detection, all bbox coordinates will be scaled back
to original image coordinates for proper cropping.
save_visualization: If True, save detection visualization images
(layout_det_res, layout_order_res, overall_ocr_res, etc.)
use_cv_table_detection: If True, use CV-based line detection for wired tables
instead of ML-based cell detection (RT-DETR-L)
Returns:
Dictionary with complete structure information including:
@@ -278,6 +117,7 @@ class PPStructureEnhanced:
- reading_order: Reading order indices
- images: Extracted images with metadata
- tables: Extracted tables with structure
- visualization_dir: Path to visualization images (if save_visualization=True)
"""
try:
logger.info(f"Enhanced PP-StructureV3 analysis on {image_path.name}")
@@ -313,9 +153,21 @@ class PPStructureEnhanced:
all_elements = []
all_images = []
all_tables = []
visualization_dir = None
# Process each page result
for page_idx, page_result in enumerate(results):
# Save visualization images if requested
if save_visualization and output_dir and hasattr(page_result, 'save_to_img'):
try:
vis_dir = output_dir / 'visualization'
vis_dir.mkdir(parents=True, exist_ok=True)
page_result.save_to_img(str(vis_dir))
visualization_dir = vis_dir
logger.info(f"Saved visualization images to {vis_dir}")
except Exception as e:
logger.warning(f"Failed to save visualization images: {e}")
# Try to access parsing_res_list and table_res_list (the complete structure)
parsing_res_list = None
table_res_list = None
@@ -369,6 +221,7 @@ class PPStructureEnhanced:
logger.info(f"Found parsing_res_list in to_dict['res'] with {len(parsing_res_list)} elements")
# Extract table_res_list which contains cell_box_list
layout_det_res = None
if result_dict:
if 'table_res_list' in result_dict:
table_res_list = result_dict['table_res_list']
@@ -377,20 +230,40 @@ class PPStructureEnhanced:
if 'cell_box_list' in tbl:
logger.info(f" Table {i}: {len(tbl['cell_box_list'])} cell boxes")
# Extract layout_det_res for Image-in-Table processing
if 'layout_det_res' in result_dict:
layout_det_res = result_dict['layout_det_res']
logger.info(f"Found layout_det_res with {len(layout_det_res.get('boxes', []))} boxes")
# Process parsing_res_list if found
if parsing_res_list:
elements = self._process_parsing_res_list(
parsing_res_list, current_page, output_dir, image_path, scaling_info,
table_res_list=table_res_list, # Pass table_res_list for cell_box_list
layout_det_res=layout_det_res, # Pass layout_det_res for Image-in-Table
use_cv_table_detection=use_cv_table_detection # Use CV for wired tables
)
all_elements.extend(elements)
# Extract tables and images from elements
table_bboxes = [] # Collect table bboxes for standalone image filtering
for elem in elements:
if elem['type'] == ElementType.TABLE:
all_tables.append(elem)
table_bboxes.append(elem.get('bbox', [0, 0, 0, 0]))
elif elem['type'] in [ElementType.IMAGE, ElementType.FIGURE]:
all_images.append(elem)
# Extract standalone images from layout_det_res (images NOT inside tables)
if layout_det_res and image_path and output_dir:
standalone_images = self._extract_standalone_images(
layout_det_res, table_bboxes, image_path, output_dir,
current_page, len(elements), scaling_info
)
if standalone_images:
all_elements.extend(standalone_images)
all_images.extend(standalone_images)
logger.info(f"Extracted {len(standalone_images)} standalone images from layout_det_res")
else:
# Fallback to markdown if parsing_res_list not available
logger.warning("parsing_res_list not found, falling back to markdown")
@@ -402,7 +275,7 @@ class PPStructureEnhanced:
# Create reading order based on element positions
reading_order = self._determine_reading_order(all_elements)
result = {
'elements': all_elements,
'total_elements': len(all_elements),
'reading_order': reading_order,
@@ -412,6 +285,12 @@ class PPStructureEnhanced:
'has_parsing_res_list': parsing_res_list is not None
}
# Add visualization directory if available
if visualization_dir:
result['visualization_dir'] = str(visualization_dir)
return result
except Exception as e:
logger.error(f"Enhanced PP-StructureV3 analysis error: {e}")
import traceback
@@ -446,7 +325,9 @@ class PPStructureEnhanced:
output_dir: Optional[Path],
source_image_path: Optional[Path] = None,
scaling_info: Optional['ScalingInfo'] = None,
table_res_list: Optional[List[Dict]] = None,
layout_det_res: Optional[Dict] = None,
use_cv_table_detection: bool = False
) -> List[Dict[str, Any]]:
"""
Process parsing_res_list to extract all elements.
@@ -458,6 +339,8 @@ class PPStructureEnhanced:
output_dir: Optional output directory
source_image_path: Path to source image for cropping image regions
table_res_list: Optional list of table results containing cell_box_list
layout_det_res: Optional layout detection result for Image-in-Table processing
use_cv_table_detection: If True, use CV line detection for wired tables
Returns:
List of processed elements with normalized structure
@@ -628,53 +511,55 @@ class PPStructureEnhanced:
logger.info(f"[TABLE] Processed {len(processed_cells)} cell boxes with table offset ({table_x}, {table_y})")
cell_boxes_extracted = True
# Supplement with direct SLANeXt call if PPStructureV3 didn't provide boxes
if not cell_boxes_extracted and source_image_path and bbox != [0, 0, 0, 0]:
logger.info(f"[TABLE] No boxes from PPStructureV3, attempting SLANeXt extraction...")
try:
# Load source image and crop table region
source_img = Image.open(source_image_path)
source_array = np.array(source_img)
# Crop table region (bbox is in original image coordinates)
x1, y1, x2, y2 = [int(round(c)) for c in bbox]
# Ensure coordinates are within image bounds
h, w = source_array.shape[:2]
x1, y1 = max(0, x1), max(0, y1)
x2, y2 = min(w, x2), min(h, y2)
if x2 > x1 and y2 > y1:
table_crop = source_array[y1:y2, x1:x2]
# Convert RGB to BGR for SLANeXt
if len(table_crop.shape) == 3 and table_crop.shape[2] == 3:
table_crop_bgr = table_crop[:, :, ::-1]
else:
table_crop_bgr = table_crop
# Extract cell boxes using SLANeXt
slanet_boxes = self._extract_cell_boxes_with_slanet(
table_crop_bgr,
bbox, # Pass original bbox for coordinate offset
is_wired=None # Auto-detect
)
if slanet_boxes:
element['cell_boxes'] = slanet_boxes
element['cell_boxes_source'] = 'slanet'
cell_boxes_extracted = True
logger.info(f"[TABLE] SLANeXt extracted {len(slanet_boxes)} cell boxes")
else:
logger.warning(f"[TABLE] Invalid crop region: ({x1},{y1})-({x2},{y2})")
except Exception as e:
logger.error(f"[TABLE] SLANeXt extraction failed: {e}")
if not cell_boxes_extracted:
logger.info(f"[TABLE] No cell boxes available. PPStructureV3 keys: {list(res_data.keys()) if res_data else 'empty'}")
# 2.5 CV-based table line detection for wired tables
if use_cv_table_detection and source_image_path and source_image_path.exists():
try:
# Load image for CV processing
cv_image = cv2.imread(str(source_image_path))
if cv_image is not None:
cv_detector = CVTableDetector()
ml_cell_boxes = element.get('cell_boxes', [])
# Detect cells using CV line detection
cv_cells = cv_detector.detect_and_merge_with_ml(
cv_image,
bbox, # Table bbox
ml_cell_boxes
)
if cv_cells:
# Apply scaling if needed
if scaling_info and scaling_info.was_scaled:
cv_cells = [
[
c[0] * scaling_info.scale_x,
c[1] * scaling_info.scale_y,
c[2] * scaling_info.scale_x,
c[3] * scaling_info.scale_y
]
for c in cv_cells
]
element['cell_boxes'] = cv_cells
element['cell_boxes_source'] = 'cv_line_detection'
logger.info(f"[TABLE] CV line detection found {len(cv_cells)} cells (ML had {len(ml_cell_boxes)})")
except Exception as cv_error:
logger.warning(f"[TABLE] CV line detection failed: {cv_error}")
# 3. Image-in-Table handling: detect and embed images located inside the table
if layout_det_res and source_image_path and output_dir:
embedded_images = self._embed_images_in_table(
element, bbox, layout_det_res, source_image_path, output_dir
)
if embedded_images:
element['embedded_images'] = embedded_images
logger.info(f"[TABLE] Embedded {len(embedded_images)} images into table")
# Special handling for images/figures/stamps (visual elements that need cropping)
elif mapped_type in [ElementType.IMAGE, ElementType.FIGURE, ElementType.STAMP, ElementType.LOGO]:
# Save image if path provided
if 'img_path' in item and output_dir:
saved_path = self._save_image(item['img_path'], output_dir, element['element_id'])
@@ -704,6 +589,209 @@ class PPStructureEnhanced:
return elements
def _embed_images_in_table(
self,
table_element: Dict[str, Any],
table_bbox: List[float],
layout_det_res: Dict,
source_image_path: Path,
output_dir: Path
) -> List[Dict[str, Any]]:
"""
Detect and embed images that are inside a table region.
This handles the case where layout detection finds an image inside a table,
similar to how pp_demo embeds images in table HTML.
Args:
table_element: The table element being processed
table_bbox: Table bounding box [x1, y1, x2, y2]
layout_det_res: Layout detection result containing all detected boxes
source_image_path: Path to source image for cropping
output_dir: Output directory for saving cropped images
Returns:
List of embedded image info dicts with 'bbox', 'saved_path', 'html_tag'
"""
embedded_images = []
try:
boxes = layout_det_res.get('boxes', [])
table_x1, table_y1, table_x2, table_y2 = table_bbox
for box in boxes:
label = box.get('label', '').lower()
if label != 'image':
continue
# Get image bbox
img_coord = box.get('coordinate', [])
if len(img_coord) < 4:
continue
img_x1, img_y1, img_x2, img_y2 = img_coord[:4]
# Check if image is inside table (with some tolerance)
tolerance = 5 # pixels
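# Worked example (assumed values): with table bbox [100, 200, 500, 800], an image
# at [105, 210, 380, 400] passes this check, since every image edge lies inside
# the table bounds expanded by the 5 px tolerance.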
if (img_x1 >= table_x1 - tolerance and
img_y1 >= table_y1 - tolerance and
img_x2 <= table_x2 + tolerance and
img_y2 <= table_y2 + tolerance):
logger.info(f"[IMAGE-IN-TABLE] Found image at [{int(img_x1)},{int(img_y1)},{int(img_x2)},{int(img_y2)}] inside table")
# Crop and save the image
img_element_id = f"img_in_table_{int(img_x1)}_{int(img_y1)}_{int(img_x2)}_{int(img_y2)}"
cropped_path = self._crop_and_save_image(
source_image_path,
[img_x1, img_y1, img_x2, img_y2],
output_dir,
img_element_id
)
if cropped_path:
# Create relative path for HTML embedding
rel_path = f"imgs/{Path(cropped_path).name}"
# Create img tag similar to pp_demo
img_html = f'<div style="text-align: center;"><img src="{rel_path}" alt="Image" /></div>'
embedded_image = {
'bbox': [img_x1, img_y1, img_x2, img_y2],
'saved_path': str(cropped_path),
'relative_path': rel_path,
'html_tag': img_html,
'element_id': img_element_id
}
embedded_images.append(embedded_image)
# Try to insert image into HTML content
if 'html' in table_element and table_element['html']:
# Insert image reference at the end of HTML before </table>
original_html = table_element['html']
if '</tbody>' in original_html:
# Insert before </tbody> in a new row
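# Illustrative before/after (assumed minimal HTML; the actual img src comes from rel_path):
#   before: <table><tbody><tr><td>A</td></tr></tbody></table>
#   after:  <table><tbody><tr><td>A</td></tr>
#           <tr><td colspan="99" style="text-align:center;"><img src="imgs/..." alt="Embedded Image" /></td></tr>
#           </tbody></table>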
new_html = original_html.replace(
'</tbody>',
f'<tr><td colspan="99" style="text-align:center;"><img src="{rel_path}" alt="Embedded Image" /></td></tr></tbody>'
)
table_element['html'] = new_html
logger.info(f"[IMAGE-IN-TABLE] Embedded image into table HTML")
except Exception as e:
logger.error(f"[IMAGE-IN-TABLE] Error processing images in table: {e}")
return embedded_images
def _extract_standalone_images(
self,
layout_det_res: Dict,
table_bboxes: List[List[float]],
source_image_path: Path,
output_dir: Path,
current_page: int,
start_index: int,
scaling_info: Optional['ScalingInfo'] = None
) -> List[Dict[str, Any]]:
"""
Extract standalone images from layout_det_res that are NOT inside tables.
This handles images that PP-StructureV3 detects in layout_det_res but
doesn't include in parsing_res_list (non-table images).
Args:
layout_det_res: Layout detection result containing all detected boxes
table_bboxes: List of table bounding boxes to exclude images inside tables
source_image_path: Path to source image for cropping
output_dir: Output directory for saving cropped images
current_page: Current page number
start_index: Starting index for element IDs
scaling_info: Optional scaling info for coordinate restoration
Returns:
List of standalone image elements
"""
standalone_images = []
try:
boxes = layout_det_res.get('boxes', [])
logger.info(f"[STANDALONE-IMAGE] Checking {len(boxes)} boxes for standalone images")
for box_idx, box in enumerate(boxes):
label = box.get('label', '').lower()
if label != 'image':
continue
# Get image bbox
img_coord = box.get('coordinate', [])
if len(img_coord) < 4:
continue
img_x1, img_y1, img_x2, img_y2 = img_coord[:4]
# Check if image is inside any table (skip if so)
is_inside_table = False
for table_bbox in table_bboxes:
if len(table_bbox) < 4:
continue
tx1, ty1, tx2, ty2 = table_bbox[:4]
tolerance = 5 # pixels
if (img_x1 >= tx1 - tolerance and
img_y1 >= ty1 - tolerance and
img_x2 <= tx2 + tolerance and
img_y2 <= ty2 + tolerance):
is_inside_table = True
logger.debug(f"[STANDALONE-IMAGE] Image at [{int(img_x1)},{int(img_y1)}] is inside table, skipping")
break
if is_inside_table:
continue
# Scale bbox back to original coordinates if needed
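# Worked example (assuming scale_factor is the detection-to-original ratio):
# if layout detection ran on an image downscaled to half size, scale_factor
# would be 2.0 and a detected bbox [100, 50, 300, 200] maps back to
# [200, 100, 600, 400] on the original image.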
if scaling_info and scaling_info.was_scaled:
scale_factor = scaling_info.scale_factor
img_x1 *= scale_factor
img_y1 *= scale_factor
img_x2 *= scale_factor
img_y2 *= scale_factor
logger.debug(f"[STANDALONE-IMAGE] Scaled bbox by {scale_factor:.3f}")
logger.info(f"[STANDALONE-IMAGE] Found standalone image at [{int(img_x1)},{int(img_y1)},{int(img_x2)},{int(img_y2)}]")
# Crop and save the image
element_idx = start_index + len(standalone_images)
img_element_id = f"standalone_img_{current_page}_{element_idx}"
cropped_path = self._crop_and_save_image(
source_image_path,
[img_x1, img_y1, img_x2, img_y2],
output_dir,
img_element_id
)
if cropped_path:
element = {
'element_id': img_element_id,
'type': ElementType.IMAGE,
'original_type': 'image',
'content': '',
'page': current_page,
'bbox': [img_x1, img_y1, img_x2, img_y2],
'index': element_idx,
'confidence': box.get('score', 1.0),
'saved_path': cropped_path,
'img_path': cropped_path,
'source': 'layout_det_res'
}
standalone_images.append(element)
logger.info(f"[STANDALONE-IMAGE] Extracted and saved: {cropped_path}")
except Exception as e:
logger.error(f"[STANDALONE-IMAGE] Error extracting standalone images: {e}")
import traceback
traceback.print_exc()
return standalone_images
def _process_markdown_fallback(
self,
page_result: Any,