OCR/backend/app/services/pdf_generator_service.py
3fc32bcdd7 feat: implement Phase 2 - Basic Style Preservation
Implement the style application system and track-specific rendering for
PDF generation, enabling formatting preservation for the Direct track.

**Font System** (Task 3.1):
- Added FONT_MAPPING with 20 common fonts → PDF standard fonts
- Implemented _map_font() with case-insensitive and partial matching
- Fallback to Helvetica for unknown fonts
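
A minimal usage sketch of the mapping (module singleton assumed; expected results follow the FONT_MAPPING table and fallback rules in the code below):

```python
from app.services.pdf_generator_service import pdf_generator_service

pdf_generator_service._map_font("Arial Black")      # -> 'Helvetica-Bold' (direct lookup)
pdf_generator_service._map_font("times new roman")  # -> 'Times-Roman' (case-insensitive match)
pdf_generator_service._map_font("Arial Narrow")     # -> 'Helvetica' (partial match on 'arial')
pdf_generator_service._map_font("Wingdings")        # -> 'Helvetica' (unknown-font fallback)
```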

**Style Application** (Task 3.2):
- Implemented _apply_text_style() to apply StyleInfo to canvas
- Supports both StyleInfo objects and dict formats
- Handles font family, size, color, and flags (bold/italic)
- Applies compound font variants (BoldOblique, BoldItalic)
- Graceful error handling with fallback to defaults
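
A sketch of applying a dict-form style to a ReportLab canvas (the field names mirror what `_apply_text_style` reads; the values are illustrative):

```python
from reportlab.pdfgen import canvas
from app.services.pdf_generator_service import pdf_generator_service

c = canvas.Canvas("styled_demo.pdf")
# flags = 1 (bold) | 2 (italic) = 3 -> Times-BoldItalic; "#003366" becomes (0.0, 0.2, 0.4)
pdf_generator_service._apply_text_style(
    c, {"font": "Times New Roman", "size": 14, "color": "#003366", "flags": 3}
)
c.drawString(72, 720, "Styled sample text")
c.save()
```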

**Color Parsing** (Task 3.3):
- Implemented _parse_color() for multiple formats
- Supports hex colors (#RRGGBB, #RGB)
- Supports RGB tuples/lists (0-255 and 0-1 ranges)
- Automatic normalization to ReportLab's 0-1 range
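
Illustrative conversions (return values follow the parsing rules implemented below):

```python
from app.services.pdf_generator_service import pdf_generator_service

pdf_generator_service._parse_color("#ff8800")        # -> (1.0, 0.533..., 0.0), long hex form
pdf_generator_service._parse_color("#f80")           # -> same value, short form expanded to #ff8800
pdf_generator_service._parse_color((255, 128, 0))    # -> normalized from the 0-255 range to 0-1
pdf_generator_service._parse_color((0.2, 0.4, 0.6))  # -> already 0-1, passed through unchanged
pdf_generator_service._parse_color(None)             # -> (0, 0, 0), default black
```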

**Track Detection** (Task 4.1):
- Added current_processing_track instance variable
- Detect processing_track from UnifiedDocument.metadata
- Support both object attribute and dict access
- Auto-reset after PDF generation

**Track-Specific Rendering** (Task 4.2, 4.3):
- Preserve StyleInfo in convert_unified_document_to_ocr_data
- Apply styles in draw_text_region for Direct track
- Simplified rendering for OCR track (unchanged behavior)
- Track detection: is_direct_track check
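
For the Direct track, each converted text region carries a `style` entry that `draw_text_region` hands to `_apply_text_style`; a dict-form sketch of the shape (keys match the converter below, values are made up):

```python
text_region = {
    "text": "Quarterly Report",
    "bbox": [[72, 96], [288, 96], [288, 120], [72, 120]],  # polygon in source/OCR coordinates
    "confidence": 1.0,
    "page": 1,  # text regions keep 1-based page numbers
    "style": {"font": "Arial", "size": 18, "color": "#1a1a1a", "flags": 1},  # bold
}
```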

**Implementation Details**:
- Lines 97-125: Font mapping and style flag constants
- Lines 161-201: _parse_color() method
- Lines 203-236: _map_font() method
- Lines 238-326: _apply_text_style() method
- Lines 530-538: Track detection in generate_from_unified_document
- Lines 431-433: Style preservation in conversion
- Lines 1022-1037: Track-specific styling in draw_text_region

**Status**:
- Phase 2 Task 3: Completed (3.1, 3.2, 3.3)
- Phase 2 Task 4: Completed (4.1, 4.2, 4.3)
- Testing pending: 4.4 (requires backend)

Direct track PDFs will now preserve fonts, colors, and text styling
while maintaining backward compatibility with OCR track rendering.
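
Typical entry points, sketched with hypothetical paths and assuming the backend package is importable as `app` (both methods return True on success):

```python
from pathlib import Path
from app.services.pdf_generator_service import pdf_generator_service

# OCR track: rebuild a layout-preserving PDF from an OCR JSON result
pdf_generator_service.generate_layout_pdf(
    json_path=Path("results/sample/ocr_result.json"),
    output_path=Path("results/sample/layout.pdf"),
    source_file_path=Path("uploads/sample.pdf"),
)

# Direct track: generate straight from a UnifiedDocument instance
# pdf_generator_service.generate_from_unified_document(unified_doc, Path("results/sample/layout.pdf"))
```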

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-24 07:44:24 +08:00


"""
Layout-Preserving PDF Generation Service
Generates PDF files that preserve the original document layout using OCR JSON data
"""
import json
import logging
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
from datetime import datetime
from reportlab.lib.pagesizes import A4, letter
from reportlab.lib.units import mm
from reportlab.pdfgen import canvas
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.platypus import Table, TableStyle
from reportlab.lib import colors
from reportlab.lib.enums import TA_CENTER, TA_LEFT, TA_RIGHT
from reportlab.platypus import Paragraph
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from PIL import Image
from html.parser import HTMLParser
from app.core.config import settings
# Import UnifiedDocument for dual-track support
try:
from app.models.unified_document import (
UnifiedDocument, DocumentElement, ElementType,
BoundingBox, TableData, ProcessingTrack
)
UNIFIED_DOCUMENT_AVAILABLE = True
except ImportError:
UNIFIED_DOCUMENT_AVAILABLE = False
UnifiedDocument = None
logger = logging.getLogger(__name__)
class HTMLTableParser(HTMLParser):
"""Parse HTML table to extract structure and data"""
def __init__(self):
super().__init__()
self.tables = []
self.current_table = None
self.current_row = None
self.current_cell = None
self.in_table = False
def handle_starttag(self, tag, attrs):
attrs_dict = dict(attrs)
if tag == 'table':
self.in_table = True
self.current_table = {'rows': []}
elif tag == 'tr' and self.in_table:
self.current_row = {'cells': []}
elif tag in ('td', 'th') and self.in_table and self.current_row is not None:
colspan = int(attrs_dict.get('colspan', 1))
rowspan = int(attrs_dict.get('rowspan', 1))
self.current_cell = {
'text': '',
'is_header': tag == 'th',
'colspan': colspan,
'rowspan': rowspan
}
def handle_endtag(self, tag):
if tag == 'table' and self.in_table:
if self.current_table and self.current_table['rows']:
self.tables.append(self.current_table)
self.current_table = None
self.in_table = False
elif tag == 'tr' and self.current_row is not None:
if self.current_table is not None:
self.current_table['rows'].append(self.current_row)
self.current_row = None
elif tag in ('td', 'th') and self.current_cell is not None:
if self.current_row is not None:
self.current_row['cells'].append(self.current_cell)
self.current_cell = None
def handle_data(self, data):
if self.current_cell is not None:
self.current_cell['text'] += data.strip() + ' '
class PDFGeneratorService:
"""Service for generating layout-preserving PDFs from OCR JSON data"""
# Font mapping from common fonts to PDF standard fonts
FONT_MAPPING = {
'Arial': 'Helvetica',
'Arial Black': 'Helvetica-Bold',
'Times New Roman': 'Times-Roman',
'Times': 'Times-Roman',
'Courier New': 'Courier',
'Courier': 'Courier',
'Calibri': 'Helvetica',
'Cambria': 'Times-Roman',
'Georgia': 'Times-Roman',
'Verdana': 'Helvetica',
'Tahoma': 'Helvetica',
'Trebuchet MS': 'Helvetica',
'Comic Sans MS': 'Helvetica',
'Impact': 'Helvetica-Bold',
'Lucida Console': 'Courier',
'Palatino': 'Times-Roman',
'Garamond': 'Times-Roman',
'Bookman': 'Times-Roman',
'Century Gothic': 'Helvetica',
'Franklin Gothic': 'Helvetica',
}
# Style flags for text formatting
STYLE_FLAG_BOLD = 1
STYLE_FLAG_ITALIC = 2
STYLE_FLAG_UNDERLINE = 4
STYLE_FLAG_STRIKETHROUGH = 8
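# The flags value is a bit field combined with bitwise OR; e.g. flags == 3 marks bold italic
# text, which _apply_text_style resolves to a BoldOblique/BoldItalic font variant.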
def __init__(self):
"""Initialize PDF generator with font configuration"""
self.font_name = 'NotoSansSC'
self.font_path = None
self.font_registered = False
self.current_processing_track = None # Track type for current document
self._register_chinese_font()
def _register_chinese_font(self):
"""Register Chinese font for PDF generation"""
try:
# Get font path from settings
font_path = Path(settings.chinese_font_path)
# Try relative path from project root
if not font_path.is_absolute():
# Adjust path - settings.chinese_font_path starts with ./backend/
project_root = Path(__file__).resolve().parent.parent.parent.parent
font_path = project_root / font_path
if not font_path.exists():
logger.error(f"Chinese font not found at {font_path}")
return
# Register font
pdfmetrics.registerFont(TTFont(self.font_name, str(font_path)))
self.font_path = font_path
self.font_registered = True
logger.info(f"Chinese font registered: {self.font_name} from {font_path}")
except Exception as e:
logger.error(f"Failed to register Chinese font: {e}")
self.font_registered = False
def _parse_color(self, color_value) -> Tuple[float, float, float]:
"""
Parse color value to RGB tuple.
Args:
color_value: Color as hex string (#RRGGBB), RGB tuple, or color name
Returns:
RGB tuple with values 0-1 for ReportLab
"""
if not color_value:
return (0, 0, 0) # Default to black
try:
# Handle hex color (#RRGGBB or #RGB)
if isinstance(color_value, str) and color_value.startswith('#'):
hex_color = color_value.lstrip('#')
# Expand short form (#RGB -> #RRGGBB)
if len(hex_color) == 3:
hex_color = ''.join([c*2 for c in hex_color])
if len(hex_color) == 6:
r = int(hex_color[0:2], 16) / 255.0
g = int(hex_color[2:4], 16) / 255.0
b = int(hex_color[4:6], 16) / 255.0
return (r, g, b)
# Handle RGB tuple or list
elif isinstance(color_value, (tuple, list)) and len(color_value) >= 3:
r, g, b = color_value[0:3]
# Normalize to 0-1 if values are 0-255
if any(v > 1 for v in [r, g, b]):
return (r/255.0, g/255.0, b/255.0)
return (r, g, b)
except (ValueError, TypeError) as e:
logger.warning(f"Failed to parse color {color_value}: {e}")
# Default to black
return (0, 0, 0)
def _map_font(self, font_name: Optional[str]) -> str:
"""
Map font name to PDF standard font.
Args:
font_name: Original font name
Returns:
PDF standard font name
"""
if not font_name:
return 'Helvetica'
# Direct lookup
if font_name in self.FONT_MAPPING:
return self.FONT_MAPPING[font_name]
# Case-insensitive lookup
font_lower = font_name.lower()
for orig_font, pdf_font in self.FONT_MAPPING.items():
if orig_font.lower() == font_lower:
return pdf_font
# Partial match for common patterns
if 'arial' in font_lower:
return 'Helvetica'
elif 'times' in font_lower:
return 'Times-Roman'
elif 'courier' in font_lower:
return 'Courier'
# Default fallback
logger.debug(f"Font '{font_name}' not found in mapping, using Helvetica")
return 'Helvetica'
def _apply_text_style(self, c: canvas.Canvas, style_info, default_size: float = 12):
"""
Apply text styling from StyleInfo to PDF canvas.
Args:
c: ReportLab canvas object
style_info: StyleInfo object or dict with font, size, color, flags
default_size: Default font size if not specified
"""
if not style_info:
# Apply default styling
c.setFont('Helvetica', default_size)
c.setFillColorRGB(0, 0, 0)
return
try:
# Extract style attributes
if hasattr(style_info, '__dict__'):
# StyleInfo object
font_family = getattr(style_info, 'font', None)
font_size = getattr(style_info, 'size', default_size)
color = getattr(style_info, 'color', None)
flags = getattr(style_info, 'flags', 0)
elif isinstance(style_info, dict):
# Dictionary
font_family = style_info.get('font')
font_size = style_info.get('size', default_size)
color = style_info.get('color')
flags = style_info.get('flags', 0)
else:
# Unknown format, use defaults
c.setFont('Helvetica', default_size)
c.setFillColorRGB(0, 0, 0)
return
# Map font name
base_font = self._map_font(font_family) if font_family else 'Helvetica'
# Apply bold/italic modifiers
if flags:
is_bold = bool(flags & self.STYLE_FLAG_BOLD)
is_italic = bool(flags & self.STYLE_FLAG_ITALIC)
if is_bold and is_italic:
# Try bold-italic variant
if 'Helvetica' in base_font:
base_font = 'Helvetica-BoldOblique'
elif 'Times' in base_font:
base_font = 'Times-BoldItalic'
elif 'Courier' in base_font:
base_font = 'Courier-BoldOblique'
elif is_bold:
# Try bold variant
if 'Helvetica' in base_font:
base_font = 'Helvetica-Bold'
elif 'Times' in base_font:
base_font = 'Times-Bold'
elif 'Courier' in base_font:
base_font = 'Courier-Bold'
elif is_italic:
# Try italic variant
if 'Helvetica' in base_font:
base_font = 'Helvetica-Oblique'
elif 'Times' in base_font:
base_font = 'Times-Italic'
elif 'Courier' in base_font:
base_font = 'Courier-Oblique'
# Apply font and size
actual_size = font_size if font_size and font_size > 0 else default_size
try:
c.setFont(base_font, actual_size)
except KeyError:
# Font not available, fallback
logger.warning(f"Font '{base_font}' not available, using Helvetica")
c.setFont('Helvetica', actual_size)
# Apply color
if color:
r, g, b = self._parse_color(color)
c.setFillColorRGB(r, g, b)
else:
c.setFillColorRGB(0, 0, 0) # Default black
except Exception as e:
logger.error(f"Failed to apply text style: {e}")
# Fallback to defaults
c.setFont('Helvetica', default_size)
c.setFillColorRGB(0, 0, 0)
def load_ocr_json(self, json_path: Path) -> Optional[Dict]:
"""
Load and parse OCR JSON result file
Args:
json_path: Path to JSON file
Returns:
Parsed JSON data or None if failed
"""
try:
with open(json_path, 'r', encoding='utf-8') as f:
data = json.load(f)
logger.info(f"Loaded OCR JSON: {json_path.name}")
return data
except Exception as e:
logger.error(f"Failed to load JSON {json_path}: {e}")
return None
def _get_image_path(self, element) -> Optional[str]:
"""
Get image path with fallback logic.
Checks multiple locations in order:
1. element.content["saved_path"] - Direct track saved path
2. element.content["path"] - Legacy path
3. element.content["image_path"] - Alternative path
4. element.saved_path - Direct attribute
5. element.metadata["path"] - Metadata fallback
Args:
element: DocumentElement object
Returns:
Path to image file or None if not found
"""
# Check content dictionary
if isinstance(element.content, dict):
for key in ['saved_path', 'path', 'image_path']:
if key in element.content:
return element.content[key]
# Check direct attribute
if hasattr(element, 'saved_path') and element.saved_path:
return element.saved_path
# Check metadata
if element.metadata and isinstance(element.metadata, dict):
if 'path' in element.metadata:
return element.metadata['path']
if 'saved_path' in element.metadata:
return element.metadata['saved_path']
return None
def convert_unified_document_to_ocr_data(self, unified_doc: 'UnifiedDocument') -> Dict:
"""
Convert UnifiedDocument to OCR data format for PDF generation.
This method transforms the UnifiedDocument structure into the legacy
OCR data format that the PDF generator expects, supporting both
OCR and DIRECT processing tracks.
Args:
unified_doc: UnifiedDocument object from either processing track
Returns:
Dictionary in OCR data format with text_regions, images_metadata, layout_data
"""
text_regions = []
images_metadata = []
layout_elements = []
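# Page-number convention: text_regions keep 1-based page numbers, while images_metadata
# and layout_data elements use 0-based indices, matching the legacy OCR format consumed below.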
for page in unified_doc.pages:
page_num = page.page_number # 1-based
for element in page.elements:
# Convert BoundingBox to polygon format [[x,y], [x,y], [x,y], [x,y]]
bbox_polygon = [
[element.bbox.x0, element.bbox.y0], # top-left
[element.bbox.x1, element.bbox.y0], # top-right
[element.bbox.x1, element.bbox.y1], # bottom-right
[element.bbox.x0, element.bbox.y1], # bottom-left
]
# Handle text elements
if element.is_text or element.type in [
ElementType.TEXT, ElementType.TITLE, ElementType.HEADER,
ElementType.FOOTER, ElementType.PARAGRAPH, ElementType.CAPTION,
ElementType.LIST_ITEM, ElementType.FOOTNOTE, ElementType.REFERENCE
]:
text_content = element.get_text()
if text_content:
text_region = {
'text': text_content,
'bbox': bbox_polygon,
'confidence': element.confidence or 1.0,
'page': page_num
}
# Include style information if available (for Direct track)
if hasattr(element, 'style') and element.style:
text_region['style'] = element.style
text_regions.append(text_region)
# Handle table elements
elif element.type == ElementType.TABLE:
# Convert TableData to HTML for layout_data
if isinstance(element.content, TableData):
html_content = element.content.to_html()
elif isinstance(element.content, dict):
html_content = element.content.get('html', str(element.content))
else:
html_content = str(element.content)
layout_elements.append({
'type': 'table',
'content': html_content,
'bbox': [element.bbox.x0, element.bbox.y0,
element.bbox.x1, element.bbox.y1],
'page': page_num - 1 # layout uses 0-based
})
# Add bbox to images_metadata for text overlap filtering
# (no actual image file, just bbox for filtering)
images_metadata.append({
'image_path': None, # No fake table image
'bbox': bbox_polygon,
'page': page_num - 1, # 0-based for images_metadata
'type': 'table',
'element_id': element.element_id
})
# Handle image/visual elements
elif element.is_visual or element.type in [
ElementType.IMAGE, ElementType.FIGURE, ElementType.CHART,
ElementType.DIAGRAM, ElementType.LOGO
]:
# Get image path using fallback logic
image_path = self._get_image_path(element)
# Only add if we found a valid path
if image_path:
images_metadata.append({
'image_path': image_path,
'bbox': bbox_polygon,
'page': page_num - 1, # 0-based
'type': element.type.value
})
logger.debug(f"Found image path: {image_path} for element {element.element_id}")
else:
logger.warning(f"No image path found for visual element {element.element_id}")
# Build OCR data structure
ocr_data = {
'text_regions': text_regions,
'images_metadata': images_metadata,
'layout_data': {
'elements': layout_elements,
'total_elements': len(layout_elements)
},
'total_pages': unified_doc.page_count,
'ocr_dimensions': {
'width': unified_doc.pages[0].dimensions.width if unified_doc.pages else 0,
'height': unified_doc.pages[0].dimensions.height if unified_doc.pages else 0
},
# Metadata for tracking
'_from_unified_document': True,
'_processing_track': unified_doc.metadata.processing_track.value
}
logger.info(f"Converted UnifiedDocument to OCR data: "
f"{len(text_regions)} text regions, "
f"{len(images_metadata)} images, "
f"{len(layout_elements)} layout elements, "
f"track={unified_doc.metadata.processing_track.value}")
return ocr_data
def generate_from_unified_document(
self,
unified_doc: 'UnifiedDocument',
output_path: Path,
source_file_path: Optional[Path] = None
) -> bool:
"""
Generate layout-preserving PDF directly from UnifiedDocument.
This method supports both OCR and DIRECT processing tracks,
preserving layout and coordinate information from either source.
Args:
unified_doc: UnifiedDocument object
output_path: Path to save generated PDF
source_file_path: Optional path to original source file
Returns:
True if successful, False otherwise
"""
if not UNIFIED_DOCUMENT_AVAILABLE:
logger.error("UnifiedDocument support not available")
return False
try:
# Detect processing track for track-specific rendering
self.current_processing_track = None
if hasattr(unified_doc, 'metadata') and unified_doc.metadata:
if hasattr(unified_doc.metadata, 'processing_track'):
self.current_processing_track = unified_doc.metadata.processing_track
logger.info(f"Processing track detected: {self.current_processing_track}")
elif isinstance(unified_doc.metadata, dict):
self.current_processing_track = unified_doc.metadata.get('processing_track')
logger.info(f"Processing track detected: {self.current_processing_track}")
# Convert UnifiedDocument to OCR data format
ocr_data = self.convert_unified_document_to_ocr_data(unified_doc)
# Use internal generation with pre-loaded data
result = self._generate_pdf_from_data(
ocr_data=ocr_data,
output_path=output_path,
source_file_path=source_file_path
)
# Reset track after generation
self.current_processing_track = None
return result
except Exception as e:
logger.error(f"Failed to generate PDF from UnifiedDocument: {e}")
import traceback
traceback.print_exc()
self.current_processing_track = None
return False
def _generate_pdf_from_data(
self,
ocr_data: Dict,
output_path: Path,
source_file_path: Optional[Path] = None,
json_parent_dir: Optional[Path] = None
) -> bool:
"""
Internal method to generate PDF from OCR data dictionary.
This is the core generation logic extracted for reuse by both
JSON-based and UnifiedDocument-based generation paths.
Args:
ocr_data: OCR data dictionary
output_path: Path to save generated PDF
source_file_path: Optional path to original source file
json_parent_dir: Directory containing images (for JSON-based generation)
Returns:
True if successful, False otherwise
"""
try:
# Check if PDF already exists (caching)
if output_path.exists():
logger.info(f"PDF already exists: {output_path.name}")
return True
# Get text regions
text_regions = ocr_data.get('text_regions', [])
if not text_regions:
logger.warning("No text regions found in data")
# Don't fail - might have only tables/images
# Get images metadata
images_metadata = ocr_data.get('images_metadata', [])
# Get layout data
layout_data = ocr_data.get('layout_data', {})
# Step 1: Get OCR processing dimensions
ocr_width, ocr_height = self.calculate_page_dimensions(ocr_data, source_file_path=None)
logger.info(f"OCR 處理時使用的座標系尺寸: {ocr_width:.1f} x {ocr_height:.1f}")
# Step 2: Get target PDF dimensions
if source_file_path:
target_dims = self.get_original_page_size(source_file_path)
if target_dims:
target_width, target_height = target_dims
logger.info(f"目標 PDF 尺寸(來自原始文件): {target_width:.1f} x {target_height:.1f}")
else:
target_width, target_height = ocr_width, ocr_height
logger.warning(f"無法獲取原始文件尺寸,使用 OCR 尺寸作為目標")
else:
target_width, target_height = ocr_width, ocr_height
logger.info(f"無原始文件,使用 OCR 尺寸作為目標: {target_width:.1f} x {target_height:.1f}")
# Step 3: Calculate scale factors
scale_w = target_width / ocr_width if ocr_width > 0 else 1.0
scale_h = target_height / ocr_height if ocr_height > 0 else 1.0
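# Example: an A4 page processed at 300 dpi (2480 x 3508 px) targeted at a 595 x 842 pt PDF
# gives scale factors of roughly 0.24 on both axes (illustrative numbers).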
logger.info(f"縮放因子: X={scale_w:.3f}, Y={scale_h:.3f}")
# Create PDF canvas
pdf_canvas = canvas.Canvas(str(output_path), pagesize=(target_width, target_height))
# Filter text regions to avoid overlap with tables/images
regions_to_avoid = images_metadata
table_count = len([img for img in images_metadata if img.get('type') == 'table'])
logger.info(f"過濾文字區域: {len(regions_to_avoid)} 個區域需要避免 (含 {table_count} 個表格)")
filtered_text_regions = self._filter_text_in_regions(text_regions, regions_to_avoid)
# Group regions by page
pages_data = {}
for region in filtered_text_regions:
page_num = region.get('page', 1)
if page_num not in pages_data:
pages_data[page_num] = []
pages_data[page_num].append(region)
# Get table elements from layout_data
table_elements = []
if layout_data and layout_data.get('elements'):
table_elements = [e for e in layout_data['elements'] if e.get('type') == 'table']
# Process each page
total_pages = ocr_data.get('total_pages', 1)
logger.info(f"開始處理 {total_pages} 頁 PDF")
# Determine image directory
if json_parent_dir is None:
json_parent_dir = output_path.parent
for page_num in range(1, total_pages + 1):
logger.info(f">>> 處理第 {page_num}/{total_pages}")
if page_num > 1:
pdf_canvas.showPage()
# Get regions for this page
page_text_regions = pages_data.get(page_num, [])
page_table_regions = [t for t in table_elements if t.get('page') == page_num - 1]
page_image_regions = [
img for img in images_metadata
if img.get('page') == page_num - 1
and img.get('type') != 'table'
and img.get('image_path') is not None # Skip table placeholders
]
# Draw in layers: images → tables → text
# 1. Draw images (bottom layer)
for img_meta in page_image_regions:
self.draw_image_region(
pdf_canvas, img_meta, target_height,
json_parent_dir, scale_w, scale_h
)
# 2. Draw tables (middle layer)
for table_elem in page_table_regions:
self.draw_table_region(
pdf_canvas, table_elem, images_metadata,
target_height, scale_w, scale_h
)
# 3. Draw text (top layer)
for region in page_text_regions:
self.draw_text_region(
pdf_canvas, region, target_height,
scale_w, scale_h
)
logger.info(f"<<< 第 {page_num} 頁完成")
# Save PDF
pdf_canvas.save()
file_size = output_path.stat().st_size
logger.info(f"Generated PDF: {output_path.name} ({file_size} bytes)")
return True
except Exception as e:
logger.error(f"Failed to generate PDF: {e}")
import traceback
traceback.print_exc()
return False
def calculate_page_dimensions(self, ocr_data: Dict, source_file_path: Optional[Path] = None) -> Tuple[float, float]:
"""
Infer the actual page size that was used during OCR processing from the OCR JSON data.
This matters because OCR may have run on a high-resolution rendering of the page.
Args:
ocr_data: Complete OCR data dictionary with text_regions and layout
source_file_path: Optional path to source file (fallback only)
Returns:
Tuple of (width, height) in points
"""
max_x = 0
max_y = 0
# *** Key fix: check every field that may contain bbox data ***
# Different OCR output versions may use different field names
all_regions = []
# 1. text_regions - contains all text regions (most common)
if 'text_regions' in ocr_data and isinstance(ocr_data['text_regions'], list):
all_regions.extend(ocr_data['text_regions'])
# 2. image_regions - contains image regions
if 'image_regions' in ocr_data and isinstance(ocr_data['image_regions'], list):
all_regions.extend(ocr_data['image_regions'])
# 3. tables - contains table regions
if 'tables' in ocr_data and isinstance(ocr_data['tables'], list):
all_regions.extend(ocr_data['tables'])
# 4. layout - may contain layout information (can be an empty list)
if 'layout' in ocr_data and isinstance(ocr_data['layout'], list):
all_regions.extend(ocr_data['layout'])
# 5. layout_data.elements - PP-StructureV3 format
if 'layout_data' in ocr_data and isinstance(ocr_data['layout_data'], dict):
elements = ocr_data['layout_data'].get('elements', [])
if elements:
all_regions.extend(elements)
if not all_regions:
# If the JSON is empty, fall back to the original file dimensions
logger.warning("JSON 中沒有找到 text_regions, image_regions, tables, layout 或 layout_data.elements回退到原始檔案尺寸。")
if source_file_path:
dims = self.get_original_page_size(source_file_path)
if dims:
return dims
return A4
region_count = 0
for region in all_regions:
try:
bbox = region.get('bbox')
if not bbox:
continue
region_count += 1
# *** Key fix: correctly handle the polygon [[x, y], ...] format ***
if isinstance(bbox[0], (int, float)):
# Handle the simple [x1, y1, x2, y2] format
max_x = max(max_x, bbox[2])
max_y = max(max_y, bbox[3])
elif isinstance(bbox[0], (list, tuple)):
# Handle the polygon [[x, y], ...] format
x_coords = [p[0] for p in bbox if isinstance(p, (list, tuple)) and len(p) >= 2]
y_coords = [p[1] for p in bbox if isinstance(p, (list, tuple)) and len(p) >= 2]
if x_coords and y_coords:
max_x = max(max_x, max(x_coords))
max_y = max(max_y, max(y_coords))
except Exception as e:
logger.warning(f"Error processing bbox {bbox}: {e}")
if max_x > 0 and max_y > 0:
logger.info(f"{region_count} 個區域中推斷出的 OCR 座標系尺寸: {max_x:.1f} x {max_y:.1f}")
return (max_x, max_y)
else:
# Fall back only if every bbox failed to parse
logger.warning("無法從 bbox 推斷尺寸,回退到原始檔案尺寸。")
if source_file_path:
dims = self.get_original_page_size(source_file_path)
if dims:
return dims
return A4
def get_original_page_size(self, file_path: Path) -> Optional[Tuple[float, float]]:
"""
Extract page dimensions from original source file
Args:
file_path: Path to original file (image or PDF)
Returns:
Tuple of (width, height) in points or None
"""
try:
if not file_path.exists():
return None
# For images, get dimensions from PIL
if file_path.suffix.lower() in ['.png', '.jpg', '.jpeg', '.bmp', '.tiff']:
img = Image.open(file_path)
# Use pixel dimensions directly as points (1:1 mapping)
# This matches how PaddleOCR reports coordinates
width_pt = float(img.width)
height_pt = float(img.height)
logger.info(f"Extracted dimensions from image: {width_pt:.1f} x {height_pt:.1f} points (1:1 pixel mapping)")
return (width_pt, height_pt)
# For PDFs, extract dimensions using PyPDF2
if file_path.suffix.lower() == '.pdf':
try:
from PyPDF2 import PdfReader
reader = PdfReader(file_path)
if len(reader.pages) > 0:
page = reader.pages[0]
# MediaBox gives [x1, y1, x2, y2] in points
mediabox = page.mediabox
width_pt = float(mediabox.width)
height_pt = float(mediabox.height)
logger.info(f"Extracted dimensions from PDF: {width_pt:.1f} x {height_pt:.1f} points")
return (width_pt, height_pt)
except ImportError:
logger.warning("PyPDF2 not available, cannot extract PDF dimensions")
except Exception as e:
logger.warning(f"Failed to extract PDF dimensions: {e}")
except Exception as e:
logger.warning(f"Failed to get page size from {file_path}: {e}")
return None
def _get_bbox_coords(self, bbox: Union[List[List[float]], List[float]]) -> Optional[Tuple[float, float, float, float]]:
"""將任何 bbox 格式 (多邊形或 [x1,y1,x2,y2]) 轉換為 [x_min, y_min, x_max, y_max]"""
try:
if isinstance(bbox[0], (list, tuple)):
# Handle polygon [[x, y], ...]
x_coords = [p[0] for p in bbox if isinstance(p, (list, tuple)) and len(p) >= 2]
y_coords = [p[1] for p in bbox if isinstance(p, (list, tuple)) and len(p) >= 2]
if not x_coords or not y_coords:
return None
return min(x_coords), min(y_coords), max(x_coords), max(y_coords)
elif isinstance(bbox[0], (int, float)) and len(bbox) == 4:
# Handle [x1, y1, x2, y2]
return bbox[0], bbox[1], bbox[2], bbox[3]
else:
logger.warning(f"未知的 bbox 格式: {bbox}")
return None
except Exception as e:
logger.error(f"解析 bbox {bbox} 時出錯: {e}")
return None
def _is_bbox_inside(self, inner_bbox_data: Dict, outer_bbox_data: Dict, tolerance: float = 5.0) -> bool:
"""
Check whether 'inner_bbox' lies inside 'outer_bbox' (with tolerance).
This version handles both polygon and rectangle formats.
"""
inner_coords = self._get_bbox_coords(inner_bbox_data.get('bbox'))
outer_coords = self._get_bbox_coords(outer_bbox_data.get('bbox'))
if not inner_coords or not outer_coords:
return False
inner_x1, inner_y1, inner_x2, inner_y2 = inner_coords
outer_x1, outer_y1, outer_x2, outer_y2 = outer_coords
# Check whether inner is inside outer (applying tolerance)
is_inside = (
(inner_x1 >= outer_x1 - tolerance) and
(inner_y1 >= outer_y1 - tolerance) and
(inner_x2 <= outer_x2 + tolerance) and
(inner_y2 <= outer_y2 + tolerance)
)
return is_inside
def _bbox_overlaps(self, bbox1_data: Dict, bbox2_data: Dict, tolerance: float = 5.0) -> bool:
"""
Check whether two bboxes overlap (with tolerance).
Returns True if there is any overlap.
Args:
bbox1_data: first bbox data
bbox2_data: second bbox data
tolerance: tolerance in pixels
Returns:
True if the two bboxes overlap
"""
coords1 = self._get_bbox_coords(bbox1_data.get('bbox'))
coords2 = self._get_bbox_coords(bbox2_data.get('bbox'))
if not coords1 or not coords2:
return False
x1_min, y1_min, x1_max, y1_max = coords1
x2_min, y2_min, x2_max, y2_max = coords2
# Expand the bounds of bbox2 (the table/image region)
x2_min -= tolerance
y2_min -= tolerance
x2_max += tolerance
y2_max += tolerance
# Check for overlap: if there is no overlap, at least one of the following must hold
no_overlap = (
x1_max < x2_min or  # bbox1 is to the left of bbox2
x1_min > x2_max or  # bbox1 is to the right of bbox2
y1_max < y2_min or  # bbox1 is above bbox2
y1_min > y2_max  # bbox1 is below bbox2
)
return not no_overlap
def _filter_text_in_regions(self, text_regions: List[Dict], regions_to_avoid: List[Dict], tolerance: float = 10.0) -> List[Dict]:
"""
Filter out text regions that overlap with 'regions_to_avoid' (e.g. tables, images).
Args:
text_regions: list of text regions
regions_to_avoid: list of regions to avoid (tables, images)
tolerance: tolerance in pixels, raised to 10.0 to better handle edge cases
Returns:
Filtered list of text regions
"""
filtered_text = []
filtered_count = 0
for text_region in text_regions:
should_filter = False
for avoid_region in regions_to_avoid:
# Overlap check: filter the text out if there is any overlap at all
if self._bbox_overlaps(text_region, avoid_region, tolerance=tolerance):
should_filter = True
filtered_count += 1
logger.debug(f"過濾掉重疊文字: {text_region.get('text', '')[:20]}...")
break # 找到一個重疊區域就足夠了
if not should_filter:
filtered_text.append(text_region)
logger.info(f"原始文字區域: {len(text_regions)}, 過濾後: {len(filtered_text)}, 移除: {filtered_count}")
return filtered_text
def draw_text_region(
self,
pdf_canvas: canvas.Canvas,
region: Dict,
page_height: float,
scale_w: float = 1.0,
scale_h: float = 1.0
):
"""
Draw a text region at precise coordinates
Args:
pdf_canvas: ReportLab canvas object
region: Text region dict with text, bbox, confidence
page_height: Height of page (for coordinate transformation)
scale_w: Scale factor for X coordinates (PDF width / OCR width)
scale_h: Scale factor for Y coordinates (PDF height / OCR height)
"""
text = region.get('text', '')
bbox = region.get('bbox', [])
confidence = region.get('confidence', 1.0)
if not text or not bbox or len(bbox) < 4:
return
try:
# bbox from OCR: [[x1,y1], [x2,y2], [x3,y3], [x4,y4]]
# Points: top-left, top-right, bottom-right, bottom-left
# OCR coordinates: origin (0,0) at top-left, Y increases downward
ocr_x_left = bbox[0][0] # Left X
ocr_y_top = bbox[0][1] # Top Y in OCR coordinates
ocr_x_right = bbox[2][0] # Right X
ocr_y_bottom = bbox[2][1] # Bottom Y in OCR coordinates
logger.info(f"[文字] '{text[:20]}...' OCR原始座標: L={ocr_x_left:.0f}, T={ocr_y_top:.0f}, R={ocr_x_right:.0f}, B={ocr_y_bottom:.0f}")
# Apply scale factors to convert from OCR space to PDF space
scaled_x_left = ocr_x_left * scale_w
scaled_y_top = ocr_y_top * scale_h
scaled_x_right = ocr_x_right * scale_w
scaled_y_bottom = ocr_y_bottom * scale_h
logger.info(f"[文字] '{text[:20]}...' 縮放後(scale={scale_w:.3f},{scale_h:.3f}): L={scaled_x_left:.1f}, T={scaled_y_top:.1f}, R={scaled_x_right:.1f}, B={scaled_y_bottom:.1f}")
# Calculate bbox dimensions (after scaling)
bbox_width = abs(scaled_x_right - scaled_x_left)
bbox_height = abs(scaled_y_bottom - scaled_y_top)
# Calculate font size using heuristics
# Font size is typically 70-90% of bbox height
# Testing shows 0.75 works well for most cases
font_size = bbox_height * 0.75
font_size = max(min(font_size, 72), 4) # Clamp between 4pt and 72pt
# Transform coordinates: OCR (top-left origin) → PDF (bottom-left origin)
# CRITICAL: Y-axis flip!
pdf_x = scaled_x_left
pdf_y = page_height - scaled_y_bottom # Flip Y-axis using bottom coordinate
logger.info(f"[文字] '{text[:30]}' → PDF位置: ({pdf_x:.1f}, {pdf_y:.1f}), 字體:{font_size:.1f}pt, 寬x高:{bbox_width:.0f}x{bbox_height:.0f}")
# Set font with track-specific styling
style_info = region.get('style')
is_direct_track = (self.current_processing_track == 'direct' or
self.current_processing_track == ProcessingTrack.DIRECT)
if style_info and is_direct_track:
# Direct track: Apply rich styling from StyleInfo
self._apply_text_style(pdf_canvas, style_info, default_size=font_size)
# Get current font for width calculation
font_name = pdf_canvas._fontname
font_size = pdf_canvas._fontsize
logger.debug(f"Applied Direct track style: font={font_name}, size={font_size}")
else:
# OCR track or no style: Use simple font selection
font_name = self.font_name if self.font_registered else 'Helvetica'
pdf_canvas.setFont(font_name, font_size)
# Calculate text width to prevent overflow
text_width = pdf_canvas.stringWidth(text, font_name, font_size)
# If text is too wide for bbox, scale down font
if text_width > bbox_width:
scale_factor = bbox_width / text_width
font_size = font_size * scale_factor * 0.95 # 95% to add small margin
font_size = max(font_size, 3) # Minimum 3pt
pdf_canvas.setFont(font_name, font_size)
# Draw text at calculated position
pdf_canvas.drawString(pdf_x, pdf_y, text)
# Debug: Draw bounding box (optional)
if settings.pdf_enable_bbox_debug:
pdf_canvas.setStrokeColorRGB(1, 0, 0, 0.3) # Red, semi-transparent
pdf_canvas.setLineWidth(0.5)
# Transform all bbox points to PDF coordinates (apply scaling first)
pdf_points = [(p[0] * scale_w, page_height - p[1] * scale_h) for p in bbox]
# Draw bbox rectangle
for i in range(4):
x1, y1 = pdf_points[i]
x2, y2 = pdf_points[(i + 1) % 4]
pdf_canvas.line(x1, y1, x2, y2)
except Exception as e:
logger.warning(f"Failed to draw text region '{text[:20]}...': {e}")
def draw_table_region(
self,
pdf_canvas: canvas.Canvas,
table_element: Dict,
images_metadata: List[Dict],
page_height: float,
scale_w: float = 1.0,
scale_h: float = 1.0
):
"""
Draw a table region by parsing HTML and rebuilding with ReportLab Table
Args:
pdf_canvas: ReportLab canvas object
table_element: Table element dict with HTML content
images_metadata: List of image metadata to find table bbox
page_height: Height of page
scale_w: Scale factor for X coordinates (PDF width / OCR width)
scale_h: Scale factor for Y coordinates (PDF height / OCR height)
"""
try:
html_content = table_element.get('content', '')
if not html_content:
return
# Parse HTML to extract table structure
parser = HTMLTableParser()
parser.feed(html_content)
if not parser.tables:
logger.warning("No tables found in HTML content")
return
# Get the first table (PP-StructureV3 usually provides one table per element)
table_data = parser.tables[0]
rows = table_data['rows']
if not rows:
return
# Get bbox directly from table element
table_bbox = table_element.get('bbox')
# If no bbox directly, check for bbox_polygon
if not table_bbox:
bbox_polygon = table_element.get('bbox_polygon')
if bbox_polygon and len(bbox_polygon) >= 4:
# Convert polygon format to simple bbox [x0, y0, x1, y1]
table_bbox = [
bbox_polygon[0][0], # x0
bbox_polygon[0][1], # y0
bbox_polygon[2][0], # x1
bbox_polygon[2][1] # y1
]
if not table_bbox:
logger.warning(f"No bbox found for table element")
return
# Handle different bbox formats
if isinstance(table_bbox, list) and len(table_bbox) == 4:
# Simple bbox format [x0, y0, x1, y1]
if isinstance(table_bbox[0], (int, float)):
ocr_x_left_raw = table_bbox[0]
ocr_y_top_raw = table_bbox[1]
ocr_x_right_raw = table_bbox[2]
ocr_y_bottom_raw = table_bbox[3]
# Polygon format [[x,y], [x,y], [x,y], [x,y]]
elif isinstance(table_bbox[0], list):
ocr_x_left_raw = table_bbox[0][0]
ocr_y_top_raw = table_bbox[0][1]
ocr_x_right_raw = table_bbox[2][0]
ocr_y_bottom_raw = table_bbox[2][1]
else:
logger.error(f"Unexpected bbox format: {table_bbox}")
return
else:
logger.error(f"Invalid table_bbox format: {table_bbox}")
return
logger.info(f"[表格] OCR原始座標: L={ocr_x_left_raw:.0f}, T={ocr_y_top_raw:.0f}, R={ocr_x_right_raw:.0f}, B={ocr_y_bottom_raw:.0f}")
# Apply scaling
ocr_x_left = ocr_x_left_raw * scale_w
ocr_y_top = ocr_y_top_raw * scale_h
ocr_x_right = ocr_x_right_raw * scale_w
ocr_y_bottom = ocr_y_bottom_raw * scale_h
table_width = abs(ocr_x_right - ocr_x_left)
table_height = abs(ocr_y_bottom - ocr_y_top)
# Transform coordinates
pdf_x = ocr_x_left
pdf_y = page_height - ocr_y_bottom
# Build table data for ReportLab
# Convert parsed structure to simple 2D array
max_cols = max(len(row['cells']) for row in rows)
logger.info(f"[表格] {len(rows)}行x{max_cols}列 → PDF位置: ({pdf_x:.1f}, {pdf_y:.1f}), 寬x高: {table_width:.0f}x{table_height:.0f}")
reportlab_data = []
for row in rows:
row_data = []
for cell in row['cells']:
text = cell['text'].strip()
row_data.append(text)
# Pad row if needed
while len(row_data) < max_cols:
row_data.append('')
reportlab_data.append(row_data)
# Calculate column widths (equal distribution)
col_widths = [table_width / max_cols] * max_cols
# Create ReportLab Table
# Use smaller font size to fit in bbox
font_size = min(table_height / len(rows) * 0.5, 10)
font_size = max(font_size, 6)
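# Example: a 10-row table in a 200 pt tall bbox gives min(200 / 10 * 0.5, 10) = 10 pt;
# very short tables are clamped to at least 6 pt (illustrative numbers).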
# Create table with font
table = Table(reportlab_data, colWidths=col_widths)
# Apply table style
style = TableStyle([
('FONT', (0, 0), (-1, -1), self.font_name if self.font_registered else 'Helvetica', font_size),
('GRID', (0, 0), (-1, -1), 0.5, colors.black),
('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('LEFTPADDING', (0, 0), (-1, -1), 2),
('RIGHTPADDING', (0, 0), (-1, -1), 2),
('TOPPADDING', (0, 0), (-1, -1), 2),
('BOTTOMPADDING', (0, 0), (-1, -1), 2),
])
# Add header style if first row has headers
if rows and rows[0]['cells'] and rows[0]['cells'][0].get('is_header'):
style.add('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey)
style.add('FONT', (0, 0), (-1, 0), self.font_name if self.font_registered else 'Helvetica-Bold', font_size)
table.setStyle(style)
# Calculate table size
table.wrapOn(pdf_canvas, table_width, table_height)
# Draw table at position
table.drawOn(pdf_canvas, pdf_x, pdf_y)
logger.info(f"Drew table at ({pdf_x:.0f}, {pdf_y:.0f}) size {table_width:.0f}x{table_height:.0f} with {len(rows)} rows")
except Exception as e:
logger.warning(f"Failed to draw table region: {e}")
import traceback
traceback.print_exc()
def draw_image_region(
self,
pdf_canvas: canvas.Canvas,
region: Dict,
page_height: float,
result_dir: Path,
scale_w: float = 1.0,
scale_h: float = 1.0
):
"""
Draw an image region by embedding the extracted image
Handles images extracted by PP-StructureV3 (tables, figures, charts, etc.)
Args:
pdf_canvas: ReportLab canvas object
region: Image metadata dict with image_path and bbox
page_height: Height of page (for coordinate transformation)
result_dir: Directory containing result files
scale_w: Scale factor for X coordinates (PDF width / OCR width)
scale_h: Scale factor for Y coordinates (PDF height / OCR height)
"""
try:
image_path_str = region.get('image_path', '')
if not image_path_str:
return
# Construct full path to image
image_path = result_dir / image_path_str
if not image_path.exists():
logger.warning(f"Image not found: {image_path}")
return
# Get bbox for positioning
bbox = region.get('bbox', [])
if not bbox or len(bbox) < 4:
# If no bbox, skip for now
logger.warning(f"No bbox for image {image_path_str}")
return
# bbox from OCR: [[x1,y1], [x2,y2], [x3,y3], [x4,y4]]
# OCR coordinates: origin (0,0) at top-left, Y increases downward
ocr_x_left_raw = bbox[0][0]
ocr_y_top_raw = bbox[0][1]
ocr_x_right_raw = bbox[2][0]
ocr_y_bottom_raw = bbox[2][1]
logger.info(f"[圖片] '{image_path_str}' OCR原始座標: L={ocr_x_left_raw:.0f}, T={ocr_y_top_raw:.0f}, R={ocr_x_right_raw:.0f}, B={ocr_y_bottom_raw:.0f}")
# Apply scaling
ocr_x_left = ocr_x_left_raw * scale_w
ocr_y_top = ocr_y_top_raw * scale_h
ocr_x_right = ocr_x_right_raw * scale_w
ocr_y_bottom = ocr_y_bottom_raw * scale_h
# Calculate bbox dimensions (after scaling)
bbox_width = abs(ocr_x_right - ocr_x_left)
bbox_height = abs(ocr_y_bottom - ocr_y_top)
# Transform coordinates: OCR (top-left origin) → PDF (bottom-left origin)
# CRITICAL: Y-axis flip!
# For images, we position at bottom-left corner
pdf_x_left = ocr_x_left
pdf_y_bottom = page_height - ocr_y_bottom # Flip Y-axis
logger.info(f"[圖片] '{image_path_str}' → PDF位置: ({pdf_x_left:.1f}, {pdf_y_bottom:.1f}), 寬x高: {bbox_width:.0f}x{bbox_height:.0f}")
# Draw image using ReportLab
# drawImage expects: (path, x, y, width, height)
# where (x, y) is the bottom-left corner of the image
pdf_canvas.drawImage(
str(image_path),
pdf_x_left,
pdf_y_bottom,
width=bbox_width,
height=bbox_height,
preserveAspectRatio=True,
mask='auto' # Handle transparency
)
logger.info(f"[圖片] ✓ 成功繪製 '{image_path_str}'")
except Exception as e:
logger.warning(f"Failed to draw image region: {e}")
def generate_layout_pdf(
self,
json_path: Path,
output_path: Path,
source_file_path: Optional[Path] = None
) -> bool:
"""
Generate layout-preserving PDF from OCR JSON data
Args:
json_path: Path to OCR JSON file
output_path: Path to save generated PDF
source_file_path: Optional path to original source file for dimension extraction
Returns:
True if successful, False otherwise
"""
try:
# Load JSON data
ocr_data = self.load_ocr_json(json_path)
if not ocr_data:
return False
# Use internal generation with pre-loaded data
return self._generate_pdf_from_data(
ocr_data=ocr_data,
output_path=output_path,
source_file_path=source_file_path,
json_parent_dir=json_path.parent
)
except Exception as e:
logger.error(f"Failed to generate PDF: {e}")
import traceback
traceback.print_exc()
return False
# Singleton instance
pdf_generator_service = PDFGeneratorService()