Fix bug introduced in previous commit where image_path=None caused
AttributeError when calling .lower() on None value.
**Problem**:
Setting image_path to None for table placeholders caused crashes at:
- Line 415: 'table' in img.get('image_path', '').lower()
- Line 453: 'table' not in img.get('image_path', '').lower()
When the key exists but its value is None, .get('image_path', '') returns None
(not the default value), so the subsequent .lower() call raises AttributeError.
**Solution**:
Use img.get('type') == 'table' to identify table entries instead of
checking image_path string. This is:
- More explicit and reliable
- Safer (no string operations on potentially None values)
- Cleaner code
**Changes**:
- Line 415: Check img.get('type') == 'table' for table count
- Line 453: Filter using img.get('type') != 'table' and image_path is not None
- Added informative log message showing table count
**Verification**:
draw_image_region already safely handles None/empty image_path (lines 1013-1015)
by returning early if not image_path_str.
Task 2.1 now fully functional without crashes.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
1116 lines
44 KiB
Python
1116 lines
44 KiB
Python
"""
|
||
Layout-Preserving PDF Generation Service
|
||
Generates PDF files that preserve the original document layout using OCR JSON data
|
||
"""
|
||
|
||
import json
|
||
import logging
|
||
from pathlib import Path
|
||
from typing import Dict, List, Optional, Tuple, Union
|
||
from datetime import datetime
|
||
|
||
from reportlab.lib.pagesizes import A4, letter
|
||
from reportlab.lib.units import mm
|
||
from reportlab.pdfgen import canvas
|
||
from reportlab.pdfbase import pdfmetrics
|
||
from reportlab.pdfbase.ttfonts import TTFont
|
||
from reportlab.platypus import Table, TableStyle
|
||
from reportlab.lib import colors
|
||
from reportlab.lib.enums import TA_CENTER, TA_LEFT, TA_RIGHT
|
||
from reportlab.platypus import Paragraph
|
||
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
|
||
from PIL import Image
|
||
from html.parser import HTMLParser
|
||
|
||
from app.core.config import settings
|
||
|
||
# Import UnifiedDocument for dual-track support
|
||
try:
|
||
from app.models.unified_document import (
|
||
UnifiedDocument, DocumentElement, ElementType,
|
||
BoundingBox, TableData, ProcessingTrack
|
||
)
|
||
UNIFIED_DOCUMENT_AVAILABLE = True
|
||
except ImportError:
|
||
UNIFIED_DOCUMENT_AVAILABLE = False
|
||
UnifiedDocument = None
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class HTMLTableParser(HTMLParser):
    """Parse HTML table markup into plain row/cell dictionaries.

    After ``feed()`` returns, ``self.tables`` holds one entry per ``<table>``
    that produced at least one row::

        {'rows': [{'cells': [{'text': str, 'is_header': bool,
                              'colspan': int, 'rowspan': int}, ...]}, ...]}

    Only one table is tracked at a time: a nested ``<table>`` start tag
    replaces the table that is currently being built.
    """

    def __init__(self):
        super().__init__()
        self.tables = []           # completed tables, in document order
        self.current_table = None  # table being built, or None
        self.current_row = None    # row being built, or None
        self.current_cell = None   # cell being built, or None
        self.in_table = False      # True between <table> and </table>

    @staticmethod
    def _parse_span(raw_value):
        """Convert a colspan/rowspan attribute value to an int, defaulting to 1.

        HTMLParser reports a valueless attribute (``<td colspan>``) as None and
        passes malformed values (``""``, ``"2px"``) through unchanged.  A bare
        ``int()`` raises on those, and the exception would propagate out of
        ``feed()``; falling back to 1 keeps the rest of the table parseable.
        """
        try:
            return int(raw_value)
        except (TypeError, ValueError):
            return 1

    def handle_starttag(self, tag, attrs):
        """Open a table, row or cell depending on ``tag``."""
        attrs_dict = dict(attrs)

        if tag == 'table':
            self.in_table = True
            self.current_table = {'rows': []}

        elif tag == 'tr' and self.in_table:
            self.current_row = {'cells': []}

        elif tag in ('td', 'th') and self.in_table and self.current_row is not None:
            self.current_cell = {
                'text': '',
                'is_header': tag == 'th',
                'colspan': self._parse_span(attrs_dict.get('colspan', 1)),
                'rowspan': self._parse_span(attrs_dict.get('rowspan', 1)),
            }

    def handle_endtag(self, tag):
        """Close the current cell, row or table and attach it to its parent."""
        if tag == 'table' and self.in_table:
            # Tables without any row are dropped.
            if self.current_table and self.current_table['rows']:
                self.tables.append(self.current_table)
            self.current_table = None
            self.in_table = False

        elif tag == 'tr' and self.current_row is not None:
            if self.current_table is not None:
                self.current_table['rows'].append(self.current_row)
            self.current_row = None

        elif tag in ('td', 'th') and self.current_cell is not None:
            if self.current_row is not None:
                self.current_row['cells'].append(self.current_cell)
            self.current_cell = None

    def handle_data(self, data):
        """Append text found inside an open cell (stripped, plus one trailing space)."""
        if self.current_cell is not None:
            self.current_cell['text'] += data.strip() + ' '
|
||
|
||
|
||
class PDFGeneratorService:
|
||
"""Service for generating layout-preserving PDFs from OCR JSON data"""
|
||
|
||
def __init__(self):
    """Set the font defaults, then try to register the Chinese font."""
    # Start in the "not registered" state; _register_chinese_font updates
    # font_path / font_registered when registration succeeds.
    self.font_registered = False
    self.font_path = None
    self.font_name = 'NotoSansSC'

    self._register_chinese_font()
|
||
|
||
def _register_chinese_font(self):
    """Register the TrueType font named by ``settings.chinese_font_path``.

    On success ``self.font_path`` and ``self.font_registered`` are updated.
    A missing font file or any exception is logged; nothing is raised.
    """
    try:
        candidate = Path(settings.chinese_font_path)

        if not candidate.is_absolute():
            # Relative paths are resolved against the directory four levels
            # above this file (the configured path starts with ./backend/).
            project_root = Path(__file__).resolve()
            for _ in range(4):
                project_root = project_root.parent
            candidate = project_root / candidate

        if candidate.exists():
            pdfmetrics.registerFont(TTFont(self.font_name, str(candidate)))
            self.font_path = candidate
            self.font_registered = True
            logger.info(f"Chinese font registered: {self.font_name} from {candidate}")
        else:
            logger.error(f"Chinese font not found at {candidate}")

    except Exception as e:
        logger.error(f"Failed to register Chinese font: {e}")
        self.font_registered = False
|
||
|
||
def load_ocr_json(self, json_path: Path) -> Optional[Dict]:
    """
    Read an OCR result file and return its decoded JSON content.

    Args:
        json_path: Path to JSON file (opened as UTF-8)

    Returns:
        Parsed JSON data, or None if any step raised (the error is logged)
    """
    try:
        with open(json_path, 'r', encoding='utf-8') as handle:
            parsed = json.loads(handle.read())

        logger.info(f"Loaded OCR JSON: {json_path.name}")
    except Exception as e:
        logger.error(f"Failed to load JSON {json_path}: {e}")
        return None

    return parsed
|
||
|
||
def _get_image_path(self, element) -> Optional[str]:
|
||
"""
|
||
Get image path with fallback logic.
|
||
|
||
Checks multiple locations in order:
|
||
1. element.content["saved_path"] - Direct track saved path
|
||
2. element.content["path"] - Legacy path
|
||
3. element.content["image_path"] - Alternative path
|
||
4. element.saved_path - Direct attribute
|
||
5. element.metadata["path"] - Metadata fallback
|
||
|
||
Args:
|
||
element: DocumentElement object
|
||
|
||
Returns:
|
||
Path to image file or None if not found
|
||
"""
|
||
# Check content dictionary
|
||
if isinstance(element.content, dict):
|
||
for key in ['saved_path', 'path', 'image_path']:
|
||
if key in element.content:
|
||
return element.content[key]
|
||
|
||
# Check direct attribute
|
||
if hasattr(element, 'saved_path') and element.saved_path:
|
||
return element.saved_path
|
||
|
||
# Check metadata
|
||
if element.metadata and isinstance(element.metadata, dict):
|
||
if 'path' in element.metadata:
|
||
return element.metadata['path']
|
||
if 'saved_path' in element.metadata:
|
||
return element.metadata['saved_path']
|
||
|
||
return None
|
||
|
||
def convert_unified_document_to_ocr_data(self, unified_doc: 'UnifiedDocument') -> Dict:
    """
    Convert UnifiedDocument to OCR data format for PDF generation.

    Walks every element of every page and sorts it into one of three lists:

    * text-like elements  -> ``text_regions`` (``page`` is 1-based)
    * table elements      -> ``layout_data['elements']`` as HTML, plus a
      placeholder entry in ``images_metadata`` with ``image_path=None``
      (both use a 0-based ``page``)
    * visual elements     -> ``images_metadata`` (0-based ``page``), only
      when an image path can be found

    Args:
        unified_doc: UnifiedDocument object from either processing track

    Returns:
        Dictionary with text_regions, images_metadata, layout_data,
        total_pages, ocr_dimensions (taken from the first page, or 0 when
        there are no pages) and two tracking keys prefixed with ``_``
    """
    text_regions = []
    images_metadata = []
    layout_elements = []

    for page in unified_doc.pages:
        page_num = page.page_number  # 1-based

        for element in page.elements:
            # Convert BoundingBox to polygon format [[x,y], [x,y], [x,y], [x,y]]
            bbox_polygon = [
                [element.bbox.x0, element.bbox.y0],  # top-left
                [element.bbox.x1, element.bbox.y0],  # top-right
                [element.bbox.x1, element.bbox.y1],  # bottom-right
                [element.bbox.x0, element.bbox.y1],  # bottom-left
            ]

            # Handle text elements
            if element.is_text or element.type in [
                ElementType.TEXT, ElementType.TITLE, ElementType.HEADER,
                ElementType.FOOTER, ElementType.PARAGRAPH, ElementType.CAPTION,
                ElementType.LIST_ITEM, ElementType.FOOTNOTE, ElementType.REFERENCE
            ]:
                text_content = element.get_text()
                if text_content:
                    # Only a missing confidence defaults to 1.0; a real
                    # confidence of 0.0 must be kept as 0.0.
                    confidence = element.confidence
                    text_regions.append({
                        'text': text_content,
                        'bbox': bbox_polygon,
                        'confidence': 1.0 if confidence is None else confidence,
                        'page': page_num
                    })

            # Handle table elements
            elif element.type == ElementType.TABLE:
                # Convert TableData to HTML for layout_data
                if isinstance(element.content, TableData):
                    html_content = element.content.to_html()
                elif isinstance(element.content, dict):
                    html_content = element.content.get('html', str(element.content))
                else:
                    html_content = str(element.content)

                layout_elements.append({
                    'type': 'table',
                    'content': html_content,
                    'bbox': [element.bbox.x0, element.bbox.y0,
                             element.bbox.x1, element.bbox.y1],
                    'page': page_num - 1  # layout uses 0-based
                })

                # Add bbox to images_metadata for text overlap filtering
                # (no actual image file, just bbox for filtering)
                images_metadata.append({
                    'image_path': None,  # No fake table image
                    'bbox': bbox_polygon,
                    'page': page_num - 1,  # 0-based for images_metadata
                    'type': 'table',
                    'element_id': element.element_id
                })

            # Handle image/visual elements
            elif element.is_visual or element.type in [
                ElementType.IMAGE, ElementType.FIGURE, ElementType.CHART,
                ElementType.DIAGRAM, ElementType.LOGO
            ]:
                # Get image path using fallback logic
                image_path = self._get_image_path(element)

                # Only add if we found a valid path
                if image_path:
                    images_metadata.append({
                        'image_path': image_path,
                        'bbox': bbox_polygon,
                        'page': page_num - 1,  # 0-based
                        'type': element.type.value
                    })
                    logger.debug(f"Found image path: {image_path} for element {element.element_id}")
                else:
                    logger.warning(f"No image path found for visual element {element.element_id}")

    # Build OCR data structure
    ocr_data = {
        'text_regions': text_regions,
        'images_metadata': images_metadata,
        'layout_data': {
            'elements': layout_elements,
            'total_elements': len(layout_elements)
        },
        'total_pages': unified_doc.page_count,
        'ocr_dimensions': {
            'width': unified_doc.pages[0].dimensions.width if unified_doc.pages else 0,
            'height': unified_doc.pages[0].dimensions.height if unified_doc.pages else 0
        },
        # Metadata for tracking
        '_from_unified_document': True,
        '_processing_track': unified_doc.metadata.processing_track.value
    }

    logger.info(f"Converted UnifiedDocument to OCR data: "
                f"{len(text_regions)} text regions, "
                f"{len(images_metadata)} images, "
                f"{len(layout_elements)} layout elements, "
                f"track={unified_doc.metadata.processing_track.value}")

    return ocr_data
|
||
|
||
def generate_from_unified_document(
    self,
    unified_doc: 'UnifiedDocument',
    output_path: Path,
    source_file_path: Optional[Path] = None
) -> bool:
    """
    Generate layout-preserving PDF directly from UnifiedDocument.

    The document is first converted with
    ``convert_unified_document_to_ocr_data`` and then handed to
    ``_generate_pdf_from_data``.

    Args:
        unified_doc: UnifiedDocument object
        output_path: Path to save generated PDF
        source_file_path: Optional path to original source file

    Returns:
        True if successful, False otherwise (including when UnifiedDocument
        support could not be imported, or when any exception was raised)
    """
    if not UNIFIED_DOCUMENT_AVAILABLE:
        logger.error("UnifiedDocument support not available")
        return False

    try:
        # Convert UnifiedDocument to OCR data format
        ocr_data = self.convert_unified_document_to_ocr_data(unified_doc)

        # Use internal generation with pre-loaded data
        return self._generate_pdf_from_data(
            ocr_data=ocr_data,
            output_path=output_path,
            source_file_path=source_file_path
        )

    except Exception as e:
        # exc_info sends the traceback through the logging system instead of
        # printing it straight to stderr.
        logger.error(f"Failed to generate PDF from UnifiedDocument: {e}", exc_info=True)
        return False
|
||
|
||
def _generate_pdf_from_data(
    self,
    ocr_data: Dict,
    output_path: Path,
    source_file_path: Optional[Path] = None,
    json_parent_dir: Optional[Path] = None
) -> bool:
    """
    Internal method to generate PDF from OCR data dictionary.

    This is the core generation logic shared by the JSON-based and
    UnifiedDocument-based generation paths.  Steps:

    1. Return immediately if ``output_path`` already exists (acts as a cache).
    2. Infer the OCR coordinate-space size from the bboxes in ``ocr_data``.
    3. Use the original file's page size as the target size when available,
       otherwise the OCR size; derive X/Y scale factors from the two.
    4. Drop text regions that overlap any ``images_metadata`` entry.
    5. For every page draw images, then tables, then text.

    Args:
        ocr_data: OCR data dictionary
        output_path: Path to save generated PDF
        source_file_path: Optional path to original source file
        json_parent_dir: Directory containing images (for JSON-based
            generation); defaults to ``output_path.parent``

    Returns:
        True if successful, False otherwise
    """
    try:
        # Check if PDF already exists (caching)
        if output_path.exists():
            logger.info(f"PDF already exists: {output_path.name}")
            return True

        # Get text regions
        text_regions = ocr_data.get('text_regions', [])
        if not text_regions:
            logger.warning("No text regions found in data")
            # Don't fail - might have only tables/images

        # Get images metadata
        images_metadata = ocr_data.get('images_metadata', [])

        # Get layout data
        layout_data = ocr_data.get('layout_data', {})

        # Step 1: Get OCR processing dimensions.  source_file_path is
        # passed as None here, so this size comes from the bboxes (or A4),
        # never from the source file.
        ocr_width, ocr_height = self.calculate_page_dimensions(ocr_data, source_file_path=None)
        logger.info(f"OCR 處理時使用的座標系尺寸: {ocr_width:.1f} x {ocr_height:.1f}")

        # Step 2: Get target PDF dimensions
        if source_file_path:
            target_dims = self.get_original_page_size(source_file_path)
            if target_dims:
                target_width, target_height = target_dims
                logger.info(f"目標 PDF 尺寸(來自原始文件): {target_width:.1f} x {target_height:.1f}")
            else:
                target_width, target_height = ocr_width, ocr_height
                logger.warning("無法獲取原始文件尺寸,使用 OCR 尺寸作為目標")
        else:
            target_width, target_height = ocr_width, ocr_height
            logger.info(f"無原始文件,使用 OCR 尺寸作為目標: {target_width:.1f} x {target_height:.1f}")

        # Step 3: Calculate scale factors
        scale_w = target_width / ocr_width if ocr_width > 0 else 1.0
        scale_h = target_height / ocr_height if ocr_height > 0 else 1.0
        logger.info(f"縮放因子: X={scale_w:.3f}, Y={scale_h:.3f}")

        # Create PDF canvas
        pdf_canvas = canvas.Canvas(str(output_path), pagesize=(target_width, target_height))

        # Filter text regions to avoid overlap with tables/images
        regions_to_avoid = images_metadata
        table_count = sum(1 for img in images_metadata if img.get('type') == 'table')

        logger.info(f"過濾文字區域: {len(regions_to_avoid)} 個區域需要避免 (含 {table_count} 個表格)")

        filtered_text_regions = self._filter_text_in_regions(text_regions, regions_to_avoid)

        # Group regions by page (text regions use 1-based page numbers)
        pages_data = {}
        for region in filtered_text_regions:
            pages_data.setdefault(region.get('page', 1), []).append(region)

        # Get table elements from layout_data
        table_elements = []
        if layout_data and layout_data.get('elements'):
            table_elements = [e for e in layout_data['elements'] if e.get('type') == 'table']

        # Process each page
        total_pages = ocr_data.get('total_pages', 1)
        logger.info(f"開始處理 {total_pages} 頁 PDF")

        # Determine image directory
        if json_parent_dir is None:
            json_parent_dir = output_path.parent

        for page_num in range(1, total_pages + 1):
            logger.info(f">>> 處理第 {page_num}/{total_pages} 頁")
            if page_num > 1:
                pdf_canvas.showPage()

            # Get regions for this page (tables/images use 0-based pages)
            page_text_regions = pages_data.get(page_num, [])
            page_table_regions = [t for t in table_elements if t.get('page') == page_num - 1]
            page_image_regions = [
                img for img in images_metadata
                if img.get('page') == page_num - 1
                and img.get('type') != 'table'
                and img.get('image_path') is not None  # Skip table placeholders
            ]

            # Draw in layers: images -> tables -> text

            # 1. Draw images (bottom layer)
            for img_meta in page_image_regions:
                self.draw_image_region(
                    pdf_canvas, img_meta, target_height,
                    json_parent_dir, scale_w, scale_h
                )

            # 2. Draw tables (middle layer)
            for table_elem in page_table_regions:
                self.draw_table_region(
                    pdf_canvas, table_elem, images_metadata,
                    target_height, scale_w, scale_h
                )

            # 3. Draw text (top layer)
            for region in page_text_regions:
                self.draw_text_region(
                    pdf_canvas, region, target_height,
                    scale_w, scale_h
                )

            logger.info(f"<<< 第 {page_num} 頁完成")

        # Save PDF
        pdf_canvas.save()

        file_size = output_path.stat().st_size
        logger.info(f"Generated PDF: {output_path.name} ({file_size} bytes)")
        return True

    except Exception as e:
        # exc_info sends the traceback through the logging system instead of
        # printing it straight to stderr.
        logger.error(f"Failed to generate PDF: {e}", exc_info=True)
        return False
|
||
|
||
def calculate_page_dimensions(self, ocr_data: Dict, source_file_path: Optional[Path] = None) -> Tuple[float, float]:
    """
    Infer the page size of the coordinate space used by the OCR data.

    The size is the largest x and y found in any bbox, because OCR may have
    run on a high-resolution image rather than at the PDF's point size.

    Regions are collected from every field that may carry a bbox:
    ``text_regions``, ``image_regions``, ``tables``, ``layout`` and
    ``layout_data['elements']``.

    Args:
        ocr_data: Complete OCR data dictionary with text_regions and layout
        source_file_path: Optional path to source file (fallback only)

    Returns:
        Tuple of (width, height).  When no usable bbox exists, the size of
        ``source_file_path`` is returned if it can be read, otherwise A4.
    """
    # Different OCR output versions use different field names, so gather
    # regions from all of them.
    all_regions = []
    for field in ('text_regions', 'image_regions', 'tables', 'layout'):
        if field in ocr_data and isinstance(ocr_data[field], list):
            all_regions.extend(ocr_data[field])

    # layout_data.elements - PP-StructureV3 format
    if 'layout_data' in ocr_data and isinstance(ocr_data['layout_data'], dict):
        elements = ocr_data['layout_data'].get('elements', [])
        if elements:
            all_regions.extend(elements)

    if all_regions:
        max_x = 0
        max_y = 0
        region_count = 0

        for region in all_regions:
            # Bound before the try so the except handler can always format
            # it, even when region.get itself is what raised.
            bbox = None
            try:
                bbox = region.get('bbox')
                if not bbox:
                    continue

                region_count += 1

                if isinstance(bbox[0], (int, float)):
                    # Simple [x1, y1, x2, y2] format
                    max_x = max(max_x, bbox[2])
                    max_y = max(max_y, bbox[3])
                elif isinstance(bbox[0], (list, tuple)):
                    # Polygon [[x, y], ...] format
                    x_coords = [p[0] for p in bbox if isinstance(p, (list, tuple)) and len(p) >= 2]
                    y_coords = [p[1] for p in bbox if isinstance(p, (list, tuple)) and len(p) >= 2]
                    if x_coords and y_coords:
                        max_x = max(max_x, max(x_coords))
                        max_y = max(max_y, max(y_coords))

            except Exception as e:
                logger.warning(f"Error processing bbox {bbox}: {e}")

        if max_x > 0 and max_y > 0:
            logger.info(f"從 {region_count} 個區域中推斷出的 OCR 座標系尺寸: {max_x:.1f} x {max_y:.1f}")
            return (max_x, max_y)

        # Every bbox failed to parse (or was empty): fall back.
        logger.warning("無法從 bbox 推斷尺寸,回退到原始檔案尺寸。")
    else:
        # No regions at all: fall back.
        logger.warning("JSON 中沒有找到 text_regions, image_regions, tables, layout 或 layout_data.elements,回退到原始檔案尺寸。")

    if source_file_path:
        dims = self.get_original_page_size(source_file_path)
        if dims:
            return dims
    return A4
|
||
|
||
def get_original_page_size(self, file_path: Path) -> Optional[Tuple[float, float]]:
    """
    Extract page dimensions from original source file

    Images (.png, .jpg, .jpeg, .bmp, .tiff) report their pixel size
    unchanged; PDFs report the MediaBox size of their first page, read with
    PyPDF2 when it is installed.

    Args:
        file_path: Path to original file (image or PDF)

    Returns:
        Tuple of (width, height), or None when the file does not exist, has
        another suffix, or could not be read (failures are logged)
    """
    try:
        if not file_path.exists():
            return None

        suffix = file_path.suffix.lower()

        # For images, get dimensions from PIL
        if suffix in ('.png', '.jpg', '.jpeg', '.bmp', '.tiff'):
            # The context manager closes the file handle that Image.open
            # keeps open.
            with Image.open(file_path) as img:
                # Use pixel dimensions directly as points (1:1 mapping)
                # This matches how PaddleOCR reports coordinates
                width_pt = float(img.width)
                height_pt = float(img.height)
            logger.info(f"Extracted dimensions from image: {width_pt:.1f} x {height_pt:.1f} points (1:1 pixel mapping)")
            return (width_pt, height_pt)

        # For PDFs, extract dimensions using PyPDF2
        if suffix == '.pdf':
            try:
                from PyPDF2 import PdfReader
                reader = PdfReader(file_path)
                if len(reader.pages) > 0:
                    page = reader.pages[0]
                    # MediaBox gives [x1, y1, x2, y2] in points
                    mediabox = page.mediabox
                    width_pt = float(mediabox.width)
                    height_pt = float(mediabox.height)
                    logger.info(f"Extracted dimensions from PDF: {width_pt:.1f} x {height_pt:.1f} points")
                    return (width_pt, height_pt)
            except ImportError:
                logger.warning("PyPDF2 not available, cannot extract PDF dimensions")
            except Exception as e:
                logger.warning(f"Failed to extract PDF dimensions: {e}")

    except Exception as e:
        logger.warning(f"Failed to get page size from {file_path}: {e}")

    return None
|
||
|
||
def _get_bbox_coords(self, bbox: Union[List[List[float]], List[float]]) -> Optional[Tuple[float, float, float, float]]:
|
||
"""將任何 bbox 格式 (多邊形或 [x1,y1,x2,y2]) 轉換為 [x_min, y_min, x_max, y_max]"""
|
||
try:
|
||
if isinstance(bbox[0], (list, tuple)):
|
||
# 處理多邊形 [[x, y], ...]
|
||
x_coords = [p[0] for p in bbox if isinstance(p, (list, tuple)) and len(p) >= 2]
|
||
y_coords = [p[1] for p in bbox if isinstance(p, (list, tuple)) and len(p) >= 2]
|
||
if not x_coords or not y_coords:
|
||
return None
|
||
return min(x_coords), min(y_coords), max(x_coords), max(y_coords)
|
||
elif isinstance(bbox[0], (int, float)) and len(bbox) == 4:
|
||
# 處理 [x1, y1, x2, y2]
|
||
return bbox[0], bbox[1], bbox[2], bbox[3]
|
||
else:
|
||
logger.warning(f"未知的 bbox 格式: {bbox}")
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"解析 bbox {bbox} 時出錯: {e}")
|
||
return None
|
||
|
||
def _is_bbox_inside(self, inner_bbox_data: Dict, outer_bbox_data: Dict, tolerance: float = 5.0) -> bool:
|
||
"""
|
||
檢查 'inner_bbox' 是否在 'outer_bbox' 內部(帶有容錯)。
|
||
此版本可處理多邊形和矩形。
|
||
"""
|
||
inner_coords = self._get_bbox_coords(inner_bbox_data.get('bbox'))
|
||
outer_coords = self._get_bbox_coords(outer_bbox_data.get('bbox'))
|
||
|
||
if not inner_coords or not outer_coords:
|
||
return False
|
||
|
||
inner_x1, inner_y1, inner_x2, inner_y2 = inner_coords
|
||
outer_x1, outer_y1, outer_x2, outer_y2 = outer_coords
|
||
|
||
# 檢查 inner 是否在 outer 內部 (加入 tolerance)
|
||
is_inside = (
|
||
(inner_x1 >= outer_x1 - tolerance) and
|
||
(inner_y1 >= outer_y1 - tolerance) and
|
||
(inner_x2 <= outer_x2 + tolerance) and
|
||
(inner_y2 <= outer_y2 + tolerance)
|
||
)
|
||
return is_inside
|
||
|
||
def _bbox_overlaps(self, bbox1_data: Dict, bbox2_data: Dict, tolerance: float = 5.0) -> bool:
|
||
"""
|
||
檢查兩個 bbox 是否有重疊(帶有容錯)。
|
||
如果有任何重疊,返回 True。
|
||
|
||
Args:
|
||
bbox1_data: 第一個 bbox 數據
|
||
bbox2_data: 第二個 bbox 數據
|
||
tolerance: 容錯值(像素)
|
||
|
||
Returns:
|
||
True 如果兩個 bbox 有重疊
|
||
"""
|
||
coords1 = self._get_bbox_coords(bbox1_data.get('bbox'))
|
||
coords2 = self._get_bbox_coords(bbox2_data.get('bbox'))
|
||
|
||
if not coords1 or not coords2:
|
||
return False
|
||
|
||
x1_min, y1_min, x1_max, y1_max = coords1
|
||
x2_min, y2_min, x2_max, y2_max = coords2
|
||
|
||
# 擴展 bbox2(表格/圖片區域)的範圍
|
||
x2_min -= tolerance
|
||
y2_min -= tolerance
|
||
x2_max += tolerance
|
||
y2_max += tolerance
|
||
|
||
# 檢查是否有重疊:如果沒有重疊,則必定滿足以下條件之一
|
||
no_overlap = (
|
||
x1_max < x2_min or # bbox1 在 bbox2 左側
|
||
x1_min > x2_max or # bbox1 在 bbox2 右側
|
||
y1_max < y2_min or # bbox1 在 bbox2 上方
|
||
y1_min > y2_max # bbox1 在 bbox2 下方
|
||
)
|
||
|
||
return not no_overlap
|
||
|
||
def _filter_text_in_regions(self, text_regions: List[Dict], regions_to_avoid: List[Dict], tolerance: float = 10.0) -> List[Dict]:
    """
    Drop text regions that overlap any of ``regions_to_avoid`` (e.g. tables, images).

    Args:
        text_regions: list of text region dicts
        regions_to_avoid: list of region dicts whose area must stay free of text
        tolerance: slack forwarded to ``_bbox_overlaps`` (default 10.0)

    Returns:
        The text regions that overlap none of the regions to avoid, in
        their original order
    """
    # NOTE(review): only bboxes are compared here; the 'page' key of either
    # region is not consulted - confirm whether callers expect per-page
    # filtering.
    kept = []
    dropped = 0

    for candidate in text_regions:
        # any() stops at the first overlapping region, which is all we need.
        overlaps_something = any(
            self._bbox_overlaps(candidate, blocked, tolerance=tolerance)
            for blocked in regions_to_avoid
        )

        if overlaps_something:
            dropped += 1
            logger.debug(f"過濾掉重疊文字: {candidate.get('text', '')[:20]}...")
        else:
            kept.append(candidate)

    logger.info(f"原始文字區域: {len(text_regions)}, 過濾後: {len(kept)}, 移除: {dropped}")
    return kept
|
||
|
||
def draw_text_region(
    self,
    pdf_canvas: canvas.Canvas,
    region: Dict,
    page_height: float,
    scale_w: float = 1.0,
    scale_h: float = 1.0
):
    """
    Render one text region onto the canvas at its scaled bbox position.

    The bbox is read as a four-point polygon in top-left-origin coordinates;
    point 0 supplies left/top and point 2 supplies right/bottom.  The font
    size is derived from the scaled bbox height and shrunk further if the
    string would be wider than the bbox.  Failures are logged, not raised.

    Args:
        pdf_canvas: ReportLab canvas object
        region: Text region dict with text, bbox, confidence
        page_height: Height of page (for coordinate transformation)
        scale_w: Scale factor for X coordinates (PDF width / OCR width)
        scale_h: Scale factor for Y coordinates (PDF height / OCR height)
    """
    text = region.get('text', '')
    bbox = region.get('bbox', [])
    confidence = region.get('confidence', 1.0)  # read, but not used for drawing

    # Nothing to draw without text and a four-point bbox.
    if not text or not bbox or len(bbox) < 4:
        return

    try:
        # Corner 0 is top-left and corner 2 is bottom-right; the OCR origin
        # is the top-left of the page with Y growing downward.
        top_left = bbox[0]
        bottom_right = bbox[2]
        src_left = top_left[0]
        src_top = top_left[1]
        src_right = bottom_right[0]
        src_bottom = bottom_right[1]

        logger.info(f"[文字] '{text[:20]}...' OCR原始座標: L={src_left:.0f}, T={src_top:.0f}, R={src_right:.0f}, B={src_bottom:.0f}")

        # OCR space -> PDF space
        left = src_left * scale_w
        top = src_top * scale_h
        right = src_right * scale_w
        bottom = src_bottom * scale_h

        logger.info(f"[文字] '{text[:20]}...' 縮放後(scale={scale_w:.3f},{scale_h:.3f}): L={left:.1f}, T={top:.1f}, R={right:.1f}, B={bottom:.1f}")

        box_w = abs(right - left)
        box_h = abs(bottom - top)

        # Heuristic: font size is 75% of the bbox height, kept within 4-72pt.
        size = box_h * 0.75
        size = min(size, 72)
        size = max(size, 4)

        # Flip the Y axis: PDF origin is bottom-left, so anchor the text at
        # the bbox's bottom edge.
        draw_x = left
        draw_y = page_height - bottom

        logger.info(f"[文字] '{text[:30]}' → PDF位置: ({draw_x:.1f}, {draw_y:.1f}), 字體:{size:.1f}pt, 寬x高:{box_w:.0f}x{box_h:.0f}")

        # Fall back to Helvetica when the Chinese font is unavailable.
        if self.font_registered:
            face = self.font_name
        else:
            face = 'Helvetica'
        pdf_canvas.setFont(face, size)

        # Shrink the font when the rendered string would overflow the bbox.
        rendered_w = pdf_canvas.stringWidth(text, face, size)
        if rendered_w > box_w:
            shrink = box_w / rendered_w
            size = size * shrink * 0.95  # 95% to leave a small margin
            size = max(size, 3)  # never below 3pt
            pdf_canvas.setFont(face, size)

        pdf_canvas.drawString(draw_x, draw_y, text)

        # Optional debug outline of the bbox
        if settings.pdf_enable_bbox_debug:
            pdf_canvas.setStrokeColorRGB(1, 0, 0, 0.3)  # red, semi-transparent
            pdf_canvas.setLineWidth(0.5)
            # Scale first, then flip Y, for every polygon point.
            outline = [(p[0] * scale_w, page_height - p[1] * scale_h) for p in bbox]
            # Connect the first four points into a closed loop.
            for start, end in zip(outline[:4], outline[1:4] + outline[:1]):
                x1, y1 = start
                x2, y2 = end
                pdf_canvas.line(x1, y1, x2, y2)

    except Exception as e:
        logger.warning(f"Failed to draw text region '{text[:20]}...': {e}")
|
||
|
||
def draw_table_region(
    self,
    pdf_canvas: canvas.Canvas,
    table_element: Dict,
    images_metadata: List[Dict],
    page_height: float,
    scale_w: float = 1.0,
    scale_h: float = 1.0
):
    """
    Draw a table region by parsing its HTML and rebuilding it as a ReportLab Table.

    The table is positioned from the element's own 'bbox' (or, when that is
    missing, from 'bbox_polygon'). Any failure is logged and swallowed so a
    single bad table does not abort the rest of the page.

    Args:
        pdf_canvas: ReportLab canvas object to draw on
        table_element: Table element dict; 'content' holds the HTML and
            'bbox' / 'bbox_polygon' holds the position in OCR coordinates
        images_metadata: Not used by this method; kept in the signature so
            existing callers keep working
        page_height: Height of the PDF page (used to flip the Y axis)
        scale_w: Scale factor for X coordinates (PDF width / OCR width)
        scale_h: Scale factor for Y coordinates (PDF height / OCR height)
    """
    try:
        html_content = table_element.get('content', '')
        if not html_content:
            return

        # Parse HTML to extract table structure
        parser = HTMLTableParser()
        parser.feed(html_content)

        if not parser.tables:
            logger.warning("No tables found in HTML content")
            return

        # Only the first parsed table is rendered
        # (PP-StructureV3 usually provides one table per element)
        rows = parser.tables[0]['rows']
        if not rows:
            return

        # Prefer the element's own bbox; fall back to bbox_polygon
        table_bbox = table_element.get('bbox')
        if not table_bbox:
            bbox_polygon = table_element.get('bbox_polygon')
            if bbox_polygon and len(bbox_polygon) >= 4:
                # Convert polygon format to simple bbox [x0, y0, x1, y1]
                # using the first and third points as opposite corners
                table_bbox = [
                    bbox_polygon[0][0],  # x0
                    bbox_polygon[0][1],  # y0
                    bbox_polygon[2][0],  # x1
                    bbox_polygon[2][1],  # y1
                ]

        if not table_bbox:
            logger.warning("No bbox found for table element")
            return

        # Handle different bbox formats (lists or tuples of length 4)
        if isinstance(table_bbox, (list, tuple)) and len(table_bbox) == 4:
            first = table_bbox[0]
            if isinstance(first, (int, float)):
                # Simple bbox format [x0, y0, x1, y1]
                ocr_x_left_raw = table_bbox[0]
                ocr_y_top_raw = table_bbox[1]
                ocr_x_right_raw = table_bbox[2]
                ocr_y_bottom_raw = table_bbox[3]
            elif isinstance(first, (list, tuple)):
                # Polygon format [[x,y], [x,y], [x,y], [x,y]]
                ocr_x_left_raw = table_bbox[0][0]
                ocr_y_top_raw = table_bbox[0][1]
                ocr_x_right_raw = table_bbox[2][0]
                ocr_y_bottom_raw = table_bbox[2][1]
            else:
                logger.error(f"Unexpected bbox format: {table_bbox}")
                return
        else:
            logger.error(f"Invalid table_bbox format: {table_bbox}")
            return

        logger.info(f"[表格] OCR原始座標: L={ocr_x_left_raw:.0f}, T={ocr_y_top_raw:.0f}, R={ocr_x_right_raw:.0f}, B={ocr_y_bottom_raw:.0f}")

        # Apply scaling
        ocr_x_left = ocr_x_left_raw * scale_w
        ocr_y_top = ocr_y_top_raw * scale_h
        ocr_x_right = ocr_x_right_raw * scale_w
        ocr_y_bottom = ocr_y_bottom_raw * scale_h

        table_width = abs(ocr_x_right - ocr_x_left)
        table_height = abs(ocr_y_bottom - ocr_y_top)

        # Transform coordinates: OCR origin is top-left, PDF origin is
        # bottom-left, so the table is anchored at its bottom-left corner
        pdf_x = ocr_x_left
        pdf_y = page_height - ocr_y_bottom

        # Widest row decides the column count of the rebuilt table
        max_cols = max(len(row['cells']) for row in rows)
        if max_cols == 0:
            # Without this guard the column-width division below would
            # raise ZeroDivisionError for a table whose rows have no cells
            logger.warning("Table rows contain no cells; skipping table")
            return

        logger.info(f"[表格] {len(rows)}行x{max_cols}列 → PDF位置: ({pdf_x:.1f}, {pdf_y:.1f}), 寬x高: {table_width:.0f}x{table_height:.0f}")

        # Convert parsed structure to a rectangular 2D array of cell text,
        # padding short rows with empty strings
        reportlab_data = []
        for row in rows:
            row_data = [cell['text'].strip() for cell in row['cells']]
            row_data.extend([''] * (max_cols - len(row_data)))
            reportlab_data.append(row_data)

        # Calculate column widths (equal distribution)
        col_widths = [table_width / max_cols] * max_cols

        # Use a small font so rows fit in the bbox: half the average row
        # height, clamped to the 6pt-10pt range
        font_size = min(table_height / len(rows) * 0.5, 10)
        font_size = max(font_size, 6)

        table = Table(reportlab_data, colWidths=col_widths)

        # Apply table style
        body_font = self.font_name if self.font_registered else 'Helvetica'
        style = TableStyle([
            ('FONT', (0, 0), (-1, -1), body_font, font_size),
            ('GRID', (0, 0), (-1, -1), 0.5, colors.black),
            ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
            ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
            ('LEFTPADDING', (0, 0), (-1, -1), 2),
            ('RIGHTPADDING', (0, 0), (-1, -1), 2),
            ('TOPPADDING', (0, 0), (-1, -1), 2),
            ('BOTTOMPADDING', (0, 0), (-1, -1), 2),
        ])

        # Add header style if the first cell of the first row is a header
        first_cells = rows[0]['cells']
        if first_cells and first_cells[0].get('is_header'):
            header_font = self.font_name if self.font_registered else 'Helvetica-Bold'
            style.add('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey)
            style.add('FONT', (0, 0), (-1, 0), header_font, font_size)

        table.setStyle(style)

        # Calculate table size
        table.wrapOn(pdf_canvas, table_width, table_height)

        # Draw table at position
        table.drawOn(pdf_canvas, pdf_x, pdf_y)

        logger.info(f"Drew table at ({pdf_x:.0f}, {pdf_y:.0f}) size {table_width:.0f}x{table_height:.0f} with {len(rows)} rows")

    except Exception as e:
        # Best-effort rendering: report the failure (with traceback) through
        # the logger instead of printing to stderr, then carry on
        logger.warning(f"Failed to draw table region: {e}", exc_info=True)
|
||
|
||
def draw_image_region(
    self,
    pdf_canvas: canvas.Canvas,
    region: Dict,
    page_height: float,
    result_dir: Path,
    scale_w: float = 1.0,
    scale_h: float = 1.0
):
    """
    Draw an image region by embedding the extracted image file.

    Handles images extracted by PP-StructureV3 (tables, figures, charts, etc.).
    Returns without drawing when the region has no image path, the file does
    not exist, or no usable bbox is present. Any other failure is logged and
    swallowed.

    Args:
        pdf_canvas: ReportLab canvas object
        region: Image metadata dict with 'image_path' (relative to
            result_dir) and 'bbox', either a polygon
            [[x1,y1], [x2,y2], [x3,y3], [x4,y4]] or a flat [x0, y0, x1, y1]
        page_height: Height of page (for coordinate transformation)
        result_dir: Directory containing result files
        scale_w: Scale factor for X coordinates (PDF width / OCR width)
        scale_h: Scale factor for Y coordinates (PDF height / OCR height)
    """
    try:
        image_path_str = region.get('image_path', '')
        if not image_path_str:
            # Covers a missing key, an empty string and an explicit None
            return

        # Construct full path to image
        image_path = result_dir / image_path_str
        if not image_path.exists():
            logger.warning(f"Image not found: {image_path}")
            return

        # Get bbox for positioning
        bbox = region.get('bbox', [])
        if not bbox or len(bbox) < 4:
            # If no bbox, skip for now
            logger.warning(f"No bbox for image {image_path_str}")
            return

        # OCR coordinates: origin (0,0) at top-left, Y increases downward
        if isinstance(bbox[0], (int, float)):
            # Flat format [x0, y0, x1, y1], same as draw_table_region accepts
            ocr_x_left_raw = bbox[0]
            ocr_y_top_raw = bbox[1]
            ocr_x_right_raw = bbox[2]
            ocr_y_bottom_raw = bbox[3]
        else:
            # Polygon format: first and third points are opposite corners
            ocr_x_left_raw = bbox[0][0]
            ocr_y_top_raw = bbox[0][1]
            ocr_x_right_raw = bbox[2][0]
            ocr_y_bottom_raw = bbox[2][1]

        logger.info(f"[圖片] '{image_path_str}' OCR原始座標: L={ocr_x_left_raw:.0f}, T={ocr_y_top_raw:.0f}, R={ocr_x_right_raw:.0f}, B={ocr_y_bottom_raw:.0f}")

        # Apply scaling
        ocr_x_left = ocr_x_left_raw * scale_w
        ocr_y_top = ocr_y_top_raw * scale_h
        ocr_x_right = ocr_x_right_raw * scale_w
        ocr_y_bottom = ocr_y_bottom_raw * scale_h

        # Calculate bbox dimensions (after scaling)
        bbox_width = abs(ocr_x_right - ocr_x_left)
        bbox_height = abs(ocr_y_bottom - ocr_y_top)

        # Transform coordinates: OCR (top-left origin) → PDF (bottom-left origin)
        # CRITICAL: Y-axis flip! Images are positioned by their bottom-left corner
        pdf_x_left = ocr_x_left
        pdf_y_bottom = page_height - ocr_y_bottom

        logger.info(f"[圖片] '{image_path_str}' → PDF位置: ({pdf_x_left:.1f}, {pdf_y_bottom:.1f}), 寬x高: {bbox_width:.0f}x{bbox_height:.0f}")

        # drawImage expects (path, x, y, width, height) where (x, y) is the
        # bottom-left corner of the image
        pdf_canvas.drawImage(
            str(image_path),
            pdf_x_left,
            pdf_y_bottom,
            width=bbox_width,
            height=bbox_height,
            preserveAspectRatio=True,
            mask='auto'  # Handle transparency
        )

        logger.info(f"[圖片] ✓ 成功繪製 '{image_path_str}'")

    except Exception as e:
        # Best-effort: a single bad image must not abort the page
        logger.warning(f"Failed to draw image region: {e}")
|
||
|
||
def generate_layout_pdf(
    self,
    json_path: Path,
    output_path: Path,
    source_file_path: Optional[Path] = None
) -> bool:
    """
    Generate layout-preserving PDF from OCR JSON data

    Loads the OCR JSON via self.load_ocr_json and delegates the actual
    rendering to self._generate_pdf_from_data, passing the JSON file's
    parent directory along as json_parent_dir.

    Args:
        json_path: Path to OCR JSON file
        output_path: Path to save generated PDF
        source_file_path: Optional path to original source file for dimension extraction

    Returns:
        True if successful, False otherwise (including when the JSON could
        not be loaded or any exception was raised)
    """
    try:
        # Load JSON data; an empty/None result means there is nothing to render
        ocr_data = self.load_ocr_json(json_path)
        if not ocr_data:
            return False

        # Use internal generation with pre-loaded data
        return self._generate_pdf_from_data(
            ocr_data=ocr_data,
            output_path=output_path,
            source_file_path=source_file_path,
            json_parent_dir=json_path.parent
        )

    except Exception as e:
        # Top-level boundary: log at ERROR level with the traceback through
        # the logger (not stderr) and report failure to the caller
        logger.exception(f"Failed to generate PDF: {e}")
        return False
|
||
|
||
|
||
# Singleton instance
|
||
# Shared PDFGeneratorService instance, created when this statement runs;
# other code can reuse this object instead of constructing its own.
pdf_generator_service = PDFGeneratorService()
|