Translation status (multi-worker support):
- Add filesystem lock files (.translating) to track in-progress translations
- Check lock files in the /status API when job_state is not found in the current worker
- Remove lock files on translation success or failure

OCR fallback fix:
- Fix empty pages when layout analysis fails but OCR succeeds
- Change `'enhanced_results' in ocr_results` to `ocr_results.get('enhanced_results')`
- This ensures fallback to text_regions when enhanced_results is an empty list

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
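
A minimal sketch of the lock-file pattern described above, assuming a hypothetical `LOCKS_DIR` and job-ID naming scheme (the actual paths and API wiring live in the translation service, not in the converter file below):

```python
from pathlib import Path

LOCKS_DIR = Path("/tmp/translation_locks")  # assumed location, for illustration only

def acquire_lock(job_id: str) -> Path:
    """Create a .translating marker so other workers can see the job is active."""
    LOCKS_DIR.mkdir(parents=True, exist_ok=True)
    lock = LOCKS_DIR / f"{job_id}.translating"
    lock.touch()
    return lock

def release_lock(job_id: str) -> None:
    """Remove the marker on success or failure."""
    (LOCKS_DIR / f"{job_id}.translating").unlink(missing_ok=True)

def status_fallback(job_id: str) -> str:
    """What /status can report when job_state is missing in this worker."""
    if (LOCKS_DIR / f"{job_id}.translating").exists():
        return "in_progress"  # another worker holds the job
    return "unknown"
```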
"""
|
|
OCR to UnifiedDocument Converter
|
|
|
|
Converts PP-StructureV3 OCR results to UnifiedDocument format, preserving
|
|
all structure information and metadata.
|
|
|
|
Includes gap filling support to supplement PP-StructureV3 output with raw OCR
|
|
regions when significant content loss is detected.
|
|
"""
|
|
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Any, Union
|
|
from datetime import datetime
|
|
import hashlib
|
|
|
|
from app.models.unified_document import (
|
|
UnifiedDocument, DocumentElement, Page, DocumentMetadata,
|
|
BoundingBox, StyleInfo, TableData, ElementType,
|
|
ProcessingTrack, TableCell, Dimensions
|
|
)
|
|
from app.services.gap_filling_service import GapFillingService
|
|
|
|
logger = logging.getLogger(__name__)


def trim_empty_columns(table_dict: Dict[str, Any]) -> Dict[str, Any]:
    """
    Remove empty columns from a table dictionary.

    A column is considered empty if ALL cells in that column have content that is
    empty or whitespace-only (using .strip() to determine emptiness).

    This function:
    1. Identifies columns where every cell's content is empty/whitespace
    2. Removes the identified empty columns
    3. Updates the cols/columns value
    4. Recalculates each cell's col index
    5. Adjusts col_span when spans cross removed columns
    6. Removes cells entirely when their complete span falls within removed columns
    7. Preserves the original bbox (no layout drift)

    Args:
        table_dict: Table dictionary with keys: rows, cols/columns, cells

    Returns:
        Cleaned table dictionary with empty columns removed
    """
    cells = table_dict.get('cells', [])
    if not cells:
        return table_dict

    # Get original column count
    original_cols = table_dict.get('cols', table_dict.get('columns', 0))
    if original_cols == 0:
        # Calculate from cells if not provided
        max_col = 0
        for cell in cells:
            cell_col = cell.get('col', 0) if isinstance(cell, dict) else getattr(cell, 'col', 0)
            cell_span = cell.get('col_span', 1) if isinstance(cell, dict) else getattr(cell, 'col_span', 1)
            max_col = max(max_col, cell_col + cell_span)
        original_cols = max_col

    if original_cols == 0:
        return table_dict

    # Build a map: column_index -> list of cell contents
    # Cells with col_span > 1 contribute their content to every column they span
    column_contents: Dict[int, List[str]] = {i: [] for i in range(original_cols)}

    for cell in cells:
        if isinstance(cell, dict):
            col = cell.get('col', 0)
            col_span = cell.get('col_span', 1)
            content = cell.get('content', '')
        else:
            col = getattr(cell, 'col', 0)
            col_span = getattr(cell, 'col_span', 1)
            content = getattr(cell, 'content', '')

        # Mark content for each column this cell spans
        for c in range(col, min(col + col_span, original_cols)):
            if c in column_contents:
                column_contents[c].append(str(content).strip() if content else '')

    # Identify empty columns (all content is empty/whitespace)
    empty_columns = set()
    for col_idx, contents in column_contents.items():
        # A column is empty if ALL cells in it have empty content
        # Note: If a column has no cells at all, it's considered empty
        if all(c == '' for c in contents):
            empty_columns.add(col_idx)

    if not empty_columns:
        # No empty columns to remove, just ensure cols is set
        result = dict(table_dict)
        if result.get('cols', result.get('columns', 0)) == 0:
            result['cols'] = original_cols
            if 'columns' in result:
                result['columns'] = original_cols
        return result

    logger.debug(f"Removing empty columns: {sorted(empty_columns)} from table with {original_cols} cols")

    # Build column mapping: old_col -> new_col (or None if removed)
    col_mapping: Dict[int, Optional[int]] = {}
    new_col = 0
    for old_col in range(original_cols):
        if old_col in empty_columns:
            col_mapping[old_col] = None
        else:
            col_mapping[old_col] = new_col
            new_col += 1

    new_cols = new_col

    # Process cells
    new_cells = []
    for cell in cells:
        if isinstance(cell, dict):
            old_col = cell.get('col', 0)
            old_col_span = cell.get('col_span', 1)
        else:
            old_col = getattr(cell, 'col', 0)
            old_col_span = getattr(cell, 'col_span', 1)

        # Calculate new col and col_span
        # Find the first non-removed column in this cell's span
        new_start_col = None
        new_end_col = None

        for c in range(old_col, min(old_col + old_col_span, original_cols)):
            mapped = col_mapping.get(c)
            if mapped is not None:
                if new_start_col is None:
                    new_start_col = mapped
                new_end_col = mapped

        # If the entire span falls within removed columns, skip this cell
        if new_start_col is None:
            logger.debug(f"Removing cell at row={cell.get('row', 0) if isinstance(cell, dict) else cell.row}, "
                         f"col={old_col} (entire span in removed columns)")
            continue

        new_col_span = new_end_col - new_start_col + 1

        # Create new cell
        if isinstance(cell, dict):
            new_cell = dict(cell)
            new_cell['col'] = new_start_col
            new_cell['col_span'] = new_col_span
        else:
            # Handle TableCell objects
            new_cell = {
                'row': cell.row,
                'col': new_start_col,
                'row_span': cell.row_span,
                'col_span': new_col_span,
                'content': cell.content
            }
            if hasattr(cell, 'bbox') and cell.bbox:
                new_cell['bbox'] = cell.bbox
            if hasattr(cell, 'style') and cell.style:
                new_cell['style'] = cell.style

        new_cells.append(new_cell)

    # Build result
    result = dict(table_dict)
    result['cells'] = new_cells
    result['cols'] = new_cols
    if 'columns' in result:
        result['columns'] = new_cols

    logger.info(f"Trimmed table: {original_cols} -> {new_cols} columns, "
                f"{len(cells)} -> {len(new_cells)} cells")

    return result
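

# Example (illustrative sketch, not part of the converter pipeline): a minimal
# demonstration of trim_empty_columns on a dict-shaped table, using the same
# cell schema assumed throughout this module.
def _demo_trim_empty_columns() -> None:  # pragma: no cover
    table = {
        'rows': 2,
        'cols': 3,
        'cells': [
            {'row': 0, 'col': 0, 'content': 'Name'},
            {'row': 0, 'col': 1, 'content': '   '},  # whitespace-only column
            {'row': 0, 'col': 2, 'content': 'Qty'},
            {'row': 1, 'col': 0, 'content': 'Bolt'},
            {'row': 1, 'col': 1, 'content': ''},
            {'row': 1, 'col': 2, 'content': '4'},
        ],
    }
    trimmed = trim_empty_columns(table)
    # Column 1 is dropped: 'cols' becomes 2 and the 'Qty' cells shift to col 1
    assert trimmed['cols'] == 2
    assert all(c['col'] in (0, 1) for c in trimmed['cells'])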


def validate_cell_boxes(
    cell_boxes: List[List[float]],
    table_bbox: List[float],
    page_width: float,
    page_height: float,
    tolerance: float = 5.0
) -> Dict[str, Any]:
    """
    Validate cell_boxes coordinates against page boundaries and table bbox.

    PP-StructureV3 sometimes returns cell_boxes with coordinates that exceed
    the page boundaries or the table bbox. This function validates each box
    and clamps offenders to valid boundaries.

    Args:
        cell_boxes: List of cell bounding boxes [[x0, y0, x1, y1], ...]
        table_bbox: Table bounding box [x0, y0, x1, y1]
        page_width: Page width in pixels
        page_height: Page height in pixels
        tolerance: Allowed tolerance for boundary checks (pixels)

    Returns:
        Dict with:
        - valid: bool - whether all cell_boxes are valid
        - invalid_count: int - number of invalid cell_boxes
        - clamped_boxes: List - cell_boxes clamped to valid boundaries
        - issues: List[str] - description of issues found
        - needs_fallback: bool - True when more than half of the boxes are invalid
    """
    if not cell_boxes:
        return {'valid': True, 'invalid_count': 0, 'clamped_boxes': [], 'issues': [], 'needs_fallback': False}

    issues = []
    invalid_count = 0
    clamped_boxes = []

    # Page boundaries with tolerance
    page_min_x = -tolerance
    page_min_y = -tolerance
    page_max_x = page_width + tolerance
    page_max_y = page_height + tolerance

    # For clamping, use the intersection of the page and the expanded table bbox
    # (prefer clamping to the table bbox when one is available)
    clamp_min_x = max(0, table_bbox[0] - tolerance) if len(table_bbox) >= 4 else 0
    clamp_min_y = max(0, table_bbox[1] - tolerance) if len(table_bbox) >= 4 else 0
    clamp_max_x = min(page_width, table_bbox[2] + tolerance) if len(table_bbox) >= 4 else page_width
    clamp_max_y = min(page_height, table_bbox[3] + tolerance) if len(table_bbox) >= 4 else page_height

    for idx, box in enumerate(cell_boxes):
        if not box or len(box) < 4:
            issues.append(f"Cell {idx}: Invalid box format")
            invalid_count += 1
            clamped_boxes.append([0, 0, 0, 0])
            continue

        x0, y0, x1, y1 = box[:4]
        is_valid = True
        cell_issues = []

        # Check if coordinates exceed page boundaries
        if x0 < page_min_x:
            cell_issues.append(f"x0={x0:.1f} < 0")
            is_valid = False
        if y0 < page_min_y:
            cell_issues.append(f"y0={y0:.1f} < 0")
            is_valid = False
        if x1 > page_max_x:
            cell_issues.append(f"x1={x1:.1f} > page_width={page_width:.1f}")
            is_valid = False
        if y1 > page_max_y:
            cell_issues.append(f"y1={y1:.1f} > page_height={page_height:.1f}")
            is_valid = False

        # Check if coordinates significantly exceed the table bbox
        # (by more than 20% of the table height)
        if len(table_bbox) >= 4:
            table_h = table_bbox[3] - table_bbox[1]
            expand_tolerance = max(tolerance, table_h * 0.2)  # 20% of table height

            if y0 < table_bbox[1] - expand_tolerance:
                cell_issues.append(f"y0={y0:.1f} above table (table_y0={table_bbox[1]:.1f})")
                is_valid = False
            if y1 > table_bbox[3] + expand_tolerance:
                cell_issues.append(f"y1={y1:.1f} below table (table_y1={table_bbox[3]:.1f})")
                is_valid = False
            if x0 < table_bbox[0] - expand_tolerance:
                cell_issues.append(f"x0={x0:.1f} left of table (table_x0={table_bbox[0]:.1f})")
                is_valid = False
            if x1 > table_bbox[2] + expand_tolerance:
                cell_issues.append(f"x1={x1:.1f} right of table (table_x1={table_bbox[2]:.1f})")
                is_valid = False

        # Check for inverted coordinates
        if x0 > x1:
            cell_issues.append(f"x0={x0:.1f} > x1={x1:.1f}")
            is_valid = False
        if y0 > y1:
            cell_issues.append(f"y0={y0:.1f} > y1={y1:.1f}")
            is_valid = False

        if not is_valid:
            invalid_count += 1
            issues.append(f"Cell {idx}: {', '.join(cell_issues)}")

        # Clamp to valid boundaries (table bbox with some tolerance)
        clamped_box = [
            max(clamp_min_x, min(x0, clamp_max_x)),
            max(clamp_min_y, min(y0, clamp_max_y)),
            max(clamp_min_x, min(x1, clamp_max_x)),
            max(clamp_min_y, min(y1, clamp_max_y))
        ]

        # Ensure proper ordering after clamping
        if clamped_box[0] > clamped_box[2]:
            clamped_box[0], clamped_box[2] = clamped_box[2], clamped_box[0]
        if clamped_box[1] > clamped_box[3]:
            clamped_box[1], clamped_box[3] = clamped_box[3], clamped_box[1]

        clamped_boxes.append(clamped_box)

    if invalid_count > 0:
        logger.warning(
            f"Cell boxes validation: {invalid_count}/{len(cell_boxes)} invalid. "
            f"Page: {page_width:.0f}x{page_height:.0f}, Table bbox: {table_bbox}"
        )

    return {
        'valid': invalid_count == 0,
        'invalid_count': invalid_count,
        'clamped_boxes': clamped_boxes,
        'issues': issues,
        'needs_fallback': invalid_count > len(cell_boxes) * 0.5  # >50% invalid = needs fallback
    }
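

# Example (illustrative sketch): shows validate_cell_boxes flagging and
# clamping a cell box that spills well below its table bbox. The numbers
# here are fabricated for the demonstration.
def _demo_validate_cell_boxes() -> None:  # pragma: no cover
    report = validate_cell_boxes(
        cell_boxes=[[10, 10, 90, 40], [10, 35, 90, 700]],  # second box runs off the table
        table_bbox=[5, 5, 100, 60],
        page_width=600,
        page_height=800,
    )
    assert not report['valid'] and report['invalid_count'] == 1
    # Clamped boxes stay within the table bbox plus the default 5px tolerance
    assert report['clamped_boxes'][1][3] <= 60 + 5.0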


class OCRToUnifiedConverter:
    """
    Converter for transforming PP-StructureV3 OCR results to UnifiedDocument format.

    This converter handles:
    - PP-StructureV3 parsing_res_list results
    - Markdown fallback results
    - Multi-page document assembly
    - Metadata preservation
    - Structure relationship mapping
    - Gap filling with raw OCR regions (when PP-StructureV3 misses content)
    """

    def __init__(self, enable_gap_filling: bool = True):
        """
        Initialize the converter.

        Args:
            enable_gap_filling: Whether to enable gap filling with raw OCR regions
        """
        self.element_counter = 0
        self.gap_filling_service = GapFillingService() if enable_gap_filling else None
        self.gap_filling_stats: Dict[str, Any] = {}

    def convert(
        self,
        ocr_results: Dict[str, Any],
        file_path: Path,
        processing_time: float,
        lang: str = 'ch'
    ) -> UnifiedDocument:
        """
        Convert OCR results to UnifiedDocument.

        Args:
            ocr_results: Raw OCR results from PP-StructureV3
            file_path: Original file path
            processing_time: Time taken for OCR processing
            lang: Language used for OCR

        Returns:
            UnifiedDocument with all extracted information
        """
        try:
            # Create document metadata
            metadata = self._create_metadata(file_path, processing_time, lang)

            # Extract pages from OCR results
            pages = self._extract_pages(ocr_results)

            # Create document ID
            document_id = self._generate_document_id(file_path)

            # Create UnifiedDocument
            unified_doc = UnifiedDocument(
                document_id=document_id,
                metadata=metadata,
                pages=pages,
                processing_errors=ocr_results.get('errors', [])
            )

            # Post-process to establish relationships
            self._establish_relationships(unified_doc)

            logger.info(f"Successfully converted OCR results to UnifiedDocument: "
                        f"{len(pages)} pages, {self._count_elements(pages)} elements")

            return unified_doc

        except Exception as e:
            logger.error(f"Error converting OCR results: {e}")
            import traceback
            logger.error(f"Traceback: {traceback.format_exc()}")

            # Return minimal document with error
            return UnifiedDocument(
                document_id=self._generate_document_id(file_path),
                metadata=self._create_metadata(file_path, processing_time, lang),
                pages=[],
                processing_errors=[{
                    'error': str(e),
                    'type': 'conversion_error',
                    'timestamp': datetime.now().isoformat()
                }]
            )

    def _create_metadata(
        self,
        file_path: Path,
        processing_time: float,
        lang: str
    ) -> DocumentMetadata:
        """Create document metadata."""
        return DocumentMetadata(
            filename=file_path.name,
            file_type=file_path.suffix,
            file_size=file_path.stat().st_size if file_path.exists() else 0,
            created_at=datetime.now(),
            processing_track=ProcessingTrack.OCR,
            processing_time=processing_time,
            language=lang
        )

    def _extract_pages(self, ocr_results: Dict[str, Any]) -> List[Page]:
        """
        Extract pages from OCR results.

        Handles both enhanced PP-StructureV3 results (with parsing_res_list)
        and traditional markdown results. Applies gap filling when enabled.

        Gap filling can use either:
        1. overall_ocr_res from PP-StructureV3 (preferred, no extra inference)
        2. Separate raw OCR text_regions (fallback)
        """
        pages = []

        # Extract raw OCR text regions for gap filling
        # Prefer overall_ocr_res from PP-StructureV3 when available
        raw_text_regions = ocr_results.get('text_regions', [])
        ocr_dimensions = ocr_results.get('ocr_dimensions', {})

        # Check if we have enhanced results from PPStructureEnhanced
        # Note: Must check for a non-empty list, not just key existence (the key may exist with an empty list)
        if ocr_results.get('enhanced_results'):
            pages = self._extract_from_enhanced_results(
                ocr_results['enhanced_results'],
                raw_text_regions=raw_text_regions,
                ocr_dimensions=ocr_dimensions
            )
        # Check for traditional OCR results with text_regions at top level (from process_file_traditional)
        elif ocr_results.get('text_regions'):
            pages = self._extract_from_traditional_ocr(ocr_results)
        # Check for traditional layout_data structure
        elif 'layout_data' in ocr_results:
            pages = self._extract_from_layout_data(ocr_results['layout_data'])
        # Check for direct PP-StructureV3 results
        elif 'pages' in ocr_results:
            pages = self._extract_from_direct_results(ocr_results['pages'])
        else:
            logger.warning("No recognized OCR result structure found")

        return pages

    def _extract_from_enhanced_results(
        self,
        enhanced_results: List[Dict[str, Any]],
        raw_text_regions: Optional[List[Dict[str, Any]]] = None,
        ocr_dimensions: Optional[Dict[str, Any]] = None
    ) -> List[Page]:
        """
        Extract pages from enhanced PP-StructureV3 results.

        Applies gap filling when enabled to supplement PP-StructureV3 output
        with raw OCR regions that were not detected by the layout model.

        Args:
            enhanced_results: PP-StructureV3 enhanced results
            raw_text_regions: Raw OCR text regions for gap filling
            ocr_dimensions: OCR image dimensions for coordinate alignment
        """
        pages = []

        for page_idx, page_result in enumerate(enhanced_results):
            elements = []

            # Get page dimensions first (needed for element conversion)
            page_width = page_result.get('width', 0)
            page_height = page_result.get('height', 0)
            pp_dimensions = Dimensions(width=page_width, height=page_height)

            # Process elements from parsing_res_list
            if 'elements' in page_result:
                for elem_data in page_result['elements']:
                    element = self._convert_pp3_element(
                        elem_data, page_idx,
                        page_width=page_width,
                        page_height=page_height
                    )
                    if element:
                        elements.append(element)

            # Apply gap filling if enabled
            # Priority: 1) overall_ocr_res from page_result, 2) raw_text_regions from separate OCR
            if self.gap_filling_service:
                # Check for overall_ocr_res from PP-StructureV3 (preferred, no extra inference)
                page_raw_regions = page_result.get('overall_ocr_res', [])

                if page_raw_regions:
                    logger.debug(f"Page {page_idx + 1}: Using overall_ocr_res ({len(page_raw_regions)} regions)")
                elif raw_text_regions:
                    # Fall back to separate raw OCR regions
                    page_raw_regions = [
                        r for r in raw_text_regions
                        if r.get('page', 0) == page_idx or r.get('page', 1) == page_idx + 1
                    ]
                    if page_raw_regions:
                        logger.debug(f"Page {page_idx + 1}: Using separate raw OCR ({len(page_raw_regions)} regions)")

                if page_raw_regions:
                    supplemented, stats = self.gap_filling_service.fill_gaps(
                        raw_ocr_regions=page_raw_regions,
                        pp_structure_elements=elements,
                        page_number=page_idx + 1,
                        ocr_dimensions=ocr_dimensions,
                        pp_dimensions=pp_dimensions
                    )

                    # Store statistics
                    self.gap_filling_stats[f'page_{page_idx + 1}'] = stats

                    if supplemented:
                        logger.info(
                            f"Page {page_idx + 1}: Gap filling added {len(supplemented)} elements "
                            f"(coverage: {stats.get('coverage_ratio', 0):.2%})"
                        )
                        elements.extend(supplemented)

                        # Recalculate reading order for the combined elements
                        reading_order = self.gap_filling_service.recalculate_reading_order(elements)
                        page_result['reading_order'] = reading_order

            # Create page
            page = Page(
                page_number=page_idx + 1,
                dimensions=pp_dimensions,
                elements=elements,
                metadata={
                    'reading_order': page_result.get('reading_order', []),
                    'gap_filling': self.gap_filling_stats.get(f'page_{page_idx + 1}', {})
                }
            )

            pages.append(page)
            logger.debug(f"Extracted page {page_idx + 1} with {len(elements)} elements")

        return pages

    def _extract_from_layout_data(
        self,
        layout_data: Dict[str, Any]
    ) -> List[Page]:
        """Extract pages from traditional layout_data structure."""
        pages = []

        # Get page dimensions (assuming uniform for all pages)
        page_width = layout_data.get('page_width', 0)
        page_height = layout_data.get('page_height', 0)

        # Group elements by page
        elements_by_page = {}

        # Process text regions
        for text_region in layout_data.get('text_regions', []):
            page_num = text_region.get('page', 1)
            if page_num not in elements_by_page:
                elements_by_page[page_num] = []

            element = self._convert_text_region(text_region)
            if element:
                elements_by_page[page_num].append(element)

        # Process images
        for img_meta in layout_data.get('images_metadata', []):
            page_num = img_meta.get('page', 1)
            if page_num not in elements_by_page:
                elements_by_page[page_num] = []

            element = self._convert_image_metadata(img_meta)
            if element:
                elements_by_page[page_num].append(element)

        # Process tables
        for table_data in layout_data.get('tables', []):
            page_num = table_data.get('page', 1)
            if page_num not in elements_by_page:
                elements_by_page[page_num] = []

            element = self._convert_table_data(table_data)
            if element:
                elements_by_page[page_num].append(element)

        # Create pages
        max_page = max(elements_by_page.keys()) if elements_by_page else 0
        for page_num in range(1, max_page + 1):
            elements = elements_by_page.get(page_num, [])

            # Determine reading order based on position
            reading_order = self._calculate_reading_order(elements)

            page = Page(
                page_number=page_num,
                dimensions=Dimensions(
                    width=page_width,
                    height=page_height
                ),
                elements=elements,
                metadata={'reading_order': reading_order}
            )

            pages.append(page)

        return pages

    def _extract_from_traditional_ocr(self, ocr_results: Dict[str, Any]) -> List[Page]:
        """
        Extract pages from traditional OCR results (process_file_traditional).

        This handles the structure where text_regions and images_metadata are at
        the top level of ocr_results, not nested inside layout_data.
        """
        pages = []

        # Get text regions and page dimensions
        text_regions = ocr_results.get('text_regions', [])
        ocr_dimensions = ocr_results.get('ocr_dimensions', [])
        total_pages = ocr_results.get('total_pages', 1)

        # Group elements by page
        elements_by_page = {}

        # Process text regions
        for text_region in text_regions:
            page_num = text_region.get('page', 1)
            if page_num not in elements_by_page:
                elements_by_page[page_num] = []

            element = self._convert_text_region(text_region)
            if element:
                elements_by_page[page_num].append(element)

        # Process images
        for img_meta in ocr_results.get('images_metadata', []):
            page_num = img_meta.get('page', 1)
            if page_num not in elements_by_page:
                elements_by_page[page_num] = []

            element = self._convert_image_metadata(img_meta)
            if element:
                elements_by_page[page_num].append(element)

        # Process tables from layout_data if available
        if 'layout_data' in ocr_results and isinstance(ocr_results['layout_data'], dict):
            for table_data in ocr_results['layout_data'].get('tables', []):
                page_num = table_data.get('page', 1)
                if page_num not in elements_by_page:
                    elements_by_page[page_num] = []

                element = self._convert_table_data(table_data)
                if element:
                    elements_by_page[page_num].append(element)

        # Create pages
        max_page = max(elements_by_page.keys()) if elements_by_page else total_pages
        for page_num in range(1, max_page + 1):
            elements = elements_by_page.get(page_num, [])

            # Get page dimensions
            # Handle both dict (single page) and list (multiple pages) formats
            if isinstance(ocr_dimensions, dict):
                # Single-page format: {'width': W, 'height': H}
                page_width = ocr_dimensions.get('width', 0)
                page_height = ocr_dimensions.get('height', 0)
            elif isinstance(ocr_dimensions, list):
                # Multi-page format: [{'page': 1, 'width': W, 'height': H}, ...]
                page_dims = next((d for d in ocr_dimensions if isinstance(d, dict) and d.get('page') == page_num), None)
                if page_dims:
                    page_width = page_dims.get('width', 0)
                    page_height = page_dims.get('height', 0)
                else:
                    page_width = 0
                    page_height = 0
            else:
                # Default dimensions if not available
                page_width = 0
                page_height = 0

            # Determine reading order based on position
            reading_order = self._calculate_reading_order(elements)

            page = Page(
                page_number=page_num,
                dimensions=Dimensions(
                    width=page_width,
                    height=page_height
                ),
                elements=elements,
                metadata={'reading_order': reading_order}
            )

            pages.append(page)

        return pages

    def _convert_pp3_element(
        self,
        elem_data: Dict[str, Any],
        page_idx: int,
        page_width: float = 0,
        page_height: float = 0
    ) -> Optional[DocumentElement]:
        """
        Convert PP-StructureV3 element to DocumentElement.

        Args:
            elem_data: Element data from PP-StructureV3
            page_idx: Page index (0-based)
            page_width: Page width for coordinate validation
            page_height: Page height for coordinate validation
        """
        try:
            # Extract bbox
            bbox_data = elem_data.get('bbox', [0, 0, 0, 0])
            bbox = BoundingBox(
                x0=float(bbox_data[0]),
                y0=float(bbox_data[1]),
                x1=float(bbox_data[2]),
                y1=float(bbox_data[3])
            )

            # Get element type
            element_type = elem_data.get('type', ElementType.TEXT)
            if isinstance(element_type, str):
                # Convert string to ElementType if needed
                # ElementType is a str-based enum, so we can construct from value (lowercase)
                try:
                    element_type = ElementType(element_type)
                except ValueError:
                    # If the value doesn't match, try the member name (uppercase)
                    element_type = ElementType[element_type.upper()] if element_type.upper() in ElementType.__members__ else ElementType.TEXT

            # Content-based reclassification: detect HTML tables in text content
            content_str = elem_data.get('content', '')
            if isinstance(content_str, str) and '<table' in content_str.lower():
                if element_type == ElementType.TEXT:
                    logger.info(f"Element {elem_data.get('element_id')}: Reclassifying TEXT to TABLE (HTML table in content)")
                    element_type = ElementType.TABLE

            # Prepare content based on element type
            if element_type == ElementType.TABLE:
                # For tables, use TableData as content
                # Priority: rebuilt_table > HTML parsing
                # rebuilt_table contains clean cells without empty padding
                if 'rebuilt_table' in elem_data:
                    rebuilt = elem_data['rebuilt_table']
                    # Use rebuilt cells directly - they don't include empty cells
                    rebuilt_cells = rebuilt.get('cells', [])
                    table_cells = [
                        TableCell(
                            row=c.get('row', 0),
                            col=c.get('col', 0),
                            row_span=c.get('row_span', 1),
                            col_span=c.get('col_span', 1),
                            content=c.get('content', '')
                        )
                        for c in rebuilt_cells
                    ]
                    table_data = TableData(
                        rows=rebuilt.get('rows', 0),
                        cols=rebuilt.get('cols', 0),
                        cells=table_cells,
                        caption=elem_data.get('extracted_text')
                    )
                    logger.info(f"[CONVERTER] Table {elem_data.get('element_id')}: Using rebuilt_table directly ({len(rebuilt_cells)} cells)")
                else:
                    # Fall back to HTML parsing for non-rebuilt tables
                    table_data = self._extract_table_data(elem_data)
                content = table_data if table_data else elem_data.get('content', '')

                # Preserve cell_boxes and embedded_images in metadata for PDF generation
                # These are extracted by PP-StructureV3 and provide accurate cell positioning
                if 'cell_boxes' in elem_data:
                    cell_boxes = elem_data['cell_boxes']
                    elem_data.setdefault('metadata', {})['cell_boxes_source'] = elem_data.get('cell_boxes_source', 'table_res_list')

                    # Validate cell_boxes coordinates if page dimensions are available
                    if page_width > 0 and page_height > 0:
                        validation = validate_cell_boxes(
                            cell_boxes=cell_boxes,
                            table_bbox=bbox_data,
                            page_width=page_width,
                            page_height=page_height
                        )

                        if not validation['valid']:
                            elem_data['metadata']['cell_boxes_validation'] = {
                                'valid': False,
                                'invalid_count': validation['invalid_count'],
                                'total_count': len(cell_boxes),
                                'needs_fallback': validation['needs_fallback']
                            }
                            # Use clamped boxes instead of invalid ones
                            elem_data['metadata']['cell_boxes'] = validation['clamped_boxes']
                            elem_data['metadata']['cell_boxes_original'] = cell_boxes

                            if validation['needs_fallback']:
                                logger.warning(
                                    f"Table {elem_data.get('element_id')}: "
                                    f"{validation['invalid_count']}/{len(cell_boxes)} cell_boxes invalid, "
                                    f"fallback recommended"
                                )
                        else:
                            elem_data['metadata']['cell_boxes'] = cell_boxes
                            elem_data['metadata']['cell_boxes_validation'] = {'valid': True}
                    else:
                        # No page dimensions available, store as-is
                        elem_data['metadata']['cell_boxes'] = cell_boxes

                if 'embedded_images' in elem_data:
                    elem_data.setdefault('metadata', {})['embedded_images'] = elem_data['embedded_images']

                # Pass through rebuild information for tables that were rebuilt
                # This tells the PDF renderer to use HTML content instead of cell_boxes
                logger.debug(f"[CONVERTER] Table {elem_data.get('element_id')}: checking for rebuild_stats, keys={list(elem_data.keys())}")
                if 'rebuild_stats' in elem_data:
                    elem_data.setdefault('metadata', {})['rebuild_stats'] = elem_data['rebuild_stats']
                    elem_data['metadata']['was_rebuilt'] = True
                    logger.info(f"[CONVERTER] Table {elem_data.get('element_id')}: found rebuild_stats, setting was_rebuilt=True")

                if 'rebuilt_table' in elem_data:
                    elem_data.setdefault('metadata', {})['rebuilt_table'] = elem_data['rebuilt_table']

            elif element_type in [
                ElementType.IMAGE, ElementType.FIGURE, ElementType.CHART,
                ElementType.DIAGRAM, ElementType.LOGO, ElementType.STAMP
            ]:
                # For all visual elements, use a metadata dict as content
                # Priority: saved_path > img_path (PP-StructureV3 uses saved_path)
                image_path = (
                    elem_data.get('saved_path') or
                    elem_data.get('img_path') or
                    ''
                )
                content = {
                    'saved_path': image_path,  # Preserve original path key
                    'path': image_path,  # For backward compatibility
                    'width': elem_data.get('width', 0),
                    'height': elem_data.get('height', 0),
                    'format': elem_data.get('format', 'unknown')
                }
                if not image_path:
                    logger.warning(
                        f"Visual element {element_type.value} missing image path: "
                        f"saved_path={elem_data.get('saved_path')}, img_path={elem_data.get('img_path')}"
                    )
            else:
                content = elem_data.get('content', '')

            # Create element
            element = DocumentElement(
                element_id=elem_data.get('element_id', f"elem_{self.element_counter}"),
                type=element_type,
                content=content,
                bbox=bbox,
                confidence=elem_data.get('confidence', 1.0),
                metadata=elem_data.get('metadata', {})
            )

            # Add style info if available
            if 'style' in elem_data:
                element.style = self._extract_style_info(elem_data['style'])

            self.element_counter += 1
            return element

        except Exception as e:
            logger.warning(f"Failed to convert PP3 element: {e}")
            return None

    def _convert_text_region(
        self,
        text_region: Dict[str, Any]
    ) -> Optional[DocumentElement]:
        """Convert text region to DocumentElement."""
        try:
            # Extract bbox (handle both formats: [[x1,y1], [x2,y1], [x2,y2], [x1,y2]] or [x0, y0, x1, y1])
            bbox_data = text_region.get('bbox', [0, 0, 0, 0])

            if isinstance(bbox_data, list) and len(bbox_data) == 4:
                if isinstance(bbox_data[0], list):
                    # 4-point format: [[x1,y1], [x2,y1], [x2,y2], [x1,y2]]
                    x0 = float(bbox_data[0][0])
                    y0 = float(bbox_data[0][1])
                    x1 = float(bbox_data[2][0])
                    y1 = float(bbox_data[2][1])
                else:
                    # Simple format: [x0, y0, x1, y1]
                    x0 = float(bbox_data[0])
                    y0 = float(bbox_data[1])
                    x1 = float(bbox_data[2])
                    y1 = float(bbox_data[3])
            else:
                x0 = y0 = x1 = y1 = 0

            bbox = BoundingBox(x0=x0, y0=y0, x1=x1, y1=y1)

            element = DocumentElement(
                element_id=f"text_{self.element_counter}",
                type=ElementType.TEXT,
                content=text_region.get('text', ''),
                bbox=bbox,
                confidence=text_region.get('confidence', 1.0),
                metadata={'page': text_region.get('page', 1)}
            )

            self.element_counter += 1
            return element

        except Exception as e:
            logger.warning(f"Failed to convert text region: {e}")
            return None

    def _convert_image_metadata(
        self,
        img_meta: Dict[str, Any]
    ) -> Optional[DocumentElement]:
        """Convert image metadata to DocumentElement."""
        try:
            # Extract bbox (handle both formats)
            bbox_data = img_meta.get('bbox', [0, 0, 0, 0])

            if isinstance(bbox_data, list) and len(bbox_data) == 4:
                if isinstance(bbox_data[0], list):
                    # 4-point format: [[x1,y1], [x2,y1], [x2,y2], [x1,y2]]
                    x0 = float(bbox_data[0][0])
                    y0 = float(bbox_data[0][1])
                    x1 = float(bbox_data[2][0])
                    y1 = float(bbox_data[2][1])
                else:
                    # Simple format: [x0, y0, x1, y1]
                    x0 = float(bbox_data[0])
                    y0 = float(bbox_data[1])
                    x1 = float(bbox_data[2])
                    y1 = float(bbox_data[3])
            else:
                x0 = y0 = x1 = y1 = 0

            bbox = BoundingBox(x0=x0, y0=y0, x1=x1, y1=y1)

            # Create image content dict
            image_content = {
                'path': img_meta.get('path', ''),
                'width': img_meta.get('width', 0),
                'height': img_meta.get('height', 0),
                'format': img_meta.get('format', 'unknown')
            }

            element = DocumentElement(
                element_id=f"img_{self.element_counter}",
                type=ElementType.IMAGE,
                content=image_content,
                bbox=bbox,
                metadata={'page': img_meta.get('page', 1)}
            )

            self.element_counter += 1
            return element

        except Exception as e:
            logger.warning(f"Failed to convert image metadata: {e}")
            return None

    def _convert_table_data(
        self,
        table_dict: Dict[str, Any]
    ) -> Optional[DocumentElement]:
        """Convert table data to DocumentElement."""
        try:
            # Clean up empty columns before building TableData
            table_dict = trim_empty_columns(table_dict)

            # Extract bbox
            bbox_data = table_dict.get('bbox', [0, 0, 0, 0])
            bbox = BoundingBox(
                x0=float(bbox_data[0]),
                y0=float(bbox_data[1]),
                x1=float(bbox_data[2]),
                y1=float(bbox_data[3])
            )

            # Create table data
            # Note: TableData uses 'cols' not 'columns', and doesn't have an 'html' field;
            # HTML content is stored in metadata instead
            raw_cells = table_dict.get('cells', [])
            table_cells = []

            # Convert raw cells to TableCell objects if needed
            for cell_data in raw_cells:
                if isinstance(cell_data, dict):
                    table_cells.append(TableCell(
                        row=cell_data.get('row', 0),
                        col=cell_data.get('col', 0),
                        row_span=cell_data.get('row_span', 1),
                        col_span=cell_data.get('col_span', 1),
                        content=cell_data.get('content', '')
                    ))

            table_data = TableData(
                rows=table_dict.get('rows', 0),
                cols=table_dict.get('columns', table_dict.get('cols', 0)),
                cells=table_cells,
                caption=table_dict.get('caption')
            )

            element = DocumentElement(
                element_id=f"table_{self.element_counter}",
                type=ElementType.TABLE,
                content=table_data,  # Use TableData object as content
                bbox=bbox,
                metadata={'page': table_dict.get('page', 1), 'extracted_text': table_dict.get('extracted_text', '')}
            )

            self.element_counter += 1
            return element

        except Exception as e:
            logger.warning(f"Failed to convert table data: {e}")
            return None

    def _extract_table_data(self, elem_data: Dict) -> Optional[TableData]:
        """
        Extract table data from element using BeautifulSoup for robust HTML parsing.

        This method produces TableData objects with fully populated cells arrays,
        matching the format produced by DirectExtractionEngine for consistency.
        """
        try:
            html = elem_data.get('html', '')
            extracted_text = elem_data.get('extracted_text', '')

            # Fallback: check the content field for an HTML table if the html field is empty
            if not html:
                content = elem_data.get('content', '')
                if isinstance(content, str) and '<table' in content.lower():
                    html = content
                    logger.debug("Using content field as HTML table source")

            # Return None if there is no HTML table content
            if not html or '<table' not in html.lower():
                if extracted_text:
                    # Return minimal TableData with just a caption if we have text
                    return TableData(rows=0, cols=0, cells=[], caption=extracted_text)
                return None

            # Parse HTML table using BeautifulSoup
            try:
                from bs4 import BeautifulSoup
                soup = BeautifulSoup(html, 'html.parser')
                table = soup.find('table')

                if not table:
                    logger.warning("No <table> element found in HTML")
                    return self._fallback_table_data(html, extracted_text)

                cells = []
                headers = []
                rows = table.find_all('tr')
                num_rows = len(rows)

                # Track grid positions claimed by rowspan/colspan from earlier
                # cells: occupied[row][col] = True. The total column count is
                # derived from this map after all cells are placed.
                occupied: Dict[int, Dict[int, bool]] = {r: {} for r in range(num_rows)}

                # Parse all cells with proper rowspan/colspan handling
                for row_idx, row in enumerate(rows):
                    row_cells = row.find_all(['td', 'th'])
                    col_idx = 0

                    for cell in row_cells:
                        # Skip columns that are occupied by rowspan from previous rows
                        while occupied[row_idx].get(col_idx, False):
                            col_idx += 1

                        cell_content = cell.get_text(strip=True)
                        rowspan = int(cell.get('rowspan', 1))
                        colspan = int(cell.get('colspan', 1))

                        cells.append(TableCell(
                            row=row_idx,
                            col=col_idx,
                            row_span=rowspan,
                            col_span=colspan,
                            content=cell_content
                        ))

                        # Collect headers from <th> elements or the first row
                        if cell.name == 'th' or row_idx == 0:
                            headers.append(cell_content)

                        # Mark cells as occupied for rowspan/colspan
                        for r in range(row_idx, min(row_idx + rowspan, num_rows)):
                            for c in range(col_idx, col_idx + colspan):
                                if r not in occupied:
                                    occupied[r] = {}
                                occupied[r][c] = True

                        # Advance the column index by colspan
                        col_idx += colspan

                # Calculate the actual column count from occupied cells
                num_cols = 0
                for r in range(num_rows):
                    if occupied[r]:
                        max_col_in_row = max(occupied[r].keys()) + 1
                        num_cols = max(num_cols, max_col_in_row)

                logger.debug(
                    f"Parsed HTML table: {num_rows} rows, {num_cols} cols, {len(cells)} cells"
                )

                # Build a table dict for cleanup
                table_dict = {
                    'rows': num_rows,
                    'cols': num_cols,
                    'cells': [
                        {
                            'row': c.row,
                            'col': c.col,
                            'row_span': c.row_span,
                            'col_span': c.col_span,
                            'content': c.content
                        }
                        for c in cells
                    ],
                    'headers': headers if headers else None,
                    'caption': extracted_text if extracted_text else None
                }

                # Clean up empty columns
                table_dict = trim_empty_columns(table_dict)

                # Convert the cleaned cells back to TableCell objects
                cleaned_cells = [
                    TableCell(
                        row=c['row'],
                        col=c['col'],
                        row_span=c.get('row_span', 1),
                        col_span=c.get('col_span', 1),
                        content=c.get('content', '')
                    )
                    for c in table_dict.get('cells', [])
                ]

                return TableData(
                    rows=table_dict.get('rows', num_rows),
                    cols=table_dict.get('cols', num_cols),
                    cells=cleaned_cells,
                    headers=table_dict.get('headers'),
                    caption=table_dict.get('caption')
                )

            except ImportError:
                logger.warning("BeautifulSoup not available, using fallback parsing")
                return self._fallback_table_data(html, extracted_text)

        except Exception as e:
            logger.warning(f"Failed to extract table data: {e}")
            return None

    def _fallback_table_data(self, html: str, extracted_text: str = '') -> Optional[TableData]:
        """
        Fallback table parsing when BeautifulSoup is not available.
        Returns basic TableData with row/col counts only (no cells).
        """
        try:
            rows = html.count('<tr')
            cols = 0
            if rows > 0:
                first_row_end = html.find('</tr>')
                if first_row_end > 0:
                    first_row = html[:first_row_end]
                    cols = first_row.count('<td') + first_row.count('<th')

            if rows == 0 and cols == 0 and not extracted_text:
                return None

            return TableData(
                rows=rows,
                cols=cols,
                cells=[],  # Empty cells in fallback mode
                caption=extracted_text if extracted_text else None
            )
        except Exception:
            return None


    def _extract_style_info(self, style_data: Dict) -> Optional[StyleInfo]:
        """Extract style info from element."""
        try:
            return StyleInfo(
                font_family=style_data.get('font_family'),
                font_size=style_data.get('font_size'),
                font_weight=style_data.get('font_weight'),
                font_style=style_data.get('font_style'),
                text_color=style_data.get('text_color'),
                background_color=style_data.get('background_color'),
                alignment=style_data.get('alignment')
            )
        except Exception:
            return None

    def _calculate_reading_order(self, elements: List[DocumentElement]) -> List[int]:
        """Calculate reading order based on element positions."""
        if not elements:
            return []

        # Create indexed elements with position
        indexed_elements = []
        for i, elem in enumerate(elements):
            # Use the top-left corner for sorting (y0 = top, x0 = left)
            indexed_elements.append((
                i,
                elem.bbox.y0,  # y coordinate (top to bottom)
                elem.bbox.x0  # x coordinate (left to right)
            ))

        # Sort by y first (top to bottom), then x (left to right)
        indexed_elements.sort(key=lambda x: (x[1], x[2]))

        # Return the sorted indices
        return [idx for idx, _, _ in indexed_elements]

    def _establish_relationships(self, doc: UnifiedDocument):
        """
        Establish relationships between elements.

        This includes:
        - Linking captions to figures/tables
        - Grouping list items
        - Identifying headers and their content
        """
        for page in doc.pages:
            # Link captions to the nearest figure/table
            self._link_captions(page.elements)

            # Group consecutive list items
            self._group_list_items(page.elements)

            # Link headers to content
            self._link_headers(page.elements)

        # Update metadata based on content
        self._update_metadata(doc)

    def _link_captions(self, elements: List[DocumentElement]):
        """Link caption elements to their associated figures/tables."""
        captions = [e for e in elements if e.type in [ElementType.CAPTION, ElementType.TABLE_CAPTION]]
        targets = [e for e in elements if e.type in [ElementType.FIGURE, ElementType.TABLE, ElementType.IMAGE]]

        for caption in captions:
            if not targets:
                break

            # Find the nearest target above the caption
            best_target = None
            min_distance = float('inf')

            for target in targets:
                # Caption should be below the target (y1 is bottom in BoundingBox)
                if target.bbox.y1 <= caption.bbox.y0:
                    distance = caption.bbox.y0 - target.bbox.y1
                    if distance < min_distance:
                        min_distance = distance
                        best_target = target

            if best_target and min_distance < 50:  # Within 50 pixels
                caption.metadata['linked_to'] = best_target.element_id
                best_target.metadata['caption_id'] = caption.element_id

    def _group_list_items(self, elements: List[DocumentElement]):
        """Group consecutive list items."""
        list_items = [e for e in elements if e.type == ElementType.LIST_ITEM]

        if not list_items:
            return

        # Sort by position (top-left corner)
        list_items.sort(key=lambda e: (e.bbox.y0, e.bbox.x0))

        # Group consecutive items
        current_group = []
        groups = []

        for i, item in enumerate(list_items):
            if i == 0:
                current_group = [item]
            else:
                prev_item = list_items[i-1]
                # Check if items are consecutive (similar x position, reasonable y gap)
                x_aligned = abs(item.bbox.x0 - prev_item.bbox.x0) < 20
                y_consecutive = (item.bbox.y0 - prev_item.bbox.y1) < 30

                if x_aligned and y_consecutive:
                    current_group.append(item)
                else:
                    if current_group:
                        groups.append(current_group)
                    current_group = [item]

        if current_group:
            groups.append(current_group)

        # Mark groups in metadata
        for group_idx, group in enumerate(groups):
            group_id = f"list_group_{group_idx}"
            for item_idx, item in enumerate(group):
                item.metadata['list_group'] = group_id
                item.metadata['list_index'] = item_idx

    def _link_headers(self, elements: List[DocumentElement]):
        """Link headers to their content sections."""
        headers = [e for e in elements if e.type in [ElementType.HEADER, ElementType.TITLE]]

        for i, header in enumerate(headers):
            # Find content between this header and the next
            next_header_y = float('inf')
            if i + 1 < len(headers):
                next_header_y = headers[i + 1].bbox.y1

            # Find all elements between the headers (y0 = top, y1 = bottom)
            content_elements = [
                e for e in elements
                if (e.bbox.y0 > header.bbox.y1 and
                    e.bbox.y0 < next_header_y and
                    e.type not in [ElementType.HEADER, ElementType.TITLE])
            ]

            if content_elements:
                header.metadata['content_elements'] = [e.element_id for e in content_elements]
                for elem in content_elements:
                    elem.metadata['header_id'] = header.element_id

    def _update_metadata(self, doc: UnifiedDocument):
        """Update document metadata based on extracted content."""
        # For now, just ensure basic metadata is present.
        # Since DocumentMetadata doesn't have all these fields,
        # we can store summary data at the document level or in processing_errors.
        pass

    def _generate_document_id(self, file_path: Path) -> str:
        """Generate unique document ID."""
        content = f"{file_path.name}_{datetime.now().isoformat()}"
        return hashlib.md5(content.encode()).hexdigest()

    def _detect_mime_type(self, file_path: Path) -> str:
        """Detect MIME type of file."""
        try:
            import magic
            return magic.from_file(str(file_path), mime=True)
        except Exception:
            # Fall back to extension-based detection
            ext = file_path.suffix.lower()
            mime_map = {
                '.pdf': 'application/pdf',
                '.png': 'image/png',
                '.jpg': 'image/jpeg',
                '.jpeg': 'image/jpeg'
            }
            return mime_map.get(ext, 'application/octet-stream')

    def _count_elements(self, pages: List[Page]) -> int:
        """Count total elements across all pages."""
        return sum(len(page.elements) for page in pages)

    def _extract_from_direct_results(
        self,
        pages_data: List[Dict[str, Any]]
    ) -> List[Page]:
        """Extract pages from direct PP-StructureV3 results."""
        pages = []

        for page_idx, page_data in enumerate(pages_data):
            elements = []

            # Get page dimensions first
            page_width = page_data.get('width', 0)
            page_height = page_data.get('height', 0)

            # Process each element in the page
            if 'elements' in page_data:
                for elem_data in page_data['elements']:
                    element = self._convert_pp3_element(
                        elem_data, page_idx,
                        page_width=page_width,
                        page_height=page_height
                    )
                    if element:
                        elements.append(element)

            # Create page
            page = Page(
                page_number=page_idx + 1,
                dimensions=Dimensions(
                    width=page_width,
                    height=page_height
                ),
                elements=elements,
                metadata={'reading_order': self._calculate_reading_order(elements)}
            )

            pages.append(page)

        return pages
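

# Example (illustrative sketch): the dispatch rule in _extract_pages treats an
# empty 'enhanced_results' list as absent, so a layout-analysis failure falls
# back to the raw OCR 'text_regions' instead of producing empty pages — the
# behavior described in the commit message. The input dict below is fabricated
# for the demonstration.
def _demo_enhanced_results_fallback() -> None:  # pragma: no cover
    converter = OCRToUnifiedConverter(enable_gap_filling=False)
    ocr_results = {
        'enhanced_results': [],  # layout analysis failed; OCR still succeeded
        'text_regions': [
            {'page': 1, 'text': 'Hello world', 'bbox': [10, 10, 120, 30], 'confidence': 0.98},
        ],
        'ocr_dimensions': {'width': 800, 'height': 1200},
        'total_pages': 1,
    }
    pages = converter._extract_pages(ocr_results)
    # One page is produced from text_regions rather than zero pages
    assert len(pages) == 1 and len(pages[0].elements) == 1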