Fix table overlap in Direct track PDF layout restoration

This commit fixes the critical table overlap issue in Direct track PDF layout restoration, where generated tables exceeded their bounding boxes and overlapped with surrounding text.

Root Cause: ReportLab's Table component auto-calculates row heights from content, often rendering tables larger than their specified bbox. The rowHeights parameter was ignored during actual rendering, and reducing the font size did not proportionally reduce the table height.

Solution - Canvas Transform Scaling: Implemented a reliable canvas transform approach in _draw_table_element_direct(), as sketched below:
1. Wrap the table with generous space to get its natural rendered dimensions
2. Calculate the scale factor: min(bbox_width/actual_width, bbox_height/actual_height, 1.0)
3. Apply the canvas transform: saveState → translate → scale → drawOn → restoreState
4. Remove all buffers and use exact bbox positioning

Key Changes:
- backend/app/services/pdf_generator_service.py (_draw_table_element_direct):
  * Added canvas scaling logic (lines 2180-2208)
  * Removed buffer adjustments (previously 2pt→18pt attempts)
  * Use exact bbox position: pdf_y = page_height - bbox.y1
  * Support column widths from metadata to preserve original ratios
- backend/app/services/direct_extraction_engine.py (_process_native_table):
  * Extract column widths from PyMuPDF table.cells data (lines 691-761)
  * Calculate and store the original column width ratios (e.g., 40:60)
  * Store them in element metadata for use during PDF generation
  * This prevents unnecessary text wrapping that would increase table height

Results: The test case showed perfect scaling: a table with a natural size of 246.8×108.0pt was scaled to 246.8×89.6pt with a factor of 0.830, fitting exactly within its bbox without overlap.

Cleanup:
- Removed test/debug scripts: check_tables.py, verify_chart_recognition.py
- Removed demo files from demo_docs/ (basic/, layout/, mixed/, tables/)

User Confirmed: "The result of FINAL_SCALING_FIX.pdf is acceptable. Congratulations on completing the Direct PDF fix."

Next: Other document formats still require layout verification and fixes.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
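The scaling steps above amount to roughly the following ReportLab pattern (a minimal sketch only, not the actual code from _draw_table_element_direct(); the draw_scaled_table name and the bbox/canvas/page_height inputs are assumed for illustration):

    def draw_scaled_table(canvas, table, bbox, page_height):
        # 1. Wrap with generous space to measure the natural rendered size.
        natural_w, natural_h = table.wrap(10000, 10000)
        # 2. Scale down only (never up), preserving the aspect ratio.
        bbox_w, bbox_h = bbox.x1 - bbox.x0, bbox.y1 - bbox.y0
        scale = min(bbox_w / natural_w, bbox_h / natural_h, 1.0)
        # 3./4. saveState -> translate -> scale -> drawOn -> restoreState,
        # anchored at the exact bbox position with no buffer.
        pdf_y = page_height - bbox.y1  # bbox.y1 is the bottom edge in top-left coords
        canvas.saveState()
        canvas.translate(bbox.x0, pdf_y)
        canvas.scale(scale, scale)
        table.drawOn(canvas, 0, 0)
        canvas.restoreState()

Because the whole coordinate system is scaled before drawing, the rendered table cannot spill outside its box, which is what the 0.830 factor in the test case reflects.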
"""
|
||
Direct Extraction Engine using PyMuPDF
|
||
|
||
Handles direct text and structure extraction from editable PDFs without OCR.
|
||
This provides much faster processing and perfect accuracy for documents with
|
||
extractable text.
|
||
"""
|
||
|
||
import os
|
||
import logging
|
||
import fitz # PyMuPDF
|
||
import uuid
|
||
from pathlib import Path
|
||
from typing import Dict, List, Optional, Tuple, Any, Union
|
||
from datetime import datetime
|
||
import re
|
||
|
||
from ..models.unified_document import (
|
||
UnifiedDocument, DocumentElement, Page, DocumentMetadata,
|
||
BoundingBox, StyleInfo, TableData, TableCell, Dimensions,
|
||
ElementType, ProcessingTrack
|
||
)
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class DirectExtractionEngine:
    """
    Engine for direct text extraction from editable PDFs using PyMuPDF.

    This engine provides:
    - Fast text extraction with exact positioning
    - Font and style information preservation
    - Table structure detection
    - Image extraction with coordinates
    - Hyperlink and annotation extraction
    """

    def __init__(self,
                 enable_table_detection: bool = True,
                 enable_image_extraction: bool = True,
                 min_table_rows: int = 2,
                 min_table_cols: int = 2):
        """
        Initialize the extraction engine.

        Args:
            enable_table_detection: Whether to detect and extract tables
            enable_image_extraction: Whether to extract images
            min_table_rows: Minimum rows for table detection
            min_table_cols: Minimum columns for table detection
        """
        self.enable_table_detection = enable_table_detection
        self.enable_image_extraction = enable_image_extraction
        self.min_table_rows = min_table_rows
        self.min_table_cols = min_table_cols

    def extract(self,
                file_path: Path,
                output_dir: Optional[Path] = None) -> UnifiedDocument:
        """
        Extract content from PDF file to UnifiedDocument format.

        Args:
            file_path: Path to PDF file
            output_dir: Optional directory to save extracted images.
                If not provided, creates a temporary directory in
                storage/results/{document_id}/

        Returns:
            UnifiedDocument with extracted content
        """
        start_time = datetime.now()
        document_id = str(uuid.uuid4())[:8]  # Short ID for cleaner paths

        try:
            doc = fitz.open(str(file_path))

            # If no output_dir provided, create default directory for image extraction
            if output_dir is None and self.enable_image_extraction:
                # Create temporary directory in storage/results
                default_output_dir = Path("storage/results") / document_id
                default_output_dir.mkdir(parents=True, exist_ok=True)
                output_dir = default_output_dir
                logger.debug(f"Created default output directory: {output_dir}")

            # Extract document metadata
            metadata = self._extract_metadata(file_path, doc, start_time)

            # Extract pages
            pages = []
            for page_num in range(len(doc)):
                logger.info(f"Extracting page {page_num + 1}/{len(doc)}")
                page = self._extract_page(
                    doc[page_num],
                    page_num + 1,
                    document_id,
                    output_dir
                )
                pages.append(page)

            doc.close()

            # Calculate processing time
            processing_time = (datetime.now() - start_time).total_seconds()
            metadata.processing_time = processing_time

            logger.info(f"Direct extraction completed in {processing_time:.2f}s")

            return UnifiedDocument(
                document_id=document_id,
                metadata=metadata,
                pages=pages
            )

        except Exception as e:
            logger.error(f"Error during direct extraction: {e}")
            # Return partial result with error information
            processing_time = (datetime.now() - start_time).total_seconds()

            if 'metadata' not in locals():
                metadata = DocumentMetadata(
                    filename=file_path.name,
                    file_type="pdf",
                    file_size=file_path.stat().st_size if file_path.exists() else 0,
                    created_at=datetime.now(),
                    processing_track=ProcessingTrack.DIRECT,
                    processing_time=processing_time
                )

            return UnifiedDocument(
                document_id=document_id,
                metadata=metadata,
                pages=pages if 'pages' in locals() else [],
                processing_errors=[{
                    "error": str(e),
                    "type": type(e).__name__
                }]
            )

    def _extract_metadata(self,
                          file_path: Path,
                          doc: fitz.Document,
                          start_time: datetime) -> DocumentMetadata:
        """Extract document metadata"""
        pdf_metadata = doc.metadata

        return DocumentMetadata(
            filename=file_path.name,
            file_type="pdf",
            file_size=file_path.stat().st_size,
            created_at=start_time,
            processing_track=ProcessingTrack.DIRECT,
            processing_time=0.0,  # Will be updated later
            title=pdf_metadata.get("title"),
            author=pdf_metadata.get("author"),
            subject=pdf_metadata.get("subject"),
            keywords=pdf_metadata.get("keywords", "").split(",") if pdf_metadata.get("keywords") else None,
            producer=pdf_metadata.get("producer"),
            creator=pdf_metadata.get("creator"),
            creation_date=self._parse_pdf_date(pdf_metadata.get("creationDate")),
            modification_date=self._parse_pdf_date(pdf_metadata.get("modDate"))
        )

    def _parse_pdf_date(self, date_str: str) -> Optional[datetime]:
        """Parse PDF date string to datetime"""
        if not date_str:
            return None

        try:
            # PDF date format: D:YYYYMMDDHHmmSSOHH'mm
            # Example: D:20240101120000+09'00
            if date_str.startswith("D:"):
                date_str = date_str[2:]

            # Extract just the date/time part (first 14 characters)
            if len(date_str) >= 14:
                date_part = date_str[:14]
                return datetime.strptime(date_part, "%Y%m%d%H%M%S")
        except (ValueError, TypeError):
            # Malformed or non-string date values are treated as missing
            pass

        return None

    def _extract_page(self,
                      page: fitz.Page,
                      page_num: int,
                      document_id: str,
                      output_dir: Optional[Path]) -> Page:
        """Extract content from a single page"""
        elements = []
        element_counter = 0

        # Get page-level metadata (for final Page metadata)
        drawings = page.get_drawings()
        links = page.get_links()

        # Get page dimensions
        rect = page.rect
        dimensions = Dimensions(
            width=rect.width,
            height=rect.height,
            dpi=72  # PDF standard DPI
        )

        # Extract tables first (if enabled) to get table regions
        table_bboxes = []
        if self.enable_table_detection:
            try:
                # Try native table detection (PyMuPDF 1.23.0+)
                tables = page.find_tables()
                for table_idx, table in enumerate(tables):
                    element = self._process_native_table(
                        table, page_num, element_counter
                    )
                    if element and element.bbox:
                        elements.append(element)
                        table_bboxes.append(element.bbox)
                        element_counter += 1
            except AttributeError:
                # Fallback to positional table detection
                logger.debug("Native table detection not available, using positional detection")
                table_elements = self._detect_tables_by_position(page, page_num, element_counter)
                for elem in table_elements:
                    if elem.bbox:
                        table_bboxes.append(elem.bbox)
                elements.extend(table_elements)
                element_counter += len(table_elements)

        # Extract text blocks with formatting (sort=True for reading order)
        # Filter out lines that overlap with table regions
        text_dict = page.get_text("dict", sort=True)
        for block_idx, block in enumerate(text_dict.get("blocks", [])):
            if block.get("type") == 0:  # Text block
                element = self._process_text_block(
                    block, page_num, element_counter, table_bboxes
                )
                if element:
                    elements.append(element)
                    element_counter += 1

        # Extract images (if enabled)
        if self.enable_image_extraction:
            image_elements = self._extract_images(
                page, page_num, document_id, element_counter, output_dir
            )
            elements.extend(image_elements)
            element_counter += len(image_elements)

        # Extract vector graphics (charts, diagrams) from drawing commands
        if self.enable_image_extraction:
            vector_elements = self._extract_vector_graphics(
                page, page_num, document_id, element_counter, output_dir
            )
            elements.extend(vector_elements)
            element_counter += len(vector_elements)

        # Extract hyperlinks (links were already fetched above)
        for link_idx, link in enumerate(links):
            # Create link annotation element if it has URI
            if link.get("uri"):
                from_rect = link.get("from")
                if from_rect:
                    element = DocumentElement(
                        element_id=f"link_{page_num}_{element_counter}",
                        type=ElementType.REFERENCE,
                        content={"uri": link["uri"], "type": "hyperlink"},
                        bbox=BoundingBox(
                            x0=from_rect.x0,
                            y0=from_rect.y0,
                            x1=from_rect.x1,
                            y1=from_rect.y1
                        ),
                        metadata={"link_type": "external" if link["uri"].startswith("http") else "internal"}
                    )
                    elements.append(element)
                    element_counter += 1

        # PyMuPDF's sort=True already provides good reading order for multi-column layouts
        # (top-to-bottom, left-to-right within each row). We don't need to re-sort.
        # NOTE: If sort=True is not used in get_text(), uncomment the line below:
        # elements = self._sort_elements_for_reading_order(elements, dimensions)

        # Deduplicate: Remove CHART elements that overlap with TABLE elements
        # (Tables have structured data, so they take priority over vector graphics)
        elements = self._deduplicate_table_chart_overlap(elements)

        # Post-process elements for header/footer detection and structure
        elements = self._detect_headers_footers(elements, dimensions)
        elements = self._build_section_hierarchy(elements)
        elements = self._build_nested_lists(elements)

        return Page(
            page_number=page_num,
            elements=elements,
            dimensions=dimensions,
            metadata={
                "has_drawings": len(drawings) > 0,
                "drawing_count": len(drawings),
                "link_count": len(links)
            }
        )

    def _sort_elements_for_reading_order(self, elements: List[DocumentElement], dimensions: Dimensions) -> List[DocumentElement]:
        """
        Sort elements by reading order, handling multi-column layouts.

        For multi-column layouts (e.g., two-column documents), this ensures
        elements are ordered correctly: top-to-bottom, then left-to-right
        within each row.

        Args:
            elements: List of document elements
            dimensions: Page dimensions

        Returns:
            Sorted list of elements in reading order
        """
        if not elements:
            return elements

        # Detect if page has multi-column layout
        text_elements = [e for e in elements if e.bbox and e.is_text]
        if len(text_elements) < 3:
            # Too few elements to determine layout, just sort by Y position
            return sorted(elements, key=lambda e: (e.bbox.y0 if e.bbox else 0, e.bbox.x0 if e.bbox else 0))

        # Cluster x-positions to detect columns
        x_positions = [e.bbox.x0 for e in text_elements]
        columns = self._detect_columns(x_positions, dimensions.width)

        if len(columns) <= 1:
            # Single column layout - simple top-to-bottom sort
            logger.debug("Detected single-column layout")
            return sorted(elements, key=lambda e: (e.bbox.y0 if e.bbox else 0, e.bbox.x0 if e.bbox else 0))

        logger.debug(f"Detected {len(columns)}-column layout at x positions: {[f'{x:.1f}' for x in columns]}")

        # Multi-column layout - use newspaper-style reading order
        # (complete left column, then right column, etc.)
        # This is more appropriate for technical documents and data sheets
        element_data = []
        for elem in elements:
            if not elem.bbox:
                element_data.append((elem, 0, 0))
                continue

            # Find which column this element belongs to
            col_idx = 0
            min_dist = float('inf')
            for i, col_x in enumerate(columns):
                dist = abs(elem.bbox.x0 - col_x)
                if dist < min_dist:
                    min_dist = dist
                    col_idx = i

            element_data.append((elem, col_idx, elem.bbox.y0))

        # Sort by: column first, then Y position within column
        # This gives newspaper-style reading: complete column 1, then column 2, etc.
        element_data.sort(key=lambda x: (x[1], x[2]))

        logger.debug("Using newspaper-style column reading order (column by column, top to bottom)")
        return [e[0] for e in element_data]

    def _detect_columns(self, x_positions: List[float], page_width: float) -> List[float]:
        """
        Detect column positions from x-coordinates of text elements.

        Args:
            x_positions: List of x-coordinates (left edges of text)
            page_width: Page width in points

        Returns:
            List of column x-positions (sorted left to right)
        """
        if not x_positions:
            return []

        # Cluster x-positions to find column starts
        # Use k-means-like approach: find groups of x-positions
        threshold = page_width * 0.15  # 15% of page width as clustering threshold

        sorted_x = sorted(set(x_positions))
        if not sorted_x:
            return []

        clusters = [[sorted_x[0]]]

        for x in sorted_x[1:]:
            # Check if x belongs to current cluster
            cluster_center = sum(clusters[-1]) / len(clusters[-1])
            if abs(x - cluster_center) < threshold:
                clusters[-1].append(x)
            else:
                # Start new cluster
                clusters.append([x])

        # Return average x position of each cluster (column start)
        column_positions = [sum(cluster) / len(cluster) for cluster in clusters]

        # Filter out columns that are too close to each other
        min_column_width = page_width * 0.2  # Columns must be at least 20% of page width apart
        filtered_columns = [column_positions[0]]
        for col_x in column_positions[1:]:
            if col_x - filtered_columns[-1] >= min_column_width:
                filtered_columns.append(col_x)

        return filtered_columns

    def _detect_headers_footers(self, elements: List[DocumentElement], dimensions: Dimensions) -> List[DocumentElement]:
        """Detect and mark header/footer elements based on page position"""
        page_height = dimensions.height
        header_threshold = page_height * 0.1  # Top 10% of page
        footer_threshold = page_height * 0.9  # Bottom 10% of page

        for elem in elements:
            # Skip non-text elements and elements without position information
            if not elem.is_text or not elem.bbox:
                continue

            # Check if element is in header region
            if elem.bbox.y1 <= header_threshold:
                # Only mark as header if it's short text
                if isinstance(elem.content, str) and len(elem.content) < 200:
                    elem.type = ElementType.HEADER
                    elem.metadata['is_page_header'] = True

            # Check if element is in footer region
            elif elem.bbox.y0 >= footer_threshold:
                # Short text in footer region
                if isinstance(elem.content, str) and len(elem.content) < 200:
                    elem.type = ElementType.FOOTER
                    elem.metadata['is_page_footer'] = True

        return elements

    def _build_section_hierarchy(self, elements: List[DocumentElement]) -> List[DocumentElement]:
        """Build hierarchical section structure based on font sizes"""
        # Collect all headers with their font sizes
        headers = []
        for elem in elements:
            if elem.type in [ElementType.TITLE, ElementType.HEADER]:
                # Get average font size from style
                font_size = 12.0  # Default
                if elem.style and elem.style.font_size:
                    font_size = elem.style.font_size
                headers.append((elem, font_size))

        if not headers:
            return elements

        # Sort headers by font size to determine hierarchy levels
        font_sizes = sorted(set(size for _, size in headers), reverse=True)
        size_to_level = {size: level for level, size in enumerate(font_sizes, 1)}

        # Assign section levels to headers
        for elem, font_size in headers:
            level = size_to_level.get(font_size, 1)
            elem.metadata['section_level'] = level
            elem.metadata['font_size'] = font_size

        # Build parent-child relationships between headers
        header_stack = []  # Stack of (element, level)
        for elem, font_size in headers:
            level = elem.metadata['section_level']

            # Pop headers that are at same or lower level (larger font)
            while header_stack and header_stack[-1][1] >= level:
                header_stack.pop()

            # Set parent header
            if header_stack:
                parent = header_stack[-1][0]
                elem.metadata['parent_section'] = parent.element_id
                if 'child_sections' not in parent.metadata:
                    parent.metadata['child_sections'] = []
                parent.metadata['child_sections'].append(elem.element_id)

            header_stack.append((elem, level))

        # Link content to nearest preceding header at same or higher level
        current_header = None
        for elem in elements:
            if elem.type in [ElementType.TITLE, ElementType.HEADER]:
                current_header = elem
            elif current_header and elem.type not in [ElementType.HEADER, ElementType.FOOTER]:
                elem.metadata['section_id'] = current_header.element_id

        return elements

    def _build_nested_lists(self, elements: List[DocumentElement]) -> List[DocumentElement]:
        """Build nested list structure from flat list items"""
        # Group list items (only those with position information)
        list_items = [e for e in elements if e.type == ElementType.LIST_ITEM and e.bbox]
        if not list_items:
            return elements

        # Sort by position (top to bottom)
        list_items.sort(key=lambda e: (e.bbox.y0, e.bbox.x0))

        # Detect indentation levels based on x position
        x_positions = [item.bbox.x0 for item in list_items]
        if not x_positions:
            return elements

        min_x = min(x_positions)
        indent_unit = 20  # Typical indent size in points

        # Assign nesting levels
        for item in list_items:
            indent = item.bbox.x0 - min_x
            level = int(indent / indent_unit)
            item.metadata['list_level'] = level

        # Build parent-child relationships
        item_stack = []  # Stack of (element, level)
        for item in list_items:
            level = item.metadata.get('list_level', 0)

            # Pop items at same or deeper level
            while item_stack and item_stack[-1][1] >= level:
                item_stack.pop()

            # Set parent
            if item_stack:
                parent = item_stack[-1][0]
                item.metadata['parent_item'] = parent.element_id
                if 'children' not in parent.metadata:
                    parent.metadata['children'] = []
                parent.metadata['children'].append(item.element_id)
                # Also add to actual children list
                parent.children.append(item)

            item_stack.append((item, level))

        return elements

    def _process_text_block(self, block: Dict, page_num: int, counter: int,
                            table_bboxes: List[BoundingBox] = None) -> Optional[DocumentElement]:
        """
        Process a text block into a DocumentElement.

        Args:
            block: Text block from PyMuPDF
            page_num: Page number
            counter: Element counter
            table_bboxes: List of table bounding boxes to filter overlapping lines

        Returns:
            DocumentElement or None if all lines overlap with tables
        """
        if table_bboxes is None:
            table_bboxes = []

        # Extract text content and span information
        # Filter out lines that significantly overlap with table regions
        text_parts = []
        styles = []
        span_children = []  # Store span-level children for inline styling
        span_counter = 0
        valid_line_bboxes = []  # Track bboxes of valid lines for overall bbox calculation

        for line in block.get("lines", []):
            line_bbox_data = line.get("bbox", [0, 0, 0, 0])

            # Check if this line overlaps with any table region
            line_overlaps_table = False
            for table_bbox in table_bboxes:
                overlap_x0 = max(line_bbox_data[0], table_bbox.x0)
                overlap_y0 = max(line_bbox_data[1], table_bbox.y0)
                overlap_x1 = min(line_bbox_data[2], table_bbox.x1)
                overlap_y1 = min(line_bbox_data[3], table_bbox.y1)

                if overlap_x0 < overlap_x1 and overlap_y0 < overlap_y1:
                    # Calculate overlap ratio
                    line_height = line_bbox_data[3] - line_bbox_data[1]
                    overlap_height = overlap_y1 - overlap_y0
                    if line_height > 0:
                        overlap_ratio = overlap_height / line_height
                        if overlap_ratio >= 0.5:  # Line significantly overlaps with table
                            line_overlaps_table = True
                            break

            if line_overlaps_table:
                continue  # Skip this line

            # Process valid line
            valid_line_bboxes.append(line_bbox_data)

            for span in line.get("spans", []):
                text = span.get("text", "")
                if text:
                    text_parts.append(text)

                    # Extract style information. PyMuPDF encodes font properties
                    # in the span "flags" bitmask: bit 1 (2**1) = italic,
                    # bit 4 (2**4) = bold.
                    style = StyleInfo(
                        font_name=span.get("font"),
                        font_size=span.get("size"),
                        font_weight="bold" if span.get("flags", 0) & 2**4 else "normal",
                        font_style="italic" if span.get("flags", 0) & 2**1 else "normal",
                        text_color=span.get("color")
                    )
                    styles.append(style)

                    # Create span child element for inline styling
                    span_bbox_data = span.get("bbox", [0, 0, 0, 0])
                    span_bbox = BoundingBox(
                        x0=span_bbox_data[0],
                        y0=span_bbox_data[1],
                        x1=span_bbox_data[2],
                        y1=span_bbox_data[3]
                    )

                    span_element = DocumentElement(
                        element_id=f"span_{page_num}_{counter}_{span_counter}",
                        type=ElementType.TEXT,  # Spans are always text
                        content=text,
                        bbox=span_bbox,
                        style=style,
                        confidence=1.0,
                        metadata={"span_index": span_counter}
                    )
                    span_children.append(span_element)
                    span_counter += 1

        if not text_parts:
            return None  # All lines overlapped with tables

        full_text = "".join(text_parts)

        # Calculate bbox from valid lines only
        if valid_line_bboxes:
            min_x0 = min(b[0] for b in valid_line_bboxes)
            min_y0 = min(b[1] for b in valid_line_bboxes)
            max_x1 = max(b[2] for b in valid_line_bboxes)
            max_y1 = max(b[3] for b in valid_line_bboxes)
            bbox = BoundingBox(x0=min_x0, y0=min_y0, x1=max_x1, y1=max_y1)
        else:
            # Fallback to original bbox if no valid lines found
            bbox_data = block.get("bbox", [0, 0, 0, 0])
            bbox = BoundingBox(x0=bbox_data[0], y0=bbox_data[1], x1=bbox_data[2], y1=bbox_data[3])

        # Determine element type based on content and style
        element_type = self._infer_element_type(full_text, styles)

        # Use the most common style for the block
        if styles:
            block_style = styles[0]  # Could be improved with style merging
        else:
            block_style = None

        return DocumentElement(
            element_id=f"text_{page_num}_{counter}",
            type=element_type,
            content=full_text,
            bbox=bbox,
            style=block_style,
            confidence=1.0,  # Direct extraction has perfect confidence
            children=span_children  # Store span children for inline styling
        )

    def _infer_element_type(self, text: str, styles: List[StyleInfo]) -> ElementType:
        """Infer element type based on text content and styling"""
        text_lower = text.lower().strip()

        # Check for common patterns
        if len(text_lower) < 100 and styles:
            # Short text with large font might be title/header
            avg_size = sum(s.font_size or 12 for s in styles) / len(styles)
            if avg_size > 16:
                return ElementType.TITLE
            elif avg_size > 14:
                return ElementType.HEADER

        # Check for list patterns
        if re.match(r'^[\d•·▪▫◦‣⁃]\s', text_lower):
            return ElementType.LIST_ITEM

        # Check for page numbers
        if re.match(r'^page\s+\d+|^\d+\s*$|^-\s*\d+\s*-$', text_lower):
            return ElementType.PAGE_NUMBER

        # Check for footnote patterns, e.g. "[1]" or "1)"
        if re.match(r'^\[\d+\]|^\d+\)', text_lower):
            return ElementType.FOOTNOTE

        # Default to paragraph for longer text, text for shorter
        return ElementType.PARAGRAPH if len(text) > 150 else ElementType.TEXT

    def _process_native_table(self, table, page_num: int, counter: int) -> Optional[DocumentElement]:
        """Process a natively detected table"""
        try:
            # Extract table data
            data = table.extract()
            if not data or len(data) < self.min_table_rows:
                return None

            # Get table bounding box
            bbox_data = table.bbox
            bbox = BoundingBox(
                x0=bbox_data[0],
                y0=bbox_data[1],
                x1=bbox_data[2],
                y1=bbox_data[3]
            )

            # Extract column widths from table cells
            column_widths = []
            if hasattr(table, 'cells') and table.cells:
                # Group cells by column, matching each cell's x0 against the
                # sorted set of distinct column start positions
                unique_x0s = sorted(set(c[0] for c in table.cells))
                cols_x = {}
                for cell in table.cells:
                    col_idx = None
                    # Determine column index by x0 position
                    for idx, x0 in enumerate(unique_x0s):
                        if abs(cell[0] - x0) < 1.0:  # Within 1pt tolerance
                            col_idx = idx
                            break

                    if col_idx is not None:
                        if col_idx not in cols_x:
                            cols_x[col_idx] = {'x0': cell[0], 'x1': cell[2]}
                        else:
                            cols_x[col_idx]['x1'] = max(cols_x[col_idx]['x1'], cell[2])

                # Calculate width for each column
                for col_idx in sorted(cols_x.keys()):
                    width = cols_x[col_idx]['x1'] - cols_x[col_idx]['x0']
                    column_widths.append(width)

            # Create table cells
            cells = []
            for row_idx, row in enumerate(data):
                for col_idx, cell_text in enumerate(row):
                    if cell_text:
                        cells.append(TableCell(
                            row=row_idx,
                            col=col_idx,
                            content=str(cell_text) if cell_text else ""
                        ))

            # Create table data
            table_data = TableData(
                rows=len(data),
                cols=max(len(row) for row in data) if data else 0,
                cells=cells,
                headers=data[0] if data else None  # Assume first row is header
            )

            # Store column widths in metadata
            metadata = {"column_widths": column_widths} if column_widths else None

            return DocumentElement(
                element_id=f"table_{page_num}_{counter}",
                type=ElementType.TABLE,
                content=table_data,
                bbox=bbox,
                confidence=1.0,
                metadata=metadata
            )

        except Exception as e:
            logger.error(f"Error processing native table: {e}")
            return None

    def _detect_tables_by_position(self, page: fitz.Page, page_num: int, counter: int) -> List[DocumentElement]:
        """Detect tables by analyzing text positioning"""
        tables = []

        # Get all words with positions
        words = page.get_text("words")  # Returns (x0, y0, x1, y1, "word", block_no, line_no, word_no)

        if not words:
            return tables

        # Group words by approximate row (y-coordinate)
        rows = {}
        for word in words:
            y = round(word[1] / 5) * 5  # Round to nearest 5 points
            if y not in rows:
                rows[y] = []
            rows[y].append({
                'x0': word[0],
                'y0': word[1],
                'x1': word[2],
                'y1': word[3],
                'text': word[4],
                'block': word[5] if len(word) > 5 else 0
            })

        # Sort rows by y-coordinate
        sorted_rows = sorted(rows.items(), key=lambda x: x[0])

        # Find potential tables (consecutive rows with multiple columns)
        current_table_rows = []
        tables_found = []

        for y, words_in_row in sorted_rows:
            words_in_row.sort(key=lambda w: w['x0'])

            if len(words_in_row) >= self.min_table_cols:
                # Check if this could be a table row
                x_positions = [w['x0'] for w in words_in_row]

                # Check for somewhat regular spacing
                if self._has_regular_spacing(x_positions):
                    current_table_rows.append((y, words_in_row))
                else:
                    # End current table if exists
                    if len(current_table_rows) >= self.min_table_rows:
                        tables_found.append(current_table_rows)
                    current_table_rows = []
            else:
                # End current table if exists
                if len(current_table_rows) >= self.min_table_rows:
                    tables_found.append(current_table_rows)
                current_table_rows = []

        # Don't forget the last table
        if len(current_table_rows) >= self.min_table_rows:
            tables_found.append(current_table_rows)

        # Convert detected tables to DocumentElements
        for table_idx, table_rows in enumerate(tables_found):
            if not table_rows:
                continue

            # Calculate table bounding box
            all_words = []
            for _, words in table_rows:
                all_words.extend(words)

            min_x = min(w['x0'] for w in all_words)
            min_y = min(w['y0'] for w in all_words)
            max_x = max(w['x1'] for w in all_words)
            max_y = max(w['y1'] for w in all_words)

            bbox = BoundingBox(x0=min_x, y0=min_y, x1=max_x, y1=max_y)

            # Create table cells
            cells = []
            for row_idx, (y, words) in enumerate(table_rows):
                # Group words into columns
                columns = self._group_into_columns(words, table_rows)
                for col_idx, col_text in enumerate(columns):
                    if col_text:
                        cells.append(TableCell(
                            row=row_idx,
                            col=col_idx,
                            content=col_text
                        ))

            # Create table data
            table_data = TableData(
                rows=len(table_rows),
                cols=max(len(self._group_into_columns(words, table_rows))
                         for _, words in table_rows),
                cells=cells
            )

            element = DocumentElement(
                element_id=f"table_{page_num}_{counter + table_idx}",
                type=ElementType.TABLE,
                content=table_data,
                bbox=bbox,
                confidence=0.8,  # Lower confidence for positional detection
                metadata={"detection_method": "positional"}
            )
            tables.append(element)

        return tables

    def _has_regular_spacing(self, x_positions: List[float], tolerance: float = 0.3) -> bool:
        """Check if x positions have somewhat regular spacing"""
        if len(x_positions) < 3:
            return False

        spacings = [x_positions[i+1] - x_positions[i] for i in range(len(x_positions)-1)]
        avg_spacing = sum(spacings) / len(spacings)

        # Check if spacings are within tolerance of average
        for spacing in spacings:
            if abs(spacing - avg_spacing) > avg_spacing * tolerance:
                return False

        return True

    def _group_into_columns(self, words: List[Dict], all_rows: List) -> List[str]:
        """Group words into columns based on x-position"""
        if not words:
            return []

        # Find common column positions across all rows
        all_x_positions = []
        for _, row_words in all_rows:
            all_x_positions.extend([w['x0'] for w in row_words])

        # Cluster x-positions to find columns
        column_positions = self._cluster_positions(all_x_positions)

        # Assign words to columns
        columns = [""] * len(column_positions)
        for word in words:
            # Find closest column
            closest_col = 0
            min_dist = float('inf')
            for col_idx, col_x in enumerate(column_positions):
                dist = abs(word['x0'] - col_x)
                if dist < min_dist:
                    min_dist = dist
                    closest_col = col_idx

            if columns[closest_col]:
                columns[closest_col] += " " + word['text']
            else:
                columns[closest_col] = word['text']

        return columns

    def _cluster_positions(self, positions: List[float], threshold: float = 20) -> List[float]:
        """Cluster positions to find common columns"""
        if not positions:
            return []

        sorted_pos = sorted(positions)
        clusters = [[sorted_pos[0]]]

        for pos in sorted_pos[1:]:
            # Check if position belongs to current cluster
            if pos - clusters[-1][-1] < threshold:
                clusters[-1].append(pos)
            else:
                clusters.append([pos])

        # Return average position of each cluster
        return [sum(cluster) / len(cluster) for cluster in clusters]

    def _extract_images(self,
                        page: fitz.Page,
                        page_num: int,
                        document_id: str,
                        counter: int,
                        output_dir: Optional[Path]) -> List[DocumentElement]:
        """Extract images from page"""
        elements = []
        image_list = page.get_images()

        for img_idx, img in enumerate(image_list):
            try:
                xref = img[0]

                # Get image position(s)
                img_rects = page.get_image_rects(xref)
                if not img_rects:
                    continue

                rect = img_rects[0]  # Use first occurrence
                bbox = BoundingBox(
                    x0=rect.x0,
                    y0=rect.y0,
                    x1=rect.x1,
                    y1=rect.y1
                )

                # Extract image data
                pix = fitz.Pixmap(page.parent, xref)
                image_data = {
                    "width": pix.width,
                    "height": pix.height,
                    "colorspace": pix.colorspace.name if pix.colorspace else "unknown",
                    "xref": xref
                }

                # Save image if output directory provided
                if output_dir:
                    output_dir.mkdir(parents=True, exist_ok=True)
                    image_filename = f"{document_id}_p{page_num}_img{img_idx}.png"
                    image_path = output_dir / image_filename
                    pix.save(str(image_path))
                    image_data["saved_path"] = str(image_path)
                    logger.debug(f"Saved image to {image_path}")

                element = DocumentElement(
                    element_id=f"image_{page_num}_{counter + img_idx}",
                    type=ElementType.IMAGE,
                    content=image_data,
                    bbox=bbox,
                    confidence=1.0,
                    metadata={
                        "image_index": img_idx,
                        "xref": xref
                    }
                )
                elements.append(element)

                pix = None  # Free memory

            except Exception as e:
                logger.error(f"Error extracting image {img_idx}: {e}")

        return elements

    def _extract_vector_graphics(self,
                                 page: fitz.Page,
                                 page_num: int,
                                 document_id: str,
                                 counter: int,
                                 output_dir: Optional[Path]) -> List[DocumentElement]:
        """
        Extract vector graphics (charts, diagrams) from page.

        This method identifies regions that are composed of vector drawing commands
        (paths, lines, rectangles) rather than embedded raster images. These are
        typically charts created in Excel, vector diagrams, or other graphics.

        Args:
            page: PyMuPDF page object
            page_num: Page number (1-indexed)
            document_id: Unique document identifier
            counter: Starting counter for element IDs
            output_dir: Directory to save rendered graphics

        Returns:
            List of DocumentElement objects representing vector graphics
        """
        elements = []

        try:
            # Get all drawing commands
            drawings = page.get_drawings()
            if not drawings:
                return elements

            logger.debug(f"Page {page_num} contains {len(drawings)} vector drawing commands")

            # Cluster drawings into groups (charts, diagrams, etc.)
            try:
                # PyMuPDF's cluster_drawings() groups nearby drawings automatically
                drawing_clusters = page.cluster_drawings()
                logger.debug(f"Clustered into {len(drawing_clusters)} groups")
            except (AttributeError, TypeError) as e:
                # cluster_drawings not available or has different signature
                # Fallback: try to identify charts by analyzing drawing density
                logger.warning(f"cluster_drawings() failed ({e}), using fallback method")
                drawing_clusters = self._cluster_drawings_fallback(page, drawings)

            for cluster_idx, bbox in enumerate(drawing_clusters):
                # Ignore small regions (likely noise or separator lines)
                if bbox.width < 50 or bbox.height < 50:
                    logger.debug(f"Skipping small cluster {cluster_idx}: {bbox.width:.1f}x{bbox.height:.1f}")
                    continue

                # Render the region to a raster image
                # matrix=fitz.Matrix(2, 2) increases resolution to ~200 DPI
                try:
                    pix = page.get_pixmap(clip=bbox, matrix=fitz.Matrix(2, 2))

                    # Save image if output directory provided
                    if output_dir:
                        output_dir.mkdir(parents=True, exist_ok=True)
                        filename = f"{document_id}_p{page_num}_chart{cluster_idx}.png"
                        filepath = output_dir / filename
                        pix.save(str(filepath))

                        # Create DocumentElement
                        image_data = {
                            "saved_path": str(filepath),
                            "width": pix.width,
                            "height": pix.height,
                            "colorspace": pix.colorspace.name if pix.colorspace else "unknown",
                            "source": "vector_graphics"
                        }

                        element = DocumentElement(
                            element_id=f"chart_{page_num}_{counter + cluster_idx}",
                            type=ElementType.CHART,  # Use CHART type for vector graphics
                            content=image_data,
                            bbox=BoundingBox(
                                x0=bbox.x0,
                                y0=bbox.y0,
                                x1=bbox.x1,
                                y1=bbox.y1
                            ),
                            confidence=0.85,  # Slightly lower confidence than raster images
                            metadata={
                                "cluster_index": cluster_idx,
                                "drawing_count": len(drawings)
                            }
                        )
                        elements.append(element)
                        logger.debug(f"Extracted chart {cluster_idx}: {bbox.width:.1f}x{bbox.height:.1f} -> {filepath}")

                    pix = None  # Free memory

                except Exception as e:
                    logger.error(f"Error rendering vector graphic cluster {cluster_idx}: {e}")
                    continue

        except Exception as e:
            logger.error(f"Error extracting vector graphics: {e}")

        return elements

    def _cluster_drawings_fallback(self, page: fitz.Page, drawings: list) -> list:
        """
        Fallback method to cluster drawings when cluster_drawings() is not available.

        This uses a simple spatial clustering approach based on bounding boxes.
        """
        if not drawings:
            return []

        # Collect all drawing bounding boxes
        bboxes = []
        for drawing in drawings:
            rect = drawing.get('rect')
            if rect:
                bboxes.append(fitz.Rect(rect))

        if not bboxes:
            return []

        # Simple clustering: merge overlapping or nearby rectangles
        clusters = []
        tolerance = 20

        for bbox in bboxes:
            # Try to merge with existing cluster
            merged = False
            for i, cluster in enumerate(clusters):
                # Check if bbox is close to this cluster
                expanded_cluster = cluster + (-tolerance, -tolerance, tolerance, tolerance)
                if expanded_cluster.intersects(bbox):
                    # Merge bbox into cluster
                    clusters[i] = cluster | bbox  # Union of rectangles
                    merged = True
                    break

            if not merged:
                # Create new cluster
                clusters.append(bbox)

        # Filter out very small clusters
        filtered_clusters = [c for c in clusters if c.width >= 50 and c.height >= 50]

        logger.debug(f"Fallback clustering: {len(bboxes)} drawings -> {len(clusters)} clusters -> {len(filtered_clusters)} filtered")

        return filtered_clusters

    def _deduplicate_table_chart_overlap(self, elements: List[DocumentElement]) -> List[DocumentElement]:
        """
        Intelligently resolve TABLE-CHART overlaps based on table structure completeness.

        When a region is detected as both TABLE and CHART:
        - Calculate cell completeness = actual_cells / (rows × cols)
        - If completeness ≥50% → Real table with complete structure → Keep TABLE
        - If completeness <50% → False positive (chart detected as table) → Keep CHART

        Args:
            elements: List of extracted elements

        Returns:
            Filtered list with low-quality overlaps removed
        """
        # Collect all tables and charts
        tables = [elem for elem in elements if elem.type == ElementType.TABLE]
        charts = [elem for elem in elements if elem.type == ElementType.CHART]

        if not tables or not charts:
            return elements  # No potential conflicts

        # Analyze TABLE structure completeness
        table_completeness = {}
        for table in tables:
            if hasattr(table.content, 'rows') and hasattr(table.content, 'cols') and hasattr(table.content, 'cells'):
                expected_cells = table.content.rows * table.content.cols
                actual_cells = len(table.content.cells)

                if expected_cells > 0:
                    completeness = actual_cells / expected_cells
                    table_completeness[table.element_id] = completeness
                else:
                    table_completeness[table.element_id] = 0.0
            else:
                table_completeness[table.element_id] = 0.0

        # Check overlaps and decide what to keep
        filtered_elements = []
        removed_charts = 0
        removed_tables = 0

        # Process TABLEs
        for table in tables:
            if not table.bbox:
                filtered_elements.append(table)
                continue

            # Check if this TABLE overlaps with any CHART
            overlaps_chart = False
            for chart in charts:
                if not chart.bbox:
                    continue

                # Calculate overlap
                overlap_x0 = max(table.bbox.x0, chart.bbox.x0)
                overlap_y0 = max(table.bbox.y0, chart.bbox.y0)
                overlap_x1 = min(table.bbox.x1, chart.bbox.x1)
                overlap_y1 = min(table.bbox.y1, chart.bbox.y1)

                if overlap_x0 < overlap_x1 and overlap_y0 < overlap_y1:
                    overlap_area = (overlap_x1 - overlap_x0) * (overlap_y1 - overlap_y0)
                    table_area = (table.bbox.x1 - table.bbox.x0) * (table.bbox.y1 - table.bbox.y0)

                    if table_area > 0:
                        overlap_ratio = overlap_area / table_area

                        if overlap_ratio >= 0.8:
                            overlaps_chart = True
                            completeness = table_completeness.get(table.element_id, 0.0)

                            logger.debug(
                                f"TABLE-CHART overlap: {table.element_id} vs {chart.element_id}: "
                                f"{overlap_ratio*100:.1f}% overlap, TABLE cell completeness: {completeness*100:.1f}%"
                            )

                            # Decision: Keep TABLE only if structure is complete
                            if completeness < 0.5:  # <50% cell completeness
                                logger.info(
                                    f"Removing incomplete TABLE {table.element_id} "
                                    f"({completeness*100:.1f}% completeness, overlaps with CHART {chart.element_id})"
                                )
                                removed_tables += 1
                                break
                            else:
                                logger.info(
                                    f"Keeping TABLE {table.element_id} with {completeness*100:.1f}% completeness "
                                    f"(will remove overlapping CHART {chart.element_id})"
                                )

            if not overlaps_chart or table_completeness.get(table.element_id, 0.0) >= 0.5:
                filtered_elements.append(table)

        # Process CHARTs
        for chart in charts:
            if not chart.bbox:
                filtered_elements.append(chart)
                continue

            # Check if this CHART should be removed due to overlap with high-quality TABLE
            should_remove = False
            for table in tables:
                if not table.bbox:
                    continue

                # Calculate overlap
                overlap_x0 = max(chart.bbox.x0, table.bbox.x0)
                overlap_y0 = max(chart.bbox.y0, table.bbox.y0)
                overlap_x1 = min(chart.bbox.x1, table.bbox.x1)
                overlap_y1 = min(chart.bbox.y1, table.bbox.y1)

                if overlap_x0 < overlap_x1 and overlap_y0 < overlap_y1:
                    overlap_area = (overlap_x1 - overlap_x0) * (overlap_y1 - overlap_y0)
                    chart_area = (chart.bbox.x1 - chart.bbox.x0) * (chart.bbox.y1 - chart.bbox.y0)

                    if chart_area > 0:
                        overlap_ratio = overlap_area / chart_area

                        if overlap_ratio >= 0.8:
                            completeness = table_completeness.get(table.element_id, 0.0)

                            # Remove CHART only if TABLE structure is complete
                            if completeness >= 0.5:
                                should_remove = True
                                logger.info(
                                    f"Removing CHART {chart.element_id} "
                                    f"({overlap_ratio*100:.1f}% overlap with TABLE {table.element_id} having {completeness*100:.1f}% completeness)"
                                )
                                removed_charts += 1
                                break

            if not should_remove:
                filtered_elements.append(chart)

        # Process all other elements
        for elem in elements:
            if elem.type not in [ElementType.TABLE, ElementType.CHART]:
                filtered_elements.append(elem)

        if removed_charts > 0 or removed_tables > 0:
            logger.info(
                f"Deduplication complete: removed {removed_tables} incomplete TABLE(s), "
                f"{removed_charts} overlapping CHART(s)"
            )

        return filtered_elements
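
# Illustrative usage (a sketch, not part of the original module). Because of
# the relative import above, run it as a module from the package root, e.g.
# `python -m app.services.direct_extraction_engine sample.pdf` (the exact
# package path is an assumption and may differ in this repository).
if __name__ == "__main__":
    import sys

    logging.basicConfig(level=logging.INFO)
    engine = DirectExtractionEngine()
    result = engine.extract(Path(sys.argv[1]))
    print(f"Extracted {len(result.pages)} page(s) in "
          f"{result.metadata.processing_time:.2f}s")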