Files
OCR/backend/app/services/pdf_generator_service.py
egg c65df754cf wip: add TableData.from_dict() for OCR track table parsing (incomplete)
Add TableData.from_dict() and TableCell.from_dict() methods to convert
JSON table dicts to proper TableData objects during UnifiedDocument parsing.

Modified _json_to_document_element() to detect TABLE elements with dict
content containing 'cells' key and convert to TableData.

Note: This fix ensures table elements have proper to_html() method available
but the rendered output still needs investigation - tables may still render
incorrectly in OCR track PDFs.

Files changed:
- unified_document.py: Add from_dict() class methods
- pdf_generator_service.py: Convert table dicts during JSON parsing
- Add fix-ocr-track-table-rendering proposal

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-26 19:16:51 +08:00

2689 lines
112 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Layout-Preserving PDF Generation Service
Generates PDF files that preserve the original document layout using OCR JSON data
"""
import json
import logging
import re
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
from datetime import datetime
from reportlab.lib.pagesizes import A4, letter
from reportlab.lib.units import mm
from reportlab.pdfgen import canvas
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.platypus import Table, TableStyle
from reportlab.lib import colors
from reportlab.lib.enums import TA_CENTER, TA_LEFT, TA_RIGHT
from reportlab.platypus import Paragraph
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from PIL import Image
from html.parser import HTMLParser
from app.core.config import settings
# Import UnifiedDocument for dual-track support
try:
from app.models.unified_document import (
UnifiedDocument, DocumentElement, ElementType,
BoundingBox, TableData, ProcessingTrack,
DocumentMetadata, Dimensions, Page, StyleInfo
)
UNIFIED_DOCUMENT_AVAILABLE = True
except ImportError:
UNIFIED_DOCUMENT_AVAILABLE = False
UnifiedDocument = None
logger = logging.getLogger(__name__)
class HTMLTableParser(HTMLParser):
    """Parse HTML table markup into a nested structure of rows and cells.

    After ``feed()`` completes, ``self.tables`` holds one dict per table:
    ``{'rows': [{'cells': [{'text', 'is_header', 'colspan', 'rowspan'}, ...]}, ...]}``.
    Cell text accumulates with a trailing space per text node.
    """

    def __init__(self):
        super().__init__()
        self.tables = []           # completed tables found in the document
        self.current_table = None  # table currently being built
        self.current_row = None    # row currently being built
        self.current_cell = None   # cell currently being built
        self.in_table = False      # True while inside a <table> element

    @staticmethod
    def _parse_span(raw_value) -> int:
        """Parse a colspan/rowspan attribute value, falling back to 1.

        Guards against malformed HTML where the attribute is present but
        empty (colspan=""), valueless (HTMLParser yields None), non-numeric,
        or zero/negative — previously these raised and aborted parsing.
        """
        try:
            span = int(raw_value)
        except (TypeError, ValueError):
            return 1
        return span if span > 0 else 1

    def handle_starttag(self, tag, attrs):
        attrs_dict = dict(attrs)
        if tag == 'table':
            self.in_table = True
            self.current_table = {'rows': []}
        elif tag == 'tr' and self.in_table:
            self.current_row = {'cells': []}
        elif tag in ('td', 'th') and self.in_table and self.current_row is not None:
            self.current_cell = {
                'text': '',
                'is_header': tag == 'th',
                'colspan': self._parse_span(attrs_dict.get('colspan', 1)),
                'rowspan': self._parse_span(attrs_dict.get('rowspan', 1)),
            }

    def handle_endtag(self, tag):
        if tag == 'table' and self.in_table:
            # Discard empty tables (no rows collected)
            if self.current_table and self.current_table['rows']:
                self.tables.append(self.current_table)
            self.current_table = None
            self.in_table = False
        elif tag == 'tr' and self.current_row is not None:
            if self.current_table is not None:
                self.current_table['rows'].append(self.current_row)
            self.current_row = None
        elif tag in ('td', 'th') and self.current_cell is not None:
            if self.current_row is not None:
                self.current_row['cells'].append(self.current_cell)
            self.current_cell = None

    def handle_data(self, data):
        # Accumulate text inside the current cell; a trailing space keeps
        # words from separate text nodes apart.
        if self.current_cell is not None:
            self.current_cell['text'] += data.strip() + ' '
class PDFGeneratorService:
    """Service for generating layout-preserving PDFs from OCR JSON data"""

    # Font mapping from common document fonts to the PDF standard (built-in)
    # Helvetica / Times / Courier families that ReportLab ships with.
    FONT_MAPPING = {
        'Arial': 'Helvetica',
        'Arial Black': 'Helvetica-Bold',
        'Times New Roman': 'Times-Roman',
        'Times': 'Times-Roman',
        'Courier New': 'Courier',
        'Courier': 'Courier',
        'Calibri': 'Helvetica',
        'Cambria': 'Times-Roman',
        'Georgia': 'Times-Roman',
        'Verdana': 'Helvetica',
        'Tahoma': 'Helvetica',
        'Trebuchet MS': 'Helvetica',
        'Comic Sans MS': 'Helvetica',
        'Impact': 'Helvetica-Bold',
        'Lucida Console': 'Courier',
        'Palatino': 'Times-Roman',
        'Garamond': 'Times-Roman',
        'Bookman': 'Times-Roman',
        'Century Gothic': 'Helvetica',
        'Franklin Gothic': 'Helvetica',
    }

    # Legacy bitmask flags for text formatting (consumed by _apply_text_style)
    STYLE_FLAG_BOLD = 1
    STYLE_FLAG_ITALIC = 2
    STYLE_FLAG_UNDERLINE = 4
    STYLE_FLAG_STRIKETHROUGH = 8
def __init__(self):
"""Initialize PDF generator with font configuration"""
self.font_name = 'NotoSansSC'
self.font_path = None
self.font_registered = False
self.current_processing_track = None # Track type for current document
self._register_chinese_font()
    def _register_chinese_font(self):
        """Register the configured Chinese TrueType font with ReportLab.

        Resolves ``settings.chinese_font_path`` (relative paths are resolved
        against the project root, four levels above this file) and registers
        it under ``self.font_name``. On any failure ``font_registered`` stays
        False so callers can fall back to the built-in Latin fonts.
        """
        try:
            # Get font path from settings
            font_path = Path(settings.chinese_font_path)
            # Try relative path from project root
            if not font_path.is_absolute():
                # Adjust path - settings.chinese_font_path starts with ./backend/
                project_root = Path(__file__).resolve().parent.parent.parent.parent
                font_path = project_root / font_path
            if not font_path.exists():
                logger.error(f"Chinese font not found at {font_path}")
                return
            # Register font
            pdfmetrics.registerFont(TTFont(self.font_name, str(font_path)))
            self.font_path = font_path
            self.font_registered = True
            logger.info(f"Chinese font registered: {self.font_name} from {font_path}")
        except Exception as e:
            logger.error(f"Failed to register Chinese font: {e}")
            self.font_registered = False
def _parse_color(self, color_value) -> Tuple[float, float, float]:
"""
Parse color value to RGB tuple.
Args:
color_value: Color as hex string (#RRGGBB), RGB tuple, or color name
Returns:
RGB tuple with values 0-1 for ReportLab
"""
if not color_value:
return (0, 0, 0) # Default to black
try:
# Handle hex color (#RRGGBB or #RGB)
if isinstance(color_value, str) and color_value.startswith('#'):
hex_color = color_value.lstrip('#')
# Expand short form (#RGB -> #RRGGBB)
if len(hex_color) == 3:
hex_color = ''.join([c*2 for c in hex_color])
if len(hex_color) == 6:
r = int(hex_color[0:2], 16) / 255.0
g = int(hex_color[2:4], 16) / 255.0
b = int(hex_color[4:6], 16) / 255.0
return (r, g, b)
# Handle RGB tuple or list
elif isinstance(color_value, (tuple, list)) and len(color_value) >= 3:
r, g, b = color_value[0:3]
# Normalize to 0-1 if values are 0-255
if any(v > 1 for v in [r, g, b]):
return (r/255.0, g/255.0, b/255.0)
return (r, g, b)
except (ValueError, TypeError) as e:
logger.warning(f"Failed to parse color {color_value}: {e}")
# Default to black
return (0, 0, 0)
def _map_font(self, font_name: Optional[str]) -> str:
"""
Map font name to PDF standard font.
Args:
font_name: Original font name
Returns:
PDF standard font name
"""
if not font_name:
return 'Helvetica'
# Direct lookup
if font_name in self.FONT_MAPPING:
return self.FONT_MAPPING[font_name]
# Case-insensitive lookup
font_lower = font_name.lower()
for orig_font, pdf_font in self.FONT_MAPPING.items():
if orig_font.lower() == font_lower:
return pdf_font
# Partial match for common patterns
if 'arial' in font_lower:
return 'Helvetica'
elif 'times' in font_lower:
return 'Times-Roman'
elif 'courier' in font_lower:
return 'Courier'
# Default fallback
logger.debug(f"Font '{font_name}' not found in mapping, using Helvetica")
return 'Helvetica'
    def _apply_text_style(self, c: canvas.Canvas, style_info, default_size: float = 12):
        """
        Apply text styling from StyleInfo to PDF canvas.

        Accepts a StyleInfo-like object (attribute access), a plain dict, or
        a legacy integer bitmask in ``flags`` (see STYLE_FLAG_*). On any
        failure the canvas is reset to Helvetica/black so rendering never
        aborts on bad style data.

        Args:
            c: ReportLab canvas object
            style_info: StyleInfo object or dict with font, size, color, flags
            default_size: Default font size if not specified
        """
        if not style_info:
            # Apply default styling
            c.setFont('Helvetica', default_size)
            c.setFillColorRGB(0, 0, 0)
            return
        try:
            # Extract style attributes
            if hasattr(style_info, '__dict__'):
                # StyleInfo object
                font_family = getattr(style_info, 'font_name', None)
                font_size = getattr(style_info, 'font_size', default_size)
                color = getattr(style_info, 'text_color', None)
                font_weight = getattr(style_info, 'font_weight', 'normal')
                font_style = getattr(style_info, 'font_style', 'normal')
                # Legacy flags support
                flags = getattr(style_info, 'flags', 0)
            elif isinstance(style_info, dict):
                # Dictionary
                font_family = style_info.get('font_name')
                font_size = style_info.get('font_size', default_size)
                color = style_info.get('text_color')
                font_weight = style_info.get('font_weight', 'normal')
                font_style = style_info.get('font_style', 'normal')
                # Legacy flags support
                flags = style_info.get('flags', 0)
            else:
                # Unknown format, use defaults
                c.setFont('Helvetica', default_size)
                c.setFillColorRGB(0, 0, 0)
                return
            # Map font name
            base_font = self._map_font(font_family) if font_family else 'Helvetica'
            # Determine bold and italic from font_weight/font_style (preferred) or flags (legacy)
            # NOTE(review): because font_weight/font_style default to the truthy
            # string 'normal', the legacy ``flags`` bitmask is only consulted
            # when they are explicitly None/empty — confirm this is intended.
            is_bold = font_weight == 'bold' if font_weight else bool(flags & self.STYLE_FLAG_BOLD)
            is_italic = font_style == 'italic' if font_style else bool(flags & self.STYLE_FLAG_ITALIC)
            # Apply bold/italic modifiers (only the three standard families
            # have registered variants; other fonts keep their base face)
            if is_bold or is_italic:
                if is_bold and is_italic:
                    # Try bold-italic variant
                    if 'Helvetica' in base_font:
                        base_font = 'Helvetica-BoldOblique'
                    elif 'Times' in base_font:
                        base_font = 'Times-BoldItalic'
                    elif 'Courier' in base_font:
                        base_font = 'Courier-BoldOblique'
                elif is_bold:
                    # Try bold variant
                    if 'Helvetica' in base_font:
                        base_font = 'Helvetica-Bold'
                    elif 'Times' in base_font:
                        base_font = 'Times-Bold'
                    elif 'Courier' in base_font:
                        base_font = 'Courier-Bold'
                elif is_italic:
                    # Try italic variant
                    if 'Helvetica' in base_font:
                        base_font = 'Helvetica-Oblique'
                    elif 'Times' in base_font:
                        base_font = 'Times-Italic'
                    elif 'Courier' in base_font:
                        base_font = 'Courier-Oblique'
            # Apply font and size (guard against zero/negative sizes)
            actual_size = font_size if font_size and font_size > 0 else default_size
            try:
                c.setFont(base_font, actual_size)
            except KeyError:
                # Font not available, fallback
                logger.warning(f"Font '{base_font}' not available, using Helvetica")
                c.setFont('Helvetica', actual_size)
            # Apply color
            rgb_color = None
            if hasattr(style_info, 'get_rgb_color'):
                # Use StyleInfo method if available
                rgb_color = style_info.get_rgb_color()
            elif color is not None:
                # Parse from extracted color value
                r, g, b = self._parse_color(color)
                rgb_color = (r, g, b)
            if rgb_color:
                # text_color is in 0-255 range, convert to 0-1 for ReportLab
                r, g, b = rgb_color
                if any(v > 1 for v in [r, g, b]):
                    r, g, b = r/255.0, g/255.0, b/255.0
                c.setFillColorRGB(r, g, b)
            else:
                c.setFillColorRGB(0, 0, 0)  # Default black
        except Exception as e:
            logger.error(f"Failed to apply text style: {e}")
            # Fallback to defaults
            c.setFont('Helvetica', default_size)
            c.setFillColorRGB(0, 0, 0)
def load_ocr_json(self, json_path: Path) -> Optional[Dict]:
"""
Load and parse OCR JSON result file
Args:
json_path: Path to JSON file
Returns:
Parsed JSON data or None if failed
"""
try:
with open(json_path, 'r', encoding='utf-8') as f:
data = json.load(f)
logger.info(f"Loaded OCR JSON: {json_path.name}")
return data
except Exception as e:
logger.error(f"Failed to load JSON {json_path}: {e}")
return None
def _get_image_path(self, element) -> Optional[str]:
"""
Get image path with fallback logic.
Checks multiple locations in order:
1. element.content["saved_path"] - Direct track saved path
2. element.content["path"] - Legacy path
3. element.content["image_path"] - Alternative path
4. element.saved_path - Direct attribute
5. element.metadata["path"] - Metadata fallback
Args:
element: DocumentElement object
Returns:
Path to image file or None if not found
"""
# Check content dictionary
if isinstance(element.content, dict):
for key in ['saved_path', 'path', 'image_path']:
if key in element.content:
return element.content[key]
# Check direct attribute
if hasattr(element, 'saved_path') and element.saved_path:
return element.saved_path
# Check metadata
if element.metadata and isinstance(element.metadata, dict):
if 'path' in element.metadata:
return element.metadata['path']
if 'saved_path' in element.metadata:
return element.metadata['saved_path']
return None
    def convert_unified_document_to_ocr_data(self, unified_doc: 'UnifiedDocument') -> Dict:
        """
        Convert UnifiedDocument to OCR data format for PDF generation.

        This method transforms the UnifiedDocument structure into the legacy
        OCR data format that the PDF generator expects, supporting both
        OCR and DIRECT processing tracks.

        Page-number convention (legacy, preserved by the renderer):
        ``text_regions`` use 1-based page numbers, while ``images_metadata``
        and ``layout_data`` elements use 0-based page indices.

        Args:
            unified_doc: UnifiedDocument object from either processing track

        Returns:
            Dictionary in OCR data format with text_regions, images_metadata, layout_data
        """
        text_regions = []
        images_metadata = []
        layout_elements = []
        for page in unified_doc.pages:
            page_num = page.page_number  # 1-based
            for element in page.elements:
                # Convert BoundingBox to polygon format [[x,y], [x,y], [x,y], [x,y]]
                bbox_polygon = [
                    [element.bbox.x0, element.bbox.y0],  # top-left
                    [element.bbox.x1, element.bbox.y0],  # top-right
                    [element.bbox.x1, element.bbox.y1],  # bottom-right
                    [element.bbox.x0, element.bbox.y1],  # bottom-left
                ]
                # Handle text elements
                if element.is_text or element.type in [
                    ElementType.TEXT, ElementType.TITLE, ElementType.HEADER,
                    ElementType.FOOTER, ElementType.PARAGRAPH, ElementType.CAPTION,
                    ElementType.LIST_ITEM, ElementType.FOOTNOTE, ElementType.REFERENCE
                ]:
                    text_content = element.get_text()
                    if text_content:
                        text_region = {
                            'text': text_content,
                            'bbox': bbox_polygon,
                            'confidence': element.confidence or 1.0,
                            'page': page_num
                        }
                        # Include style information if available (for Direct track)
                        if hasattr(element, 'style') and element.style:
                            text_region['style'] = element.style
                        text_regions.append(text_region)
                # Handle table elements
                elif element.type == ElementType.TABLE:
                    # Convert TableData to HTML for layout_data
                    if isinstance(element.content, TableData):
                        html_content = element.content.to_html()
                    elif isinstance(element.content, dict):
                        html_content = element.content.get('html', str(element.content))
                    else:
                        html_content = str(element.content)
                    layout_elements.append({
                        'type': 'table',
                        'content': html_content,
                        'bbox': [element.bbox.x0, element.bbox.y0,
                                 element.bbox.x1, element.bbox.y1],
                        'page': page_num - 1  # layout uses 0-based
                    })
                    # Add bbox to images_metadata for text overlap filtering
                    # (no actual image file, just bbox for filtering)
                    images_metadata.append({
                        'image_path': None,  # No fake table image
                        'bbox': bbox_polygon,
                        'page': page_num - 1,  # 0-based for images_metadata
                        'type': 'table',
                        'element_id': element.element_id
                    })
                # Handle image/visual elements
                elif element.is_visual or element.type in [
                    ElementType.IMAGE, ElementType.FIGURE, ElementType.CHART,
                    ElementType.DIAGRAM, ElementType.LOGO
                ]:
                    # Get image path using fallback logic
                    image_path = self._get_image_path(element)
                    # Only add if we found a valid path
                    if image_path:
                        images_metadata.append({
                            'image_path': image_path,
                            'bbox': bbox_polygon,
                            'page': page_num - 1,  # 0-based
                            'type': element.type.value
                        })
                        logger.debug(f"Found image path: {image_path} for element {element.element_id}")
                    else:
                        logger.warning(f"No image path found for visual element {element.element_id}")
        # Build page dimensions mapping for multi-page support
        page_dimensions = {}
        for page in unified_doc.pages:
            page_dimensions[page.page_number - 1] = {  # 0-based index
                'width': page.dimensions.width,
                'height': page.dimensions.height
            }
        # Build OCR data structure
        ocr_data = {
            'text_regions': text_regions,
            'images_metadata': images_metadata,
            'layout_data': {
                'elements': layout_elements,
                'total_elements': len(layout_elements)
            },
            'total_pages': unified_doc.page_count,
            'ocr_dimensions': {
                'width': unified_doc.pages[0].dimensions.width if unified_doc.pages else 0,
                'height': unified_doc.pages[0].dimensions.height if unified_doc.pages else 0
            },
            'page_dimensions': page_dimensions,  # Per-page dimensions for multi-page support
            # Metadata for tracking
            '_from_unified_document': True,
            '_processing_track': unified_doc.metadata.processing_track.value
        }
        logger.info(f"Converted UnifiedDocument to OCR data: "
                    f"{len(text_regions)} text regions, "
                    f"{len(images_metadata)} images, "
                    f"{len(layout_elements)} layout elements, "
                    f"track={unified_doc.metadata.processing_track.value}")
        return ocr_data
def generate_from_unified_document(
self,
unified_doc: 'UnifiedDocument',
output_path: Path,
source_file_path: Optional[Path] = None
) -> bool:
"""
Generate layout-preserving PDF directly from UnifiedDocument.
This method supports both OCR and DIRECT processing tracks,
preserving layout and coordinate information from either source.
Args:
unified_doc: UnifiedDocument object
output_path: Path to save generated PDF
source_file_path: Optional path to original source file
Returns:
True if successful, False otherwise
"""
if not UNIFIED_DOCUMENT_AVAILABLE:
logger.error("UnifiedDocument support not available")
return False
try:
# Detect processing track for track-specific rendering
processing_track = None
if hasattr(unified_doc, 'metadata') and unified_doc.metadata:
if hasattr(unified_doc.metadata, 'processing_track'):
processing_track = unified_doc.metadata.processing_track
elif isinstance(unified_doc.metadata, dict):
processing_track = unified_doc.metadata.get('processing_track')
# Route to track-specific rendering method
# ProcessingTrack is (str, Enum), so comparing with enum value works for both string and enum
# HYBRID track uses Direct track rendering (Direct text/tables + OCR images)
is_direct_track = (processing_track == ProcessingTrack.DIRECT or
processing_track == ProcessingTrack.HYBRID)
logger.info(f"Processing track: {processing_track}, using {'Direct' if is_direct_track else 'OCR'} track rendering")
if is_direct_track:
# Direct track: Rich formatting preservation
return self._generate_direct_track_pdf(
unified_doc=unified_doc,
output_path=output_path,
source_file_path=source_file_path
)
else:
# OCR track: Simplified rendering (backward compatible)
return self._generate_ocr_track_pdf(
unified_doc=unified_doc,
output_path=output_path,
source_file_path=source_file_path
)
except Exception as e:
logger.error(f"Failed to generate PDF from UnifiedDocument: {e}")
import traceback
traceback.print_exc()
return False
def _is_element_inside_regions(self, element_bbox, regions_elements, overlap_threshold=0.5) -> bool:
"""
Check if an element overlaps significantly with any exclusion region (table, image).
This prevents duplicate rendering when text overlaps with tables/images.
Direct extraction often extracts both the structured element (table/image)
AND its text content as separate text blocks.
Uses overlap ratio detection instead of strict containment, since text blocks
from DirectExtractionEngine may be larger than detected table/image regions
(e.g., text block includes heading above table).
Args:
element_bbox: BBox of the element to check
regions_elements: List of region elements (tables, images) to check against
overlap_threshold: Minimum overlap percentage to trigger filtering (default 0.5 = 50%)
Returns:
True if element overlaps ≥50% with any region, False otherwise
"""
if not element_bbox:
return False
e_x0, e_y0, e_x1, e_y1 = element_bbox.x0, element_bbox.y0, element_bbox.x1, element_bbox.y1
elem_area = (e_x1 - e_x0) * (e_y1 - e_y0)
if elem_area <= 0:
return False
for region in regions_elements:
r_bbox = region.bbox
if not r_bbox:
continue
# Calculate overlap rectangle
overlap_x0 = max(e_x0, r_bbox.x0)
overlap_y0 = max(e_y0, r_bbox.y0)
overlap_x1 = min(e_x1, r_bbox.x1)
overlap_y1 = min(e_y1, r_bbox.y1)
# Check if there is any overlap
if overlap_x0 < overlap_x1 and overlap_y0 < overlap_y1:
# Calculate overlap area
overlap_area = (overlap_x1 - overlap_x0) * (overlap_y1 - overlap_y0)
overlap_ratio = overlap_area / elem_area
# If element overlaps more than threshold, filter it out
if overlap_ratio >= overlap_threshold:
return True
return False
    def _generate_direct_track_pdf(
        self,
        unified_doc: 'UnifiedDocument',
        output_path: Path,
        source_file_path: Optional[Path] = None
    ) -> bool:
        """
        Generate PDF with rich formatting preservation for Direct track.

        This method processes UnifiedDocument directly without converting to
        legacy OCR format, preserving StyleInfo and applying proper text
        formatting including line breaks.

        Per page, elements are drawn in extraction order (which already
        encodes reading order), and text/list elements that overlap a
        table/image exclusion region are skipped to avoid rendering the same
        content twice.

        Args:
            unified_doc: UnifiedDocument from Direct extraction
            output_path: Path to save generated PDF
            source_file_path: Optional path to original source file

        Returns:
            True if successful, False otherwise
        """
        try:
            logger.info("=== Direct Track PDF Generation ===")
            logger.info(f"Total pages: {len(unified_doc.pages)}")
            # Set current track for helper methods (may be DIRECT or HYBRID)
            if hasattr(unified_doc, 'metadata') and unified_doc.metadata:
                self.current_processing_track = unified_doc.metadata.processing_track
            else:
                self.current_processing_track = ProcessingTrack.DIRECT
            # Get page dimensions from first page (for canvas initialization)
            if not unified_doc.pages:
                logger.error("No pages in document")
                return False
            first_page = unified_doc.pages[0]
            page_width = first_page.dimensions.width
            page_height = first_page.dimensions.height
            logger.info(f"First page dimensions: {page_width} x {page_height}")
            # Create PDF canvas with first page dimensions (will be updated per page)
            from reportlab.pdfgen import canvas
            pdf_canvas = canvas.Canvas(str(output_path), pagesize=(page_width, page_height))
            # Process each page
            for page_idx, page in enumerate(unified_doc.pages):
                logger.info(f">>> Processing page {page_idx + 1}/{len(unified_doc.pages)}")
                # Get current page dimensions
                current_page_width = page.dimensions.width
                current_page_height = page.dimensions.height
                logger.info(f"Page {page_idx + 1} dimensions: {current_page_width} x {current_page_height}")
                if page_idx > 0:
                    pdf_canvas.showPage()
                # Set page size for current page
                pdf_canvas.setPageSize((current_page_width, current_page_height))
                # Separate elements by type
                text_elements = []
                table_elements = []
                image_elements = []
                list_elements = []
                # FIX: Collect exclusion regions (tables, images) to prevent duplicate rendering
                regions_to_avoid = []
                for element in page.elements:
                    if element.type == ElementType.TABLE:
                        table_elements.append(element)
                        regions_to_avoid.append(element)  # Tables are exclusion regions
                    elif element.is_visual or element.type in [
                        ElementType.IMAGE, ElementType.FIGURE,
                        ElementType.CHART, ElementType.DIAGRAM, ElementType.LOGO
                    ]:
                        image_elements.append(element)
                        # Only add real images to exclusion regions, NOT charts/diagrams
                        # Charts often have large bounding boxes that include text labels
                        # which should be rendered as selectable text on top
                        if element.type in [ElementType.IMAGE, ElementType.FIGURE, ElementType.LOGO]:
                            regions_to_avoid.append(element)
                    elif element.type == ElementType.LIST_ITEM:
                        list_elements.append(element)
                    elif self._is_list_item_fallback(element):
                        # Fallback detection: Check metadata and text patterns
                        list_elements.append(element)
                        # Mark as list item for downstream processing
                        element.type = ElementType.LIST_ITEM
                    elif element.is_text or element.type in [
                        ElementType.TEXT, ElementType.TITLE, ElementType.HEADER,
                        ElementType.FOOTER, ElementType.PARAGRAPH
                    ]:
                        text_elements.append(element)
                logger.info(f"Page {page_idx + 1}: {len(text_elements)} text, "
                            f"{len(table_elements)} tables, {len(image_elements)} images, "
                            f"{len(list_elements)} list items")
                # Use original element order from extraction engine
                # The extraction engine has already sorted elements by reading order,
                # handling multi-column layouts correctly (top-to-bottom, left-to-right)
                all_elements = []
                # Preserve original order by iterating through page.elements
                for elem in page.elements:
                    if elem in image_elements:
                        all_elements.append(('image', elem))
                    elif elem in table_elements:
                        all_elements.append(('table', elem))
                    elif elem in list_elements:
                        all_elements.append(('list', elem))
                    elif elem in text_elements:
                        all_elements.append(('text', elem))
                logger.debug(f"Drawing {len(all_elements)} elements in extraction order (preserves multi-column reading order)")
                logger.debug(f"Exclusion regions: {len(regions_to_avoid)} (tables/images/charts)")
                # Debug: Log exclusion region types
                region_types = {}
                for region in regions_to_avoid:
                    region_type = region.type.name
                    region_types[region_type] = region_types.get(region_type, 0) + 1
                if region_types:
                    logger.debug(f" Exclusion region breakdown: {region_types}")
                # Draw elements in document order
                for elem_type, elem in all_elements:
                    if elem_type == 'image':
                        self._draw_image_element_direct(pdf_canvas, elem, current_page_height, output_path.parent)
                    elif elem_type == 'table':
                        self._draw_table_element_direct(pdf_canvas, elem, current_page_height)
                    elif elem_type == 'list':
                        # FIX: Check if list item overlaps with table/image
                        if not self._is_element_inside_regions(elem.bbox, regions_to_avoid):
                            self._draw_text_element_direct(pdf_canvas, elem, current_page_height)
                        else:
                            logger.debug(f"Skipping list element {elem.element_id} inside table/image region")
                    elif elem_type == 'text':
                        # FIX: Check if text overlaps with table/image before drawing
                        if not self._is_element_inside_regions(elem.bbox, regions_to_avoid):
                            self._draw_text_element_direct(pdf_canvas, elem, current_page_height)
                        else:
                            logger.debug(f"Skipping text element {elem.element_id} inside table/image region")
            # Save PDF
            pdf_canvas.save()
            logger.info(f"Direct track PDF saved to {output_path}")
            # Reset track
            self.current_processing_track = None
            return True
        except Exception as e:
            logger.error(f"Failed to generate Direct track PDF: {e}")
            import traceback
            traceback.print_exc()
            self.current_processing_track = None
            return False
def _generate_ocr_track_pdf(
self,
unified_doc: 'UnifiedDocument',
output_path: Path,
source_file_path: Optional[Path] = None
) -> bool:
"""
Generate PDF with simplified rendering for OCR track.
This method uses the existing OCR data conversion and rendering
pipeline for backward compatibility.
Args:
unified_doc: UnifiedDocument from OCR processing
output_path: Path to save generated PDF
source_file_path: Optional path to original source file
Returns:
True if successful, False otherwise
"""
try:
logger.info("=== OCR Track PDF Generation ===")
# Set current track
self.current_processing_track = 'ocr'
# Convert UnifiedDocument to OCR data format (legacy)
ocr_data = self.convert_unified_document_to_ocr_data(unified_doc)
# Use existing generation pipeline
result = self._generate_pdf_from_data(
ocr_data=ocr_data,
output_path=output_path,
source_file_path=source_file_path
)
# Reset track
self.current_processing_track = None
return result
except Exception as e:
logger.error(f"Failed to generate OCR track PDF: {e}")
import traceback
traceback.print_exc()
self.current_processing_track = None
return False
    def _generate_pdf_from_data(
        self,
        ocr_data: Dict,
        output_path: Path,
        source_file_path: Optional[Path] = None,
        json_parent_dir: Optional[Path] = None
    ) -> bool:
        """
        Internal method to generate PDF from OCR data dictionary.

        This is the core generation logic extracted for reuse by both
        JSON-based and UnifiedDocument-based generation paths.

        Each page is drawn in three layers — images (bottom), tables
        (middle), then text (top). Page sizes come from the original file
        when available, falling back to OCR/UnifiedDocument dimensions, with
        coordinates scaled accordingly.

        Args:
            ocr_data: OCR data dictionary
            output_path: Path to save generated PDF
            source_file_path: Optional path to original source file
            json_parent_dir: Directory containing images (for JSON-based generation)

        Returns:
            True if successful, False otherwise
        """
        try:
            # Check if PDF already exists (caching)
            if output_path.exists():
                logger.info(f"PDF already exists: {output_path.name}")
                return True
            # Get text regions
            text_regions = ocr_data.get('text_regions', [])
            if not text_regions:
                logger.warning("No text regions found in data")
                # Don't fail - might have only tables/images
            # Get images metadata
            images_metadata = ocr_data.get('images_metadata', [])
            # Get layout data
            layout_data = ocr_data.get('layout_data', {})
            # Step 1: Get OCR processing dimensions (for first page / default)
            ocr_width, ocr_height = self.calculate_page_dimensions(ocr_data, source_file_path=None)
            logger.info(f"OCR 處理時使用的座標系尺寸 (第一頁): {ocr_width:.1f} x {ocr_height:.1f}")
            # Step 2: Get page dimensions mapping for multi-page support
            page_dimensions = ocr_data.get('page_dimensions', {})
            if not page_dimensions:
                # Fallback: use first page dimensions for all pages
                page_dimensions = {0: {'width': ocr_width, 'height': ocr_height}}
                logger.info("No page_dimensions found, using first page size for all pages")
            # Step 3: Get original file dimensions for all pages
            original_page_sizes = {}
            if source_file_path:
                original_page_sizes = self.get_all_page_sizes(source_file_path)
                if original_page_sizes:
                    logger.info(f"從原始文件獲取到 {len(original_page_sizes)} 頁尺寸")
                else:
                    logger.warning(f"無法獲取原始文件尺寸,將使用 OCR/UnifiedDocument 尺寸")
            else:
                logger.info(f"無原始文件,將使用 OCR/UnifiedDocument 尺寸")
            # Determine initial canvas size (will be updated per page)
            # Priority: original file first page > OCR/UnifiedDocument first page
            if 0 in original_page_sizes:
                target_width, target_height = original_page_sizes[0]
                logger.info(f"初始 PDF 尺寸(來自原始文件首頁): {target_width:.1f} x {target_height:.1f}")
            else:
                target_width, target_height = ocr_width, ocr_height
                logger.info(f"初始 PDF 尺寸(來自 OCR/UnifiedDocument: {target_width:.1f} x {target_height:.1f}")
            # Create PDF canvas with initial page size (will be updated per page)
            pdf_canvas = canvas.Canvas(str(output_path), pagesize=(target_width, target_height))
            # Filter text regions to avoid overlap with tables/images
            regions_to_avoid = images_metadata
            table_count = len([img for img in images_metadata if img.get('type') == 'table'])
            logger.info(f"過濾文字區域: {len(regions_to_avoid)} 個區域需要避免 (含 {table_count} 個表格)")
            filtered_text_regions = self._filter_text_in_regions(text_regions, regions_to_avoid)
            # Group regions by page (text regions use 1-based page numbers)
            pages_data = {}
            for region in filtered_text_regions:
                page_num = region.get('page', 1)
                if page_num not in pages_data:
                    pages_data[page_num] = []
                pages_data[page_num].append(region)
            # Get table elements from layout_data
            table_elements = []
            if layout_data and layout_data.get('elements'):
                table_elements = [e for e in layout_data['elements'] if e.get('type') == 'table']
            # Process each page
            total_pages = ocr_data.get('total_pages', 1)
            logger.info(f"開始處理 {total_pages} 頁 PDF")
            # Determine image directory
            if json_parent_dir is None:
                json_parent_dir = output_path.parent
            for page_num in range(1, total_pages + 1):
                logger.info(f">>> 處理第 {page_num}/{total_pages}")
                # Get current page dimensions with priority order:
                # 1. Original file dimensions (highest priority)
                # 2. OCR/UnifiedDocument dimensions
                # 3. Fallback to first page dimensions
                page_idx = page_num - 1
                dimension_source = "unknown"
                # Priority 1: Original file dimensions
                if page_idx in original_page_sizes:
                    current_target_w, current_target_h = original_page_sizes[page_idx]
                    dimension_source = "original_file"
                # Priority 2: OCR/UnifiedDocument dimensions
                elif page_idx in page_dimensions:
                    current_page_dims = page_dimensions[page_idx]
                    current_target_w = float(current_page_dims['width'])
                    current_target_h = float(current_page_dims['height'])
                    dimension_source = "ocr_unified_doc"
                # Priority 3: Fallback to first page
                else:
                    current_target_w = ocr_width
                    current_target_h = ocr_height
                    dimension_source = "fallback_first_page"
                    logger.warning(f"No dimensions for page {page_num}, using first page size")
                # Calculate scale factors for coordinate transformation
                # OCR coordinates need to be scaled if original file dimensions differ
                if dimension_source == "original_file":
                    # Get OCR dimensions for this page to calculate scale
                    if page_idx in page_dimensions:
                        ocr_page_w = float(page_dimensions[page_idx]['width'])
                        ocr_page_h = float(page_dimensions[page_idx]['height'])
                    else:
                        ocr_page_w = ocr_width
                        ocr_page_h = ocr_height
                    current_scale_w = current_target_w / ocr_page_w if ocr_page_w > 0 else 1.0
                    current_scale_h = current_target_h / ocr_page_h if ocr_page_h > 0 else 1.0
                else:
                    # Using OCR/UnifiedDocument dimensions directly, no scaling needed
                    current_scale_w = 1.0
                    current_scale_h = 1.0
                logger.info(f"{page_num} 頁尺寸: {current_target_w:.1f} x {current_target_h:.1f} "
                            f"(來源: {dimension_source}, 縮放: {current_scale_w:.3f}x{current_scale_h:.3f})")
                if page_num > 1:
                    pdf_canvas.showPage()
                # Set page size for current page
                pdf_canvas.setPageSize((current_target_w, current_target_h))
                # Get regions for this page (tables/images use 0-based pages)
                page_text_regions = pages_data.get(page_num, [])
                page_table_regions = [t for t in table_elements if t.get('page') == page_num - 1]
                page_image_regions = [
                    img for img in images_metadata
                    if img.get('page') == page_num - 1
                    and img.get('type') != 'table'
                    and img.get('image_path') is not None  # Skip table placeholders
                ]
                # Draw in layers: images → tables → text
                # 1. Draw images (bottom layer)
                for img_meta in page_image_regions:
                    self.draw_image_region(
                        pdf_canvas, img_meta, current_target_h,
                        json_parent_dir, current_scale_w, current_scale_h
                    )
                # 2. Draw tables (middle layer)
                for table_elem in page_table_regions:
                    self.draw_table_region(
                        pdf_canvas, table_elem, images_metadata,
                        current_target_h, current_scale_w, current_scale_h
                    )
                # 3. Draw text (top layer)
                for region in page_text_regions:
                    self.draw_text_region(
                        pdf_canvas, region, current_target_h,
                        current_scale_w, current_scale_h
                    )
                logger.info(f"<<< 第 {page_num} 頁完成")
            # Save PDF
            pdf_canvas.save()
            file_size = output_path.stat().st_size
            logger.info(f"Generated PDF: {output_path.name} ({file_size} bytes)")
            return True
        except Exception as e:
            logger.error(f"Failed to generate PDF: {e}")
            import traceback
            traceback.print_exc()
            return False
    def calculate_page_dimensions(self, ocr_data: Dict, source_file_path: Optional[Path] = None) -> Tuple[float, float]:
        """
        Determine the page size (in points) from OCR JSON data.

        Resolution order:
          1. 'ocr_dimensions' field (produced by UnifiedDocument conversion)
          2. 'dimensions' field (original JSON)
          3. Inference from the max extent of all region bboxes
          4. Original source file dimensions (if source_file_path given)
          5. A4 as the last resort

        Args:
            ocr_data: Complete OCR data dictionary with text_regions and layout
            source_file_path: Optional path to source file (fallback only)

        Returns:
            Tuple of (width, height) in points
        """
        # *** Priority 1: 'ocr_dimensions' (set by UnifiedDocument conversion) ***
        if 'ocr_dimensions' in ocr_data:
            dims = ocr_data['ocr_dimensions']
            # Handle both dict format {'width': w, 'height': h} and
            # list format [{'page': 1, 'width': w, 'height': h}, ...]
            if isinstance(dims, list) and len(dims) > 0:
                dims = dims[0]  # Use first page dimensions
            if isinstance(dims, dict):
                w = float(dims.get('width', 0))
                h = float(dims.get('height', 0))
                if w > 0 and h > 0:
                    logger.info(f"使用 ocr_dimensions 欄位的頁面尺寸: {w:.1f} x {h:.1f}")
                    return (w, h)
        # *** Priority 2: 'dimensions' field of the original JSON ***
        if 'dimensions' in ocr_data:
            dims = ocr_data['dimensions']
            w = float(dims.get('width', 0))
            h = float(dims.get('height', 0))
            if w > 0 and h > 0:
                logger.info(f"使用 dimensions 欄位的頁面尺寸: {w:.1f} x {h:.1f}")
                return (w, h)
        # *** Priority 3: fallback — infer from bboxes (only when the above are missing) ***
        logger.info("dimensions 欄位不可用,回退到 bbox 推斷")
        max_x = 0
        max_y = 0
        # Collect every field that may contain bboxes; different OCR output
        # versions use different field names.
        all_regions = []
        # 1. text_regions — all text regions (most common)
        if 'text_regions' in ocr_data and isinstance(ocr_data['text_regions'], list):
            all_regions.extend(ocr_data['text_regions'])
        # 2. image_regions — image regions
        if 'image_regions' in ocr_data and isinstance(ocr_data['image_regions'], list):
            all_regions.extend(ocr_data['image_regions'])
        # 3. tables — table regions
        if 'tables' in ocr_data and isinstance(ocr_data['tables'], list):
            all_regions.extend(ocr_data['tables'])
        # 4. layout — layout information (may be an empty list)
        if 'layout' in ocr_data and isinstance(ocr_data['layout'], list):
            all_regions.extend(ocr_data['layout'])
        # 5. layout_data.elements — PP-StructureV3 format
        if 'layout_data' in ocr_data and isinstance(ocr_data['layout_data'], dict):
            elements = ocr_data['layout_data'].get('elements', [])
            if elements:
                all_regions.extend(elements)
        if not all_regions:
            # JSON carries no regions at all — fall back to the source file size.
            logger.warning("JSON 中沒有找到 text_regions, image_regions, tables, layout 或 layout_data.elements回退到原始檔案尺寸。")
            if source_file_path:
                dims = self.get_original_page_size(source_file_path)
                if dims:
                    return dims
            return A4
        region_count = 0
        for region in all_regions:
            try:
                bbox = region.get('bbox')
                if not bbox:
                    continue
                region_count += 1
                # Handle both flat and polygon bbox encodings.
                if isinstance(bbox[0], (int, float)):
                    # Simple [x1, y1, x2, y2] format
                    max_x = max(max_x, bbox[2])
                    max_y = max(max_y, bbox[3])
                elif isinstance(bbox[0], (list, tuple)):
                    # Polygon [[x, y], ...] format — use the max over all points
                    x_coords = [p[0] for p in bbox if isinstance(p, (list, tuple)) and len(p) >= 2]
                    y_coords = [p[1] for p in bbox if isinstance(p, (list, tuple)) and len(p) >= 2]
                    if x_coords and y_coords:
                        max_x = max(max_x, max(x_coords))
                        max_y = max(max_y, max(y_coords))
            except Exception as e:
                logger.warning(f"Error processing bbox {bbox}: {e}")
        if max_x > 0 and max_y > 0:
            logger.info(f"{region_count} 個區域中推斷出的 OCR 座標系尺寸: {max_x:.1f} x {max_y:.1f}")
            return (max_x, max_y)
        else:
            # Every bbox failed to parse — fall back to the source file size.
            logger.warning("無法從 bbox 推斷尺寸,回退到原始檔案尺寸。")
            if source_file_path:
                dims = self.get_original_page_size(source_file_path)
                if dims:
                    return dims
            return A4
def get_all_page_sizes(self, file_path: Path) -> Dict[int, Tuple[float, float]]:
"""
Extract dimensions for all pages from original source file
Args:
file_path: Path to original file (image or PDF)
Returns:
Dict mapping page index (0-based) to (width, height) in points
Empty dict if extraction fails
"""
page_sizes = {}
try:
if not file_path.exists():
logger.warning(f"File not found: {file_path}")
return page_sizes
# For images, single page with dimensions from PIL
if file_path.suffix.lower() in ['.png', '.jpg', '.jpeg', '.bmp', '.tiff']:
img = Image.open(file_path)
# Use pixel dimensions directly as points (1:1 mapping)
# This matches how PaddleOCR reports coordinates
width_pt = float(img.width)
height_pt = float(img.height)
page_sizes[0] = (width_pt, height_pt)
logger.info(f"Extracted dimensions from image: {width_pt:.1f} x {height_pt:.1f} points (1:1 pixel mapping)")
return page_sizes
# For PDFs, extract dimensions for all pages using PyPDF2
if file_path.suffix.lower() == '.pdf':
try:
from PyPDF2 import PdfReader
reader = PdfReader(file_path)
total_pages = len(reader.pages)
for page_idx in range(total_pages):
page = reader.pages[page_idx]
# MediaBox gives [x1, y1, x2, y2] in points
mediabox = page.mediabox
width_pt = float(mediabox.width)
height_pt = float(mediabox.height)
page_sizes[page_idx] = (width_pt, height_pt)
logger.info(f"Extracted dimensions from PDF: {total_pages} pages")
for idx, (w, h) in page_sizes.items():
logger.debug(f" Page {idx}: {w:.1f} x {h:.1f} points")
return page_sizes
except ImportError:
logger.warning("PyPDF2 not available, cannot extract PDF dimensions")
except Exception as e:
logger.warning(f"Failed to extract PDF dimensions: {e}")
except Exception as e:
logger.warning(f"Failed to get page sizes from {file_path}: {e}")
return page_sizes
def get_original_page_size(self, file_path: Path) -> Optional[Tuple[float, float]]:
"""
Extract first page dimensions from original source file (backward compatibility)
Args:
file_path: Path to original file (image or PDF)
Returns:
Tuple of (width, height) in points or None
"""
page_sizes = self.get_all_page_sizes(file_path)
if 0 in page_sizes:
return page_sizes[0]
return None
def _get_bbox_coords(self, bbox: Union[List[List[float]], List[float]]) -> Optional[Tuple[float, float, float, float]]:
"""將任何 bbox 格式 (多邊形或 [x1,y1,x2,y2]) 轉換為 [x_min, y_min, x_max, y_max]"""
try:
if isinstance(bbox[0], (list, tuple)):
# 處理多邊形 [[x, y], ...]
x_coords = [p[0] for p in bbox if isinstance(p, (list, tuple)) and len(p) >= 2]
y_coords = [p[1] for p in bbox if isinstance(p, (list, tuple)) and len(p) >= 2]
if not x_coords or not y_coords:
return None
return min(x_coords), min(y_coords), max(x_coords), max(y_coords)
elif isinstance(bbox[0], (int, float)) and len(bbox) == 4:
# 處理 [x1, y1, x2, y2]
return bbox[0], bbox[1], bbox[2], bbox[3]
else:
logger.warning(f"未知的 bbox 格式: {bbox}")
return None
except Exception as e:
logger.error(f"解析 bbox {bbox} 時出錯: {e}")
return None
def _is_bbox_inside(self, inner_bbox_data: Dict, outer_bbox_data: Dict, tolerance: float = 5.0) -> bool:
"""
檢查 'inner_bbox' 是否在 'outer_bbox' 內部(帶有容錯)。
此版本可處理多邊形和矩形。
"""
inner_coords = self._get_bbox_coords(inner_bbox_data.get('bbox'))
outer_coords = self._get_bbox_coords(outer_bbox_data.get('bbox'))
if not inner_coords or not outer_coords:
return False
inner_x1, inner_y1, inner_x2, inner_y2 = inner_coords
outer_x1, outer_y1, outer_x2, outer_y2 = outer_coords
# 檢查 inner 是否在 outer 內部 (加入 tolerance)
is_inside = (
(inner_x1 >= outer_x1 - tolerance) and
(inner_y1 >= outer_y1 - tolerance) and
(inner_x2 <= outer_x2 + tolerance) and
(inner_y2 <= outer_y2 + tolerance)
)
return is_inside
def _bbox_overlaps(self, bbox1_data: Dict, bbox2_data: Dict, tolerance: float = 5.0) -> bool:
"""
檢查兩個 bbox 是否有重疊(帶有容錯)。
如果有任何重疊,返回 True。
Args:
bbox1_data: 第一個 bbox 數據
bbox2_data: 第二個 bbox 數據
tolerance: 容錯值(像素)
Returns:
True 如果兩個 bbox 有重疊
"""
coords1 = self._get_bbox_coords(bbox1_data.get('bbox'))
coords2 = self._get_bbox_coords(bbox2_data.get('bbox'))
if not coords1 or not coords2:
return False
x1_min, y1_min, x1_max, y1_max = coords1
x2_min, y2_min, x2_max, y2_max = coords2
# 擴展 bbox2表格/圖片區域)的範圍
x2_min -= tolerance
y2_min -= tolerance
x2_max += tolerance
y2_max += tolerance
# 檢查是否有重疊:如果沒有重疊,則必定滿足以下條件之一
no_overlap = (
x1_max < x2_min or # bbox1 在 bbox2 左側
x1_min > x2_max or # bbox1 在 bbox2 右側
y1_max < y2_min or # bbox1 在 bbox2 上方
y1_min > y2_max # bbox1 在 bbox2 下方
)
return not no_overlap
def _filter_text_in_regions(self, text_regions: List[Dict], regions_to_avoid: List[Dict], tolerance: float = 10.0) -> List[Dict]:
"""
過濾掉與 'regions_to_avoid'(例如表格、圖片)重疊的文字區域。
Args:
text_regions: 文字區域列表
regions_to_avoid: 需要避免的區域列表(表格、圖片)
tolerance: 容錯值(像素),增加到 10.0 以更好地處理邊界情況
Returns:
過濾後的文字區域列表
"""
filtered_text = []
filtered_count = 0
for text_region in text_regions:
should_filter = False
for avoid_region in regions_to_avoid:
# 使用重疊檢測:只要有任何重疊就過濾掉
if self._bbox_overlaps(text_region, avoid_region, tolerance=tolerance):
should_filter = True
filtered_count += 1
logger.debug(f"過濾掉重疊文字: {text_region.get('text', '')[:20]}...")
break # 找到一個重疊區域就足夠了
if not should_filter:
filtered_text.append(text_region)
logger.info(f"原始文字區域: {len(text_regions)}, 過濾後: {len(filtered_text)}, 移除: {filtered_count}")
return filtered_text
    def draw_text_region(
        self,
        pdf_canvas: canvas.Canvas,
        region: Dict,
        page_height: float,
        scale_w: float = 1.0,
        scale_h: float = 1.0
    ):
        """
        Draw a single OCR text region at precise page coordinates.

        Coordinate flow: OCR bbox (top-left origin) -> scaled by
        (scale_w, scale_h) into PDF points -> Y-flipped to ReportLab's
        bottom-left origin. Font size is derived from bbox height divided by
        line count, then shrunk per-line when a line would overflow the bbox
        width.

        Args:
            pdf_canvas: ReportLab canvas object
            region: Text region dict with 'text', 'bbox' (4-point polygon),
                optional 'confidence' and 'style'
            page_height: Height of page in points (for coordinate transformation)
            scale_w: Scale factor for X coordinates (PDF width / OCR width)
            scale_h: Scale factor for Y coordinates (PDF height / OCR height)
        """
        text = region.get('text', '')
        bbox = region.get('bbox', [])
        confidence = region.get('confidence', 1.0)  # NOTE(review): read but not used below
        if not text or not bbox or len(bbox) < 4:
            return
        try:
            # bbox from OCR: [[x1,y1], [x2,y2], [x3,y3], [x4,y4]]
            # Points: top-left, top-right, bottom-right, bottom-left
            # OCR coordinates: origin (0,0) at top-left, Y increases downward
            ocr_x_left = bbox[0][0]  # Left X
            ocr_y_top = bbox[0][1]  # Top Y in OCR coordinates
            ocr_x_right = bbox[2][0]  # Right X
            ocr_y_bottom = bbox[2][1]  # Bottom Y in OCR coordinates
            logger.info(f"[文字] '{text[:20]}...' OCR原始座標: L={ocr_x_left:.0f}, T={ocr_y_top:.0f}, R={ocr_x_right:.0f}, B={ocr_y_bottom:.0f}")
            # Apply scale factors to convert from OCR space to PDF space
            scaled_x_left = ocr_x_left * scale_w
            scaled_y_top = ocr_y_top * scale_h
            scaled_x_right = ocr_x_right * scale_w
            scaled_y_bottom = ocr_y_bottom * scale_h
            logger.info(f"[文字] '{text[:20]}...' 縮放後(scale={scale_w:.3f},{scale_h:.3f}): L={scaled_x_left:.1f}, T={scaled_y_top:.1f}, R={scaled_x_right:.1f}, B={scaled_y_bottom:.1f}")
            # Calculate bbox dimensions (after scaling)
            bbox_width = abs(scaled_x_right - scaled_x_left)
            bbox_height = abs(scaled_y_bottom - scaled_y_top)
            # Calculate font size using heuristics:
            # for multi-line text, divide bbox height by the number of lines.
            lines = text.split('\n')
            non_empty_lines = [l for l in lines if l.strip()]
            num_lines = max(len(non_empty_lines), 1)
            # Font size = bbox_height / num_lines * factor;
            # the 0.8 factor leaves room for line spacing.
            font_size = (bbox_height / num_lines) * 0.8
            font_size = max(min(font_size, 72), 4)  # Clamp between 4pt and 72pt
            logger.debug(f"Text has {num_lines} non-empty lines, bbox_height={bbox_height:.1f}, calculated font_size={font_size:.1f}")
            # Transform coordinates: OCR (top-left origin) → PDF (bottom-left origin)
            # CRITICAL: Y-axis flip!
            # For multi-line text, start from the TOP of the bbox and go downward.
            pdf_x = scaled_x_left
            pdf_y_top = page_height - scaled_y_top  # Top of bbox in PDF coordinates
            # Adjust for font baseline: first line starts below the top edge
            pdf_y = pdf_y_top - font_size  # Start first line one font size below top
            logger.info(f"[文字] '{text[:30]}' → PDF位置: ({pdf_x:.1f}, {pdf_y:.1f}), 字體:{font_size:.1f}pt, 寬x高:{bbox_width:.0f}x{bbox_height:.0f}, 行數:{num_lines}")
            # Set font with track-specific styling.
            # Note: the OCR track has no StyleInfo (text extracted from images),
            # so no advanced formatting is applied there.
            style_info = region.get('style')
            is_direct_track = (self.current_processing_track == ProcessingTrack.DIRECT or
                               self.current_processing_track == ProcessingTrack.HYBRID)
            if style_info and is_direct_track:
                # Direct track: apply rich styling from StyleInfo
                self._apply_text_style(pdf_canvas, style_info, default_size=font_size)
                # Read back the font actually set, for width calculations below
                font_name = pdf_canvas._fontname
                font_size = pdf_canvas._fontsize
                logger.debug(f"Applied Direct track style: font={font_name}, size={font_size}")
            else:
                # OCR track or no style: use simple font selection
                font_name = self.font_name if self.font_registered else 'Helvetica'
                pdf_canvas.setFont(font_name, font_size)
            # Handle line breaks (text was split by newlines above);
            # the OCR track uses simple left-aligned rendering.
            line_height = font_size * 1.2  # 120% of font size for line spacing
            # Draw each non-empty line (line index drives vertical position)
            for i, line in enumerate(non_empty_lines):
                line_y = pdf_y - (i * line_height)
                # Calculate text width to prevent overflow
                text_width = pdf_canvas.stringWidth(line, font_name, font_size)
                # If the line is too wide for the bbox, scale the font down for it
                current_font_size = font_size
                if text_width > bbox_width:
                    scale_factor = bbox_width / text_width
                    current_font_size = font_size * scale_factor * 0.95  # 95% to add small margin
                    current_font_size = max(current_font_size, 3)  # Minimum 3pt
                    pdf_canvas.setFont(font_name, current_font_size)
                # Draw text at left-aligned position
                pdf_canvas.drawString(pdf_x, line_y, line)
                # Restore the base font size for the next line
                if text_width > bbox_width:
                    pdf_canvas.setFont(font_name, font_size)
            # Debug aid: outline the original bbox when enabled in settings
            if settings.pdf_enable_bbox_debug:
                pdf_canvas.setStrokeColorRGB(1, 0, 0, 0.3)  # Red, semi-transparent
                pdf_canvas.setLineWidth(0.5)
                # Transform all bbox points to PDF coordinates (apply scaling first)
                pdf_points = [(p[0] * scale_w, page_height - p[1] * scale_h) for p in bbox]
                # Draw the quadrilateral edge by edge
                for i in range(4):
                    x1, y1 = pdf_points[i]
                    x2, y2 = pdf_points[(i + 1) % 4]
                    pdf_canvas.line(x1, y1, x2, y2)
        except Exception as e:
            logger.warning(f"Failed to draw text region '{text[:20]}...': {e}")
    def draw_table_region(
        self,
        pdf_canvas: canvas.Canvas,
        table_element: Dict,
        images_metadata: List[Dict],
        page_height: float,
        scale_w: float = 1.0,
        scale_h: float = 1.0
    ):
        """
        Draw a table region by parsing its HTML content and rebuilding it as a
        ReportLab Table flowable positioned at the element's bbox.

        Only the first parsed table is rendered (PP-StructureV3 usually
        provides one table per element). Column widths are distributed equally
        across the bbox width.

        Args:
            pdf_canvas: ReportLab canvas object
            table_element: Table element dict with HTML 'content' and a
                'bbox' or 'bbox_polygon' for positioning
            images_metadata: List of image metadata (NOTE(review): not used in
                this method body — the bbox comes from table_element itself)
            page_height: Height of page in points (for the Y-axis flip)
            scale_w: Scale factor for X coordinates (PDF width / OCR width)
            scale_h: Scale factor for Y coordinates (PDF height / OCR height)
        """
        try:
            html_content = table_element.get('content', '')
            if not html_content:
                return
            # Parse HTML to extract table structure
            parser = HTMLTableParser()
            parser.feed(html_content)
            if not parser.tables:
                logger.warning("No tables found in HTML content")
                return
            # Get the first table (PP-StructureV3 usually provides one table per element)
            table_data = parser.tables[0]
            rows = table_data['rows']
            if not rows:
                return
            # Get bbox directly from the table element
            table_bbox = table_element.get('bbox')
            # If no bbox directly, check for bbox_polygon
            if not table_bbox:
                bbox_polygon = table_element.get('bbox_polygon')
                if bbox_polygon and len(bbox_polygon) >= 4:
                    # Convert polygon format to simple bbox [x0, y0, x1, y1]
                    # using the top-left and bottom-right corners.
                    table_bbox = [
                        bbox_polygon[0][0],  # x0
                        bbox_polygon[0][1],  # y0
                        bbox_polygon[2][0],  # x1
                        bbox_polygon[2][1]  # y1
                    ]
            if not table_bbox:
                logger.warning(f"No bbox found for table element")
                return
            # Handle different bbox formats
            if isinstance(table_bbox, list) and len(table_bbox) == 4:
                # Simple bbox format [x0, y0, x1, y1]
                if isinstance(table_bbox[0], (int, float)):
                    ocr_x_left_raw = table_bbox[0]
                    ocr_y_top_raw = table_bbox[1]
                    ocr_x_right_raw = table_bbox[2]
                    ocr_y_bottom_raw = table_bbox[3]
                # Polygon format [[x,y], [x,y], [x,y], [x,y]]
                elif isinstance(table_bbox[0], list):
                    ocr_x_left_raw = table_bbox[0][0]
                    ocr_y_top_raw = table_bbox[0][1]
                    ocr_x_right_raw = table_bbox[2][0]
                    ocr_y_bottom_raw = table_bbox[2][1]
                else:
                    logger.error(f"Unexpected bbox format: {table_bbox}")
                    return
            else:
                logger.error(f"Invalid table_bbox format: {table_bbox}")
                return
            logger.info(f"[表格] OCR原始座標: L={ocr_x_left_raw:.0f}, T={ocr_y_top_raw:.0f}, R={ocr_x_right_raw:.0f}, B={ocr_y_bottom_raw:.0f}")
            # Apply scaling from OCR space to PDF points
            ocr_x_left = ocr_x_left_raw * scale_w
            ocr_y_top = ocr_y_top_raw * scale_h
            ocr_x_right = ocr_x_right_raw * scale_w
            ocr_y_bottom = ocr_y_bottom_raw * scale_h
            table_width = abs(ocr_x_right - ocr_x_left)
            table_height = abs(ocr_y_bottom - ocr_y_top)
            # Transform coordinates: Y-axis flip — drawOn() positions the
            # table's bottom-left corner.
            pdf_x = ocr_x_left
            pdf_y = page_height - ocr_y_bottom
            # Build table data for ReportLab:
            # convert the parsed structure to a simple 2D array.
            max_cols = max(len(row['cells']) for row in rows)
            logger.info(f"[表格] {len(rows)}行x{max_cols}列 → PDF位置: ({pdf_x:.1f}, {pdf_y:.1f}), 寬x高: {table_width:.0f}x{table_height:.0f}")
            reportlab_data = []
            for row in rows:
                row_data = []
                for cell in row['cells']:
                    text = cell['text'].strip()
                    row_data.append(text)
                # Pad short rows with empty trailing cells
                while len(row_data) < max_cols:
                    row_data.append('')
                reportlab_data.append(row_data)
            # Calculate column widths (equal distribution over the bbox width)
            col_widths = [table_width / max_cols] * max_cols
            # Choose a font size small enough to fit the rows in the bbox,
            # clamped to the 6-10pt range.
            font_size = min(table_height / len(rows) * 0.5, 10)
            font_size = max(font_size, 6)
            # Create the ReportLab Table
            table = Table(reportlab_data, colWidths=col_widths)
            # Apply table style: full grid, centered cells, tight padding
            style = TableStyle([
                ('FONT', (0, 0), (-1, -1), self.font_name if self.font_registered else 'Helvetica', font_size),
                ('GRID', (0, 0), (-1, -1), 0.5, colors.black),
                ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
                ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
                ('LEFTPADDING', (0, 0), (-1, -1), 2),
                ('RIGHTPADDING', (0, 0), (-1, -1), 2),
                ('TOPPADDING', (0, 0), (-1, -1), 2),
                ('BOTTOMPADDING', (0, 0), (-1, -1), 2),
            ])
            # Add header styling when the first cell is flagged as a header
            if rows and rows[0]['cells'] and rows[0]['cells'][0].get('is_header'):
                style.add('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey)
                style.add('FONT', (0, 0), (-1, 0), self.font_name if self.font_registered else 'Helvetica-Bold', font_size)
            table.setStyle(style)
            # wrapOn computes the table's layout before drawing
            table.wrapOn(pdf_canvas, table_width, table_height)
            # Draw table at the computed position
            table.drawOn(pdf_canvas, pdf_x, pdf_y)
            logger.info(f"Drew table at ({pdf_x:.0f}, {pdf_y:.0f}) size {table_width:.0f}x{table_height:.0f} with {len(rows)} rows")
        except Exception as e:
            logger.warning(f"Failed to draw table region: {e}")
            import traceback
            traceback.print_exc()
    def draw_image_region(
        self,
        pdf_canvas: canvas.Canvas,
        region: Dict,
        page_height: float,
        result_dir: Path,
        scale_w: float = 1.0,
        scale_h: float = 1.0
    ):
        """
        Draw an image region by embedding the extracted image file.

        Handles images extracted by PP-StructureV3 (tables, figures, charts,
        etc.). The image path is resolved relative to result_dir, with a
        flat-filename fallback for legacy data. Coordinates are scaled from
        OCR space into PDF points and Y-flipped to ReportLab's bottom-left
        origin.

        Args:
            pdf_canvas: ReportLab canvas object
            region: Image metadata dict with 'image_path' and 'bbox'
                (4-point polygon in OCR coordinates)
            page_height: Height of page in points (for coordinate transformation)
            result_dir: Directory containing result files
            scale_w: Scale factor for X coordinates (PDF width / OCR width)
            scale_h: Scale factor for Y coordinates (PDF height / OCR height)
        """
        try:
            image_path_str = region.get('image_path', '')
            if not image_path_str:
                return
            # Construct full path to image.
            # saved_path is relative to result_dir (e.g., "imgs/element_id.png")
            image_path = result_dir / image_path_str
            # Fallback for legacy data: try the bare filename directly in result_dir
            if not image_path.exists():
                image_path = result_dir / Path(image_path_str).name
            if not image_path.exists():
                logger.warning(f"Image not found: {image_path_str} (in {result_dir})")
                return
            # Get bbox for positioning
            bbox = region.get('bbox', [])
            if not bbox or len(bbox) < 4:
                # Without a bbox we cannot place the image — skip it
                logger.warning(f"No bbox for image {image_path_str}")
                return
            # bbox from OCR: [[x1,y1], [x2,y2], [x3,y3], [x4,y4]]
            # OCR coordinates: origin (0,0) at top-left, Y increases downward
            ocr_x_left_raw = bbox[0][0]
            ocr_y_top_raw = bbox[0][1]
            ocr_x_right_raw = bbox[2][0]
            ocr_y_bottom_raw = bbox[2][1]
            logger.info(f"[圖片] '{image_path_str}' OCR原始座標: L={ocr_x_left_raw:.0f}, T={ocr_y_top_raw:.0f}, R={ocr_x_right_raw:.0f}, B={ocr_y_bottom_raw:.0f}")
            # Apply scaling from OCR space to PDF points
            ocr_x_left = ocr_x_left_raw * scale_w
            ocr_y_top = ocr_y_top_raw * scale_h
            ocr_x_right = ocr_x_right_raw * scale_w
            ocr_y_bottom = ocr_y_bottom_raw * scale_h
            # Calculate bbox dimensions (after scaling)
            bbox_width = abs(ocr_x_right - ocr_x_left)
            bbox_height = abs(ocr_y_bottom - ocr_y_top)
            # Transform coordinates: OCR (top-left origin) → PDF (bottom-left origin)
            # CRITICAL: Y-axis flip!
            # Images are positioned by their bottom-left corner.
            pdf_x_left = ocr_x_left
            pdf_y_bottom = page_height - ocr_y_bottom  # Flip Y-axis
            logger.info(f"[圖片] '{image_path_str}' → PDF位置: ({pdf_x_left:.1f}, {pdf_y_bottom:.1f}), 寬x高: {bbox_width:.0f}x{bbox_height:.0f}")
            # Draw image using ReportLab.
            # drawImage expects: (path, x, y, width, height)
            # where (x, y) is the bottom-left corner of the image.
            pdf_canvas.drawImage(
                str(image_path),
                pdf_x_left,
                pdf_y_bottom,
                width=bbox_width,
                height=bbox_height,
                preserveAspectRatio=True,
                mask='auto'  # Handle transparency
            )
            logger.info(f"[圖片] ✓ 成功繪製 '{image_path_str}'")
        except Exception as e:
            logger.warning(f"Failed to draw image region: {e}")
def generate_layout_pdf(
self,
json_path: Path,
output_path: Path,
source_file_path: Optional[Path] = None
) -> bool:
"""
Generate layout-preserving PDF from OCR JSON data
Args:
json_path: Path to OCR JSON file
output_path: Path to save generated PDF
source_file_path: Optional path to original source file for dimension extraction
Returns:
True if successful, False otherwise
"""
try:
# Load JSON data
ocr_data = self.load_ocr_json(json_path)
if not ocr_data:
return False
# Check if this is new UnifiedDocument format (has 'pages' with elements)
# vs old OCR format (has 'text_regions')
if 'pages' in ocr_data and isinstance(ocr_data.get('pages'), list):
# New UnifiedDocument format - convert and use Direct track rendering
logger.info("Detected UnifiedDocument JSON format, using Direct track rendering")
unified_doc = self._json_to_unified_document(ocr_data, json_path.parent)
if unified_doc:
return self.generate_from_unified_document(
unified_doc=unified_doc,
output_path=output_path,
source_file_path=source_file_path
)
else:
logger.error("Failed to convert JSON to UnifiedDocument")
return False
else:
# Old OCR format - use legacy generation
logger.info("Detected legacy OCR JSON format, using OCR track rendering")
return self._generate_pdf_from_data(
ocr_data=ocr_data,
output_path=output_path,
source_file_path=source_file_path,
json_parent_dir=json_path.parent
)
except Exception as e:
logger.error(f"Failed to generate PDF: {e}")
import traceback
traceback.print_exc()
return False
    def _json_to_unified_document(self, json_data: Dict, result_dir: Path) -> Optional['UnifiedDocument']:
        """
        Convert a JSON dict (UnifiedDocument schema) to a UnifiedDocument object.

        Args:
            json_data: Loaded JSON dictionary in UnifiedDocument format
            result_dir: Directory containing image files
                (NOTE(review): not referenced in this method body)

        Returns:
            UnifiedDocument object, or None if conversion fails
        """
        try:
            from datetime import datetime
            # Parse metadata
            metadata_dict = json_data.get('metadata', {})
            # Parse processing track; unknown values fall back to DIRECT
            track_str = metadata_dict.get('processing_track', 'direct')
            try:
                processing_track = ProcessingTrack(track_str)
            except ValueError:
                processing_track = ProcessingTrack.DIRECT
            # Create DocumentMetadata.
            # ISO timestamps may carry a 'Z' suffix; normalize to '+00:00'
            # so datetime.fromisoformat accepts them.
            metadata = DocumentMetadata(
                filename=metadata_dict.get('filename', ''),
                file_type=metadata_dict.get('file_type', 'pdf'),
                file_size=metadata_dict.get('file_size', 0),
                created_at=datetime.fromisoformat(metadata_dict.get('created_at', datetime.now().isoformat()).replace('Z', '+00:00')),
                processing_track=processing_track,
                processing_time=metadata_dict.get('processing_time', 0),
                language=metadata_dict.get('language'),
                title=metadata_dict.get('title'),
                author=metadata_dict.get('author'),
                subject=metadata_dict.get('subject'),
                keywords=metadata_dict.get('keywords'),
                producer=metadata_dict.get('producer'),
                creator=metadata_dict.get('creator'),
                creation_date=datetime.fromisoformat(metadata_dict['creation_date'].replace('Z', '+00:00')) if metadata_dict.get('creation_date') else None,
                modification_date=datetime.fromisoformat(metadata_dict['modification_date'].replace('Z', '+00:00')) if metadata_dict.get('modification_date') else None,
            )
            # Parse pages
            pages = []
            for page_dict in json_data.get('pages', []):
                # Parse page dimensions
                dims = page_dict.get('dimensions', {})
                if not dims:
                    # Fallback dimensions (~A4 portrait in points)
                    dims = {'width': 595.32, 'height': 841.92}
                dimensions = Dimensions(
                    width=dims.get('width', 595.32),
                    height=dims.get('height', 841.92),
                    dpi=dims.get('dpi')
                )
                # Parse elements; failed conversions are dropped (converter returns None)
                elements = []
                for elem_dict in page_dict.get('elements', []):
                    element = self._json_to_document_element(elem_dict)
                    if element:
                        elements.append(element)
                page = Page(
                    page_number=page_dict.get('page_number', 1),
                    dimensions=dimensions,
                    elements=elements,
                    metadata=page_dict.get('metadata', {})
                )
                pages.append(page)
            # Create the UnifiedDocument wrapper
            unified_doc = UnifiedDocument(
                document_id=json_data.get('document_id', ''),
                metadata=metadata,
                pages=pages,
                processing_errors=json_data.get('processing_errors', [])
            )
            logger.info(f"Converted JSON to UnifiedDocument: {len(pages)} pages, track={processing_track.value}")
            return unified_doc
        except Exception as e:
            logger.error(f"Failed to convert JSON to UnifiedDocument: {e}")
            import traceback
            traceback.print_exc()
            return None
def _json_to_document_element(self, elem_dict: Dict) -> Optional['DocumentElement']:
"""
Convert JSON dict to DocumentElement.
Args:
elem_dict: Element dictionary from JSON
Returns:
DocumentElement or None if conversion fails
"""
try:
# Parse element type
type_str = elem_dict.get('type', 'text')
try:
elem_type = ElementType(type_str)
except ValueError:
# Fallback to TEXT for unknown types
elem_type = ElementType.TEXT
logger.warning(f"Unknown element type '{type_str}', falling back to TEXT")
# Content-based HTML table detection: reclassify text elements with HTML table content
content = elem_dict.get('content', '')
if elem_type == ElementType.TEXT and isinstance(content, str) and '<table' in content.lower():
logger.info(f"Reclassifying element from TEXT to TABLE due to HTML table content")
elem_type = ElementType.TABLE
# Parse bounding box
bbox_dict = elem_dict.get('bbox', {})
bbox = BoundingBox(
x0=bbox_dict.get('x0', 0),
y0=bbox_dict.get('y0', 0),
x1=bbox_dict.get('x1', 0),
y1=bbox_dict.get('y1', 0)
)
# Parse style if present
style = None
if 'style' in elem_dict and elem_dict['style']:
style_dict = elem_dict['style']
style = StyleInfo(
font_name=style_dict.get('font_name'),
font_size=style_dict.get('font_size'),
font_weight=style_dict.get('font_weight'),
font_style=style_dict.get('font_style'),
text_color=style_dict.get('text_color'),
bg_color=style_dict.get('bg_color') or style_dict.get('background_color'),
alignment=style_dict.get('alignment'),
)
# Parse children (spans)
children = []
for child_dict in elem_dict.get('children', []):
child = self._json_to_document_element(child_dict)
if child:
children.append(child)
# Process content based on element type
content = elem_dict.get('content', '')
# For TABLE elements, convert dict content to TableData object
if elem_type == ElementType.TABLE and isinstance(content, dict) and 'cells' in content:
try:
content = TableData.from_dict(content)
logger.debug(f"Converted table dict to TableData: {content.rows}x{content.cols}, {len(content.cells)} cells")
except Exception as e:
logger.warning(f"Failed to convert table dict to TableData: {e}")
# Keep original dict as fallback
# Create element
element = DocumentElement(
element_id=elem_dict.get('element_id', ''),
type=elem_type,
content=content,
bbox=bbox,
confidence=elem_dict.get('confidence'),
style=style,
metadata=elem_dict.get('metadata', {}),
children=children
)
return element
except Exception as e:
logger.warning(f"Failed to convert element: {e}")
return None
def _is_list_item_fallback(self, element: 'DocumentElement') -> bool:
"""
Fallback detection for list items not marked with ElementType.LIST_ITEM.
Checks metadata and text patterns to identify list items.
Args:
element: Document element to check
Returns:
True if element appears to be a list item
"""
# Skip if already categorized as table or image
if element.type in [ElementType.TABLE, ElementType.IMAGE, ElementType.FIGURE,
ElementType.CHART, ElementType.DIAGRAM]:
return False
# Check metadata for list-related fields
if element.metadata:
# Check for list_level metadata
if 'list_level' in element.metadata:
return True
# Check for parent_item (indicates list hierarchy)
if 'parent_item' in element.metadata:
return True
# Check for children (could be parent list item)
if 'children' in element.metadata and element.metadata['children']:
return True
# Check text content for list patterns
if element.is_text:
text = element.get_text().lstrip()
# Ordered list pattern: starts with number followed by . or )
if re.match(r'^\d+[\.\)]\s', text):
return True
# Unordered list pattern: starts with bullet character
if re.match(r'^[•·▪▫◦‣⁃\-\*]\s', text):
return True
return False
    def _draw_list_elements_direct(
        self,
        pdf_canvas: canvas.Canvas,
        list_elements: List['DocumentElement'],
        page_height: float
    ):
        """
        Draw list elements with proper sequential numbering and formatting.

        Processes all list items on a page: sorts them by position, groups
        them into lists by vertical proximity and list level, detects the
        list type (ordered/unordered) from the first item, then delegates the
        actual drawing of each item to _draw_text_element_direct with marker
        and spacing hints stashed in the item's metadata.

        Args:
            pdf_canvas: ReportLab canvas object
            list_elements: List of LIST_ITEM elements
            page_height: Page height for coordinate transformation
        """
        if not list_elements:
            return
        # Sort list items by position (top to bottom, left to right)
        sorted_items = sorted(list_elements, key=lambda e: (e.bbox.y0, e.bbox.x0))
        # Group list items into lists based on proximity and level
        list_groups = []
        current_group = []
        prev_y = None
        prev_level = None
        max_gap = 30  # Maximum vertical gap between items in same list (in points)
        for item in sorted_items:
            level = item.metadata.get('list_level', 0) if item.metadata else 0
            y_pos = item.bbox.y0
            # Check if this item belongs to the current group
            if current_group and prev_y is not None:
                gap = abs(y_pos - prev_y)
                # Start a new group if the gap is too large or the level changed
                if gap > max_gap or (prev_level is not None and level != prev_level):
                    list_groups.append(current_group)
                    current_group = []
            current_group.append(item)
            prev_y = y_pos
            prev_level = level
        if current_group:
            list_groups.append(current_group)
        # Process each list group
        for group in list_groups:
            # Detect list type from the first item's leading text
            first_item = group[0]
            text_content = first_item.get_text()
            text_stripped = text_content.lstrip()
            list_type = None
            list_counter = 1
            # Determine list type: "1." / "2)" → ordered, bullet char → unordered
            if re.match(r'^\d+[\.\)]\s', text_stripped):
                list_type = 'ordered'
                # Extract starting number so numbering continues from the source
                match = re.match(r'^(\d+)[\.\)]\s', text_stripped)
                if match:
                    list_counter = int(match.group(1))
            elif re.match(r'^[•·▪▫◦‣⁃]\s', text_stripped):
                list_type = 'unordered'
            # Draw each item in the group with proper spacing.
            # Track a cumulative Y offset to apply spacing_after between items.
            cumulative_y_offset = 0
            for item_idx, item in enumerate(group):
                # Prepare the list marker based on type
                if list_type == 'ordered':
                    list_marker = f"{list_counter}. "
                    list_counter += 1
                elif list_type == 'unordered':
                    list_marker = ""
                else:
                    list_marker = ""  # No marker if type unknown
                # Stash the marker in item metadata for _draw_text_element_direct
                if not item.metadata:
                    item.metadata = {}
                item.metadata['_list_marker'] = list_marker
                item.metadata['_list_type'] = list_type
                # Add default list item spacing if not specified,
                # ensuring consistent spacing between list items.
                desired_spacing_after = item.metadata.get('spacing_after', 0)
                if desired_spacing_after == 0:
                    # Default list item spacing: 3 points between items (except last item)
                    if item_idx < len(group) - 1:
                        desired_spacing_after = 3.0
                        item.metadata['spacing_after'] = desired_spacing_after
                # Draw the list item with the cumulative Y offset
                self._draw_text_element_direct(pdf_canvas, item, page_height, y_offset=cumulative_y_offset)
                # Calculate spacing to add after this item
                if item_idx < len(group) - 1 and desired_spacing_after > 0:
                    next_item = group[item_idx + 1]
                    # Actual vertical gap between items (document coordinates;
                    # Y increases downward there)
                    actual_gap = next_item.bbox.y0 - item.bbox.y1
                    # If the actual gap is smaller than desired, push the next item down
                    if actual_gap < desired_spacing_after:
                        additional_spacing = desired_spacing_after - actual_gap
                        cumulative_y_offset -= additional_spacing  # Negative because PDF Y increases upward
                        logger.debug(f"Adding {additional_spacing:.1f}pt spacing after list item {item.element_id} "
                                     f"(actual_gap={actual_gap:.1f}pt, desired={desired_spacing_after:.1f}pt)")
def _draw_text_with_spans(
self,
pdf_canvas: canvas.Canvas,
spans: List['DocumentElement'],
line_x: float,
line_y: float,
default_font_size: float,
max_width: float = None
) -> float:
"""
Draw text with inline span styling (mixed styles within a line).
Args:
pdf_canvas: ReportLab canvas object
spans: List of span DocumentElements
line_x: Starting X position
line_y: Y position
default_font_size: Default font size if span has none
max_width: Maximum width available (for scaling if needed)
Returns:
Total width of drawn text
"""
if not spans:
return 0
# First pass: calculate total width with original sizes
total_width = 0
span_data = [] # Store (span, text, font, size) for rendering
for span in spans:
span_text = span.get_text()
if not span_text:
continue
# Apply span-specific styling to get font and size
if span.style:
self._apply_text_style(pdf_canvas, span.style, default_size=default_font_size)
else:
font_name = self.font_name if self.font_registered else 'Helvetica'
pdf_canvas.setFont(font_name, default_font_size)
current_font = pdf_canvas._fontname
current_size = pdf_canvas._fontsize
# Calculate span width
span_width = pdf_canvas.stringWidth(span_text, current_font, current_size)
total_width += span_width
span_data.append((span, span_text, current_font, current_size, span_width))
# Calculate scale factor if needed
scale_factor = 1.0
if max_width and total_width > max_width:
scale_factor = (max_width / total_width) * 0.95 # 95% to leave margin
logger.debug(f"Scaling spans: total_width={total_width:.1f}pt > max_width={max_width:.1f}pt, scale={scale_factor:.2f}")
# Second pass: draw spans with scaling
x_pos = line_x
for span, span_text, font_name, original_size, span_width in span_data:
# Apply scaled font size
scaled_size = original_size * scale_factor
scaled_size = max(scaled_size, 3) # Minimum 3pt
# Set font with scaled size
pdf_canvas.setFont(font_name, scaled_size)
# Draw this span
pdf_canvas.drawString(x_pos, line_y, span_text)
# Calculate actual width with scaled size and advance position
actual_width = pdf_canvas.stringWidth(span_text, font_name, scaled_size)
x_pos += actual_width
return total_width * scale_factor
def _draw_text_element_direct(
self,
pdf_canvas: canvas.Canvas,
element: 'DocumentElement',
page_height: float,
y_offset: float = 0
):
"""
Draw text element with Direct track rich formatting.
FIXED: Correctly handles multi-line blocks and spans coordinates.
Prioritizes span-based rendering (using precise bbox from each span),
falls back to block-level rendering with corrected Y-axis logic.
Args:
pdf_canvas: ReportLab canvas object
element: DocumentElement with text content
page_height: Page height for coordinate transformation
y_offset: Optional Y coordinate offset (for list spacing), in PDF coordinates
"""
try:
text_content = element.get_text()
if not text_content:
return
# Get bounding box
bbox = element.bbox
if not bbox:
logger.warning(f"No bbox for text element {element.element_id}")
return
bbox_width = bbox.x1 - bbox.x0
bbox_height = bbox.y1 - bbox.y0
# --- FIX 1: Prioritize Span-based Drawing (Precise Layout) ---
# DirectExtractionEngine provides children (spans) with precise bboxes.
# Using these preserves exact layout, kerning, and multi-column positioning.
if element.children and len(element.children) > 0:
for span in element.children:
span_text = span.get_text()
if not span_text:
continue
# Use span's own bbox for positioning
s_bbox = span.bbox
if not s_bbox:
continue
# Calculate font size from span style or bbox
s_font_size = 10 # default
if span.style and span.style.font_size:
s_font_size = span.style.font_size
else:
# Estimate from bbox height
s_font_size = (s_bbox.y1 - s_bbox.y0) * 0.75
s_font_size = max(min(s_font_size, 72), 4)
# Apply span style
if span.style:
self._apply_text_style(pdf_canvas, span.style, default_size=s_font_size)
else:
font_name = self.font_name if self.font_registered else 'Helvetica'
pdf_canvas.setFont(font_name, s_font_size)
# Transform coordinates
# PyMuPDF y1 is bottom of text box. ReportLab draws at baseline.
# Using y1 with a small offset (20% of font size) approximates baseline position.
span_pdf_x = s_bbox.x0
span_pdf_y = page_height - s_bbox.y1 + (s_font_size * 0.2)
pdf_canvas.drawString(span_pdf_x, span_pdf_y + y_offset, span_text)
# If we drew spans, we are done. Do not draw the block text on top.
logger.debug(f"Drew {len(element.children)} spans using precise bbox positioning")
return
# --- FIX 2: Block-level Fallback (Corrected Y-Axis Logic) ---
# Used when no spans are available (e.g. filtered text or modified structures)
# Calculate font size from bbox height
font_size = bbox_height * 0.75
font_size = max(min(font_size, 72), 4) # Clamp 4-72pt
# Apply style if available
alignment = 'left' # Default alignment
if hasattr(element, 'style') and element.style:
self._apply_text_style(pdf_canvas, element.style, default_size=font_size)
# Get alignment from style
if hasattr(element.style, 'alignment') and element.style.alignment:
alignment = element.style.alignment
else:
# Use default font
font_name = self.font_name if self.font_registered else 'Helvetica'
pdf_canvas.setFont(font_name, font_size)
# Detect list items and extract list properties
is_list_item = (element.type == ElementType.LIST_ITEM)
list_level = element.metadata.get('list_level', 0) if element.metadata else 0
# Get pre-computed list marker from metadata (set by _draw_list_elements_direct)
list_marker = element.metadata.get('_list_marker', '') if element.metadata else ''
list_type = element.metadata.get('_list_type') if element.metadata else None
# If no pre-computed marker, remove original marker from text
if is_list_item and list_marker:
# Remove original marker from text content
text_stripped = text_content.lstrip()
# Remove ordered list marker
text_content = re.sub(r'^\d+[\.\)]\s', '', text_stripped)
# Remove unordered list marker
text_content = re.sub(r'^[•·▪▫◦‣⁃]\s', '', text_content)
# Get indentation from metadata (in points)
indent = element.metadata.get('indent', 0) if element.metadata else 0
first_line_indent = element.metadata.get('first_line_indent', indent) if element.metadata else indent
# Apply list indentation (20pt per level)
if is_list_item:
list_indent = list_level * 20 # 20pt per level
indent += list_indent
first_line_indent += list_indent
# Get paragraph spacing
paragraph_spacing_before = element.metadata.get('spacing_before', 0) if element.metadata else 0
paragraph_spacing_after = element.metadata.get('spacing_after', 0) if element.metadata else 0
# --- CRITICAL FIX: Start from TOP of block (y0), not bottom (y1) ---
pdf_x = bbox.x0
pdf_y_top = page_height - bbox.y0 - paragraph_spacing_before + y_offset
# Handle line breaks
lines = text_content.split('\n')
line_height = font_size * 1.2 # 120% of font size
# Calculate list marker width for multi-line alignment
marker_width = 0
if is_list_item and list_marker:
# Use current font to calculate marker width
marker_width = pdf_canvas.stringWidth(list_marker, pdf_canvas._fontname, font_size)
# Draw each line with alignment
for i, line in enumerate(lines):
if not line.strip():
# Empty line: skip
continue
# Calculate Y position: Start from top, move down by line_height for each line
# The first line's baseline is approx 1 line_height below the top
line_y = pdf_y_top - ((i + 1) * line_height) + (font_size * 0.25) # 0.25 adjust for baseline
# Get current font info
font_name = pdf_canvas._fontname
current_font_size = pdf_canvas._fontsize
# Calculate line indentation
line_indent = first_line_indent if i == 0 else indent
# For list items: align subsequent lines with text after marker
if is_list_item and i > 0 and marker_width > 0:
line_indent += marker_width
# Prepend list marker to first line
rendered_line = line
if is_list_item and i == 0 and list_marker:
rendered_line = list_marker + line
# Calculate text width
text_width = pdf_canvas.stringWidth(rendered_line, font_name, current_font_size)
available_width = bbox_width - line_indent
# Scale font if needed
if text_width > available_width and available_width > 0:
scale_factor = available_width / text_width
scaled_size = current_font_size * scale_factor * 0.95
scaled_size = max(scaled_size, 3)
pdf_canvas.setFont(font_name, scaled_size)
text_width = pdf_canvas.stringWidth(rendered_line, font_name, scaled_size)
current_font_size = scaled_size
# Calculate X position based on alignment
line_x = pdf_x + line_indent
if alignment == 'center':
line_x = pdf_x + (bbox_width - text_width) / 2
elif alignment == 'right':
line_x = pdf_x + bbox_width - text_width
elif alignment == 'justify' and i < len(lines) - 1:
# Justify: distribute extra space between words (except last line)
words = rendered_line.split()
if len(words) > 1:
total_word_width = sum(pdf_canvas.stringWidth(word, font_name, current_font_size) for word in words)
extra_space = available_width - total_word_width
if extra_space > 0:
word_spacing = extra_space / (len(words) - 1)
# Draw words with calculated spacing
x_pos = pdf_x + line_indent
for word in words:
pdf_canvas.drawString(x_pos, line_y, word)
word_width = pdf_canvas.stringWidth(word, font_name, current_font_size)
x_pos += word_width + word_spacing
# Reset font for next line and skip normal drawString
if text_width > available_width:
pdf_canvas.setFont(font_name, font_size)
continue
# Draw the line at calculated position
pdf_canvas.drawString(line_x, line_y, rendered_line)
# Reset font size for next line
if text_width > available_width:
pdf_canvas.setFont(font_name, font_size)
# Calculate actual text height used
actual_text_height = len(lines) * line_height
bbox_bottom_margin = bbox_height - actual_text_height - paragraph_spacing_before
# Note: For list items, spacing_after is applied via y_offset in _draw_list_elements_direct
# For other elements, spacing is inherent in element positioning (bbox-based layout)
list_info = f", list={list_type}, level={list_level}" if is_list_item else ""
y_offset_info = f", y_offset={y_offset:.1f}pt" if y_offset != 0 else ""
logger.debug(f"Drew text element (fallback): {text_content[:30]}... "
f"({len(lines)} lines, align={alignment}, indent={indent}{list_info}{y_offset_info}, "
f"spacing_before={paragraph_spacing_before}, spacing_after={paragraph_spacing_after}, "
f"actual_height={actual_text_height:.1f}, bbox_bottom_margin={bbox_bottom_margin:.1f})")
except Exception as e:
logger.error(f"Failed to draw text element {element.element_id}: {e}")
def _build_rows_from_cells_dict(self, content: dict) -> list:
"""
Build row structure from cells dict (from Direct extraction JSON).
The cells structure from Direct extraction:
{
"rows": 6,
"cols": 2,
"cells": [
{"row": 0, "col": 0, "content": "..."},
{"row": 0, "col": 1, "content": "..."},
...
]
}
Returns format compatible with HTMLTableParser output:
[
{"cells": [{"text": "..."}, {"text": "..."}]}, # row 0
{"cells": [{"text": "..."}, {"text": "..."}]}, # row 1
...
]
"""
try:
num_rows = content.get('rows', 0)
num_cols = content.get('cols', 0)
cells = content.get('cells', [])
if not cells or num_rows == 0 or num_cols == 0:
return []
# Initialize rows structure
rows_data = []
for _ in range(num_rows):
rows_data.append({'cells': [{'text': ''} for _ in range(num_cols)]})
# Fill in cell content
for cell in cells:
row_idx = cell.get('row', 0)
col_idx = cell.get('col', 0)
cell_content = cell.get('content', '')
if 0 <= row_idx < num_rows and 0 <= col_idx < num_cols:
rows_data[row_idx]['cells'][col_idx]['text'] = str(cell_content) if cell_content else ''
logger.debug(f"Built {num_rows} rows from cells dict")
return rows_data
except Exception as e:
logger.error(f"Error building rows from cells dict: {e}")
return []
    def _draw_table_element_direct(
        self,
        pdf_canvas: canvas.Canvas,
        element: 'DocumentElement',
        page_height: float
    ):
        """
        Draw table element with Direct track positioning.

        Accepts either a TableData object or a raw dict (from JSON) as the
        element content, normalizes it into a rows/cells structure, builds a
        ReportLab Table, and draws it scaled (down only, never up) so it fits
        inside the element's bbox.

        Args:
            pdf_canvas: ReportLab canvas object
            element: DocumentElement with table content
            page_height: Page height for coordinate transformation
        """
        try:
            # Get table data - can be TableData object or dict from JSON.
            # Target shape: [{"cells": [{"text": ...}, ...]}, ...]
            rows_data = None
            if isinstance(element.content, TableData):
                # Direct TableData object - convert to HTML then parse
                html_content = element.content.to_html()
                parser = HTMLTableParser()
                parser.feed(html_content)
                if parser.tables and parser.tables[0]['rows']:
                    rows_data = parser.tables[0]['rows']
            elif isinstance(element.content, dict):
                # Dict from JSON - check if it has cells structure (from Direct extraction)
                if 'cells' in element.content:
                    # Build rows from cells structure directly (avoid HTML round-trip)
                    rows_data = self._build_rows_from_cells_dict(element.content)
                elif 'html' in element.content:
                    # Has HTML content - parse it
                    html_content = element.content['html']
                    parser = HTMLTableParser()
                    parser.feed(html_content)
                    if parser.tables and parser.tables[0]['rows']:
                        rows_data = parser.tables[0]['rows']
            if not rows_data:
                logger.warning(f"No table data for {element.element_id}")
                return
            rows = rows_data
            # Get bbox; without it the table cannot be positioned.
            bbox = element.bbox
            if not bbox:
                logger.warning(f"No bbox for table {element.element_id}")
                return
            # Transform coordinates: document coords are top-left origin,
            # ReportLab is bottom-left, so anchor at page_height - y1.
            pdf_x = bbox.x0
            # Use exact bbox position (no buffer) - scaling will ensure table fits
            pdf_y = page_height - bbox.y1  # Bottom of table (ReportLab Y coordinate)
            table_width = bbox.x1 - bbox.x0
            table_height = bbox.y1 - bbox.y0
            # Build table data for ReportLab: plain list-of-lists of cell text.
            table_content = []
            for row in rows:
                row_data = [cell['text'].strip() for cell in row['cells']]
                table_content.append(row_data)
            # Create table (local imports shadow the module-level ones; kept as-is)
            from reportlab.platypus import Table, TableStyle
            from reportlab.lib import colors
            # Use original column widths from extraction if available;
            # otherwise let ReportLab auto-calculate.
            # NOTE(review): presumably column_widths are in points — confirm
            # against the extraction side.
            col_widths = None
            if element.metadata and 'column_widths' in element.metadata:
                col_widths = element.metadata['column_widths']
                logger.debug(f"Using extracted column widths: {col_widths}")
            # NOTE: Don't use rowHeights from extraction - it causes content overlap.
            # The extracted row heights are based on cell boundaries, not text content height.
            # When text wraps or uses different font sizes, the heights don't match.
            # Let ReportLab auto-calculate row heights based on content, then use scaling
            # to fit within the bbox (same approach as old commit ba8ddf2b).
            # Create table without rowHeights - let ReportLab auto-calculate
            t = Table(table_content, colWidths=col_widths)
            # Apply style with minimal padding to reduce table extension.
            # Use Chinese font to support special characters (℃, μm, ≦, ×, Ω, etc.)
            font_for_table = self.font_name if self.font_registered else 'Helvetica'
            style = TableStyle([
                ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
                ('FONTNAME', (0, 0), (-1, -1), font_for_table),
                ('FONTSIZE', (0, 0), (-1, -1), 8),
                ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
                ('VALIGN', (0, 0), (-1, -1), 'TOP'),
                # Set minimal padding to prevent table from extending beyond bbox.
                # User reported padding=1 was still insufficient.
                ('TOPPADDING', (0, 0), (-1, -1), 0),
                ('BOTTOMPADDING', (0, 0), (-1, -1), 0),
                ('LEFTPADDING', (0, 0), (-1, -1), 1),
                ('RIGHTPADDING', (0, 0), (-1, -1), 1),
            ])
            t.setStyle(style)
            # Use canvas scaling as fallback to fit table within bbox.
            # With proper row heights, scaling should be minimal (close to 1.0).
            # Step 1: Wrap to get actual rendered size (generous 10x bounds so
            # wrapOn reports the natural size without forcing wrapping).
            actual_width, actual_height = t.wrapOn(pdf_canvas, table_width * 10, table_height * 10)
            logger.info(f"Table natural size: {actual_width:.1f} × {actual_height:.1f}pt, bbox: {table_width:.1f} × {table_height:.1f}pt")
            # Step 2: Calculate scale factor to fit within bbox.
            scale_x = table_width / actual_width if actual_width > table_width else 1.0
            scale_y = table_height / actual_height if actual_height > table_height else 1.0
            scale = min(scale_x, scale_y, 1.0)  # Never scale up, only down
            logger.info(f"Scale factor: {scale:.3f} (x={scale_x:.3f}, y={scale_y:.3f})")
            # Step 3: Draw with scaling using canvas transform; saveState/
            # restoreState keeps the transform local to this table.
            pdf_canvas.saveState()
            pdf_canvas.translate(pdf_x, pdf_y)
            pdf_canvas.scale(scale, scale)
            t.drawOn(pdf_canvas, 0, 0)
            pdf_canvas.restoreState()
            logger.info(f"Drew table at ({pdf_x:.1f}, {pdf_y:.1f}) with scale {scale:.3f}, final size: {actual_width * scale:.1f} × {actual_height * scale:.1f}pt")
            logger.debug(f"Drew table element: {len(rows)} rows")
        except Exception as e:
            logger.error(f"Failed to draw table element {element.element_id}: {e}")
def _draw_image_element_direct(
self,
pdf_canvas: canvas.Canvas,
element: 'DocumentElement',
page_height: float,
result_dir: Path
):
"""
Draw image element with Direct track positioning.
Args:
pdf_canvas: ReportLab canvas object
element: DocumentElement with image content
page_height: Page height for coordinate transformation
result_dir: Directory containing image files
"""
try:
# Get image path
image_path_str = self._get_image_path(element)
if not image_path_str:
logger.warning(f"No image path for element {element.element_id}")
return
# Construct full path to image
# saved_path is relative to result_dir (e.g., "document_id_p1_img0.png")
image_path = result_dir / image_path_str
# Fallback for legacy data
if not image_path.exists():
image_path = result_dir / Path(image_path_str).name
if not image_path.exists():
logger.warning(f"Image not found: {image_path_str} (in {result_dir})")
return
# Get bbox
bbox = element.bbox
if not bbox:
logger.warning(f"No bbox for image {element.element_id}")
return
# Transform coordinates
pdf_x = bbox.x0
pdf_y = page_height - bbox.y1 # Bottom of image
image_width = bbox.x1 - bbox.x0
image_height = bbox.y1 - bbox.y0
# Draw image
pdf_canvas.drawImage(
str(image_path),
pdf_x,
pdf_y,
width=image_width,
height=image_height,
preserveAspectRatio=True
)
logger.debug(f"Drew image: {image_path} (from: {image_path_str})")
except Exception as e:
logger.error(f"Failed to draw image element {element.element_id}: {e}")
# Module-level singleton: shared PDFGeneratorService instance imported by
# the rest of the application instead of constructing new ones.
pdf_generator_service = PDFGeneratorService()