#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Enhanced PDF parser - supports OCR processing for scanned PDFs.

Author: PANJIT IT Team
Created: 2024-09-23
Modified: 2024-09-23
"""

import io
from pathlib import Path
from typing import List, Optional

from PyPDF2 import PdfReader

from app.utils.logger import get_logger
from app.utils.exceptions import FileProcessingError
from app.services.dify_client import DifyClient
from app.services.ocr_cache import OCRCache
from app.utils.image_preprocessor import ImagePreprocessor

logger = get_logger(__name__)

# Check for the optional PyMuPDF dependency.
try:
    import fitz  # PyMuPDF
    _HAS_PYMUPDF = True
except ImportError:
    _HAS_PYMUPDF = False
    logger.warning("PyMuPDF not available. Scanned PDF processing will be disabled.")

class EnhancedPdfParser:
    """Enhanced parser with OCR support for scanned PDFs."""

    def __init__(self, file_path: str):
        self.file_path = Path(file_path)
        self.dify_client = DifyClient()
        self.ocr_cache = OCRCache()
        self.image_preprocessor = ImagePreprocessor(use_opencv=True)

        if not self.file_path.exists():
            raise FileProcessingError(f"PDF file not found: {file_path}")

    def is_scanned_pdf(self) -> bool:
        """Detect whether the PDF is a scanned document."""
        try:
            reader = PdfReader(str(self.file_path))
            text_content = ""

            # Inspect the text content of the first three pages.
            pages_to_check = min(3, len(reader.pages))
            for i in range(pages_to_check):
                page_text = reader.pages[i].extract_text()
                text_content += page_text

            # Very little extractable text strongly suggests a scanned document.
            text_length = len(text_content.strip())
            logger.info(f"PDF text extraction found {text_length} characters in first {pages_to_check} pages")

            # Threshold: fewer than 100 characters is treated as scanned.
            is_scanned = text_length < 100

            if is_scanned:
                logger.info("PDF detected as scanned document, will use OCR processing")
            else:
                logger.info("PDF detected as text-based document, will use direct text extraction")

            return is_scanned

        except Exception as e:
            logger.warning(f"Failed to analyze PDF type: {e}, treating as scanned document")
            return True  # Fall back to treating the file as a scanned document.

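    # A quick sketch of the heuristic above (illustrative file name, not part
    # of the module): fewer than 100 extractable characters across the first
    # three pages routes the file to OCR, anything more to direct extraction.
    #
    #     parser = EnhancedPdfParser("scan_2024.pdf")
    #     parser.is_scanned_pdf()  # -> True for image-only pages
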
    def extract_text_segments(self, user_id: Optional[int] = None, job_id: Optional[int] = None) -> List[str]:
        """Extract text segments from the PDF, choosing the right strategy."""
        try:
            # Try direct text extraction first.
            if not self.is_scanned_pdf():
                return self._extract_from_text_pdf()

            # Scanned PDFs are rendered to images and sent through Dify OCR.
            if not _HAS_PYMUPDF:
                raise FileProcessingError("Processing scanned PDFs requires PyMuPDF. Install it with: pip install PyMuPDF")

            return self._extract_from_scanned_pdf(user_id, job_id)

        except Exception as e:
            logger.error(f"PDF text extraction failed: {str(e)}")
            raise FileProcessingError(f"PDF parsing failed: {str(e)}")

    def _extract_from_text_pdf(self) -> List[str]:
        """Extract text segments from a text-based PDF."""
        try:
            reader = PdfReader(str(self.file_path))
            text_segments = []

            for page_num, page in enumerate(reader.pages, 1):
                page_text = page.extract_text()

                if page_text.strip():
                    # Simple sentence segmentation.
                    sentences = self._split_text_into_sentences(page_text)

                    # Drop fragments that are too short.
                    valid_sentences = [s for s in sentences if len(s.strip()) > 10]
                    text_segments.extend(valid_sentences)

                    logger.debug(f"Page {page_num}: extracted {len(valid_sentences)} sentences")

            logger.info(f"Text PDF extraction completed: {len(text_segments)} segments")

            # Merge short segments to reduce unnecessary translation calls.
            merged_segments = self._merge_short_segments(text_segments)
            return merged_segments

        except Exception as e:
            logger.error(f"Text PDF extraction failed: {str(e)}")
            raise FileProcessingError(f"Text PDF extraction failed: {str(e)}")

    def _extract_from_scanned_pdf(self, user_id: Optional[int] = None, job_id: Optional[int] = None) -> List[str]:
        """Extract text segments from a scanned PDF via Dify OCR."""
        try:
            doc = fitz.open(str(self.file_path))
            text_segments = []
            total_pages = doc.page_count

            logger.info(f"Processing scanned PDF with {total_pages} pages using Dify OCR")

            for page_num in range(total_pages):
                try:
                    logger.info(f"[PDF-OCR] Processing page {page_num + 1}/{total_pages}")
                    page = doc[page_num]

                    # Render the page at 2x zoom; the higher resolution improves OCR accuracy.
                    zoom = 2.0
                    mat = fitz.Matrix(zoom, zoom)
                    pix = page.get_pixmap(matrix=mat, alpha=False)

                    # Convert to PNG and preprocess the image to further improve OCR accuracy.
                    img_data_raw = pix.tobytes("png")
                    img_data = self.image_preprocessor.preprocess_smart(img_data_raw)
                    logger.debug(f"[PDF-OCR] Page {page_num + 1}: Image preprocessed ({len(img_data_raw)} -> {len(img_data)} bytes)")
                    filename = f"page_{page_num + 1}.png"

                    logger.info(f"[PDF-OCR] Page {page_num + 1}: Converted to image ({len(img_data)} bytes)")
                    logger.debug(f"[PDF-OCR] Page {page_num + 1}: Image zoom={zoom}, format=PNG")

                    # Check the OCR cache first.
                    cache_key_info = f"{self.file_path.name}_page_{page_num + 1}_zoom_{zoom}"
                    cached_text = self.ocr_cache.get_cached_text(
                        file_data=img_data,
                        filename=filename,
                        additional_info=cache_key_info
                    )

                    if cached_text:
                        logger.info(f"[PDF-OCR] Page {page_num + 1}: ✓ Using cached OCR result (saves AI traffic)")
                        ocr_text = cached_text
                    else:
                        # Run Dify OCR on the page image.
                        logger.info(f"[PDF-OCR] Page {page_num + 1}: Starting OCR recognition...")
                        ocr_text = self.dify_client.ocr_image_with_dify(
                            image_data=img_data,
                            filename=filename,
                            user_id=user_id,
                            job_id=job_id
                        )

                        # Save the OCR result to the cache.
                        if ocr_text.strip():
                            self.ocr_cache.save_cached_text(
                                file_data=img_data,
                                extracted_text=ocr_text,
                                filename=filename,
                                additional_info=cache_key_info,
                                metadata={
                                    'source_file': str(self.file_path),
                                    'page_number': page_num + 1,
                                    'total_pages': total_pages,
                                    'zoom_level': zoom,
                                    'image_size_bytes': len(img_data),
                                    'user_id': user_id,
                                    'job_id': job_id
                                }
                            )
                            logger.info(f"[PDF-OCR] Page {page_num + 1}: ✓ OCR result saved to cache")

                    logger.info(f"[PDF-OCR] Page {page_num + 1}: OCR completed")
                    logger.debug(f"[PDF-OCR] Page {page_num + 1}: Raw OCR result length: {len(ocr_text)}")

                    if ocr_text.strip():
                        # Split the OCR result into sentences.
                        logger.debug(f"[PDF-OCR] Page {page_num + 1}: Splitting OCR text into sentences...")
                        sentences = self._split_ocr_text(ocr_text)

                        # Keep only sufficiently long sentences.
                        valid_sentences = [s for s in sentences if len(s.strip()) > 5]
                        text_segments.extend(valid_sentences)

                        logger.info(f"[PDF-OCR] Page {page_num + 1}: ✓ Extracted {len(valid_sentences)} valid sentences")
                        logger.debug(f"[PDF-OCR] Page {page_num + 1}: Total sentences before filter: {len(sentences)}")

                        # Log the first 50 characters for debugging.
                        if valid_sentences:
                            preview = valid_sentences[0][:50] + "..." if len(valid_sentences[0]) > 50 else valid_sentences[0]
                            logger.debug(f"[PDF-OCR] Page {page_num + 1}: First sentence preview: {preview}")
                    else:
                        logger.warning(f"[PDF-OCR] Page {page_num + 1}: ⚠ OCR returned empty result")

                except Exception as e:
                    logger.error(f"[PDF-OCR] Page {page_num + 1}: ✗ Processing failed: {str(e)}")
                    logger.error(f"[PDF-OCR] Page {page_num + 1}: Exception type: {type(e).__name__}")
                    # Continue with the next page instead of aborting the whole run.
                    continue

            doc.close()

            logger.info(f"[PDF-OCR] OCR processing completed for all {total_pages} pages")
            logger.info(f"[PDF-OCR] Total text segments extracted: {len(text_segments)}")

            if not text_segments:
                logger.error("[PDF-OCR] ✗ No text content extracted from any page")
                raise FileProcessingError("OCR processing finished, but no text content was extracted")

            logger.info("[PDF-OCR] ✓ Scanned PDF processing completed successfully")
            logger.info(f"[PDF-OCR] Final result: {len(text_segments)} text segments extracted")

            # Merge short segments to reduce unnecessary translation calls.
            merged_segments = self._merge_short_segments(text_segments)
            logger.info(f"[PDF-OCR] After merging: {len(merged_segments)} segments ready for translation")
            return merged_segments

        except Exception as e:
            logger.error(f"Scanned PDF processing failed: {str(e)}")
            raise FileProcessingError(f"Scanned PDF processing failed: {str(e)}")

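    # Standalone sketch of the rendering step used above, assuming PyMuPDF is
    # installed and "sample.pdf" exists (illustrative path); a 2x zoom matrix
    # doubles the render resolution in both dimensions:
    #
    #     import fitz
    #     with fitz.open("sample.pdf") as doc:
    #         pix = doc[0].get_pixmap(matrix=fitz.Matrix(2.0, 2.0), alpha=False)
    #         png_bytes = pix.tobytes("png")
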
    def _split_text_into_sentences(self, text: str) -> List[str]:
        """Split text into sentences."""
        if not text.strip():
            return []

        # Simple splitting logic: apply each separator in turn, keeping the
        # separator attached to the preceding sentence.
        separators = ['. ', '。', '!', '?', '!', '?', '\n\n']

        current_sentences = [text]

        for sep in separators:
            new_sentences = []
            for sentence in current_sentences:
                parts = sentence.split(sep)
                if len(parts) > 1:
                    # Re-attach the separator to each split-off part.
                    for part in parts[:-1]:
                        if part.strip():
                            new_sentences.append(part.strip() + sep.rstrip())
                    # Keep the final part as-is.
                    if parts[-1].strip():
                        new_sentences.append(parts[-1].strip())
                else:
                    new_sentences.append(sentence)
            current_sentences = new_sentences

        # Drop sentences that are too short.
        valid_sentences = [s for s in current_sentences if len(s.strip()) > 3]
        return valid_sentences

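    # A worked example of the splitter above (hypothetical input, shown as a
    # sketch): separators are applied in order and kept on the sentence, and
    # results of 3 characters or fewer are filtered out:
    #
    #     _split_text_into_sentences("One two three. Four five six! Seven")
    #     -> ["One two three.", "Four five six!", "Seven"]
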
    def _split_ocr_text(self, ocr_text: str) -> List[str]:
        """Split OCR-recognized text into segments."""
        if not ocr_text.strip():
            return []

        # OCR output may contain tables or other special formatting, so
        # process it line by line.
        lines = ocr_text.split('\n')
        sentences = []

        current_paragraph = []

        for line in lines:
            line = line.strip()
            if not line:
                # A blank line ends the current paragraph.
                if current_paragraph:
                    paragraph_text = ' '.join(current_paragraph)
                    if len(paragraph_text) > 10:
                        sentences.append(paragraph_text)
                    current_paragraph = []
                continue

            # Treat lines containing '|' or tabs as table rows.
            if '|' in line or '\t' in line:
                # Flush the current paragraph, then keep the table row on its own.
                if current_paragraph:
                    paragraph_text = ' '.join(current_paragraph)
                    if len(paragraph_text) > 10:
                        sentences.append(paragraph_text)
                    current_paragraph = []

                if len(line) > 10:
                    sentences.append(line)
            else:
                # Ordinary text line.
                current_paragraph.append(line)

        # Flush the final paragraph.
        if current_paragraph:
            paragraph_text = ' '.join(current_paragraph)
            if len(paragraph_text) > 10:
                sentences.append(paragraph_text)

        return sentences

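    # A worked sketch of the OCR splitter (hypothetical OCR output): plain
    # lines are joined into paragraphs at blank lines, table rows pass through
    # individually, and anything of 10 characters or fewer is dropped:
    #
    #     _split_ocr_text("Hello world line\n\n| Name | Qty |\nshort")
    #     -> ["Hello world line", "| Name | Qty |"]
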
    def generate_translated_document(self, translations: dict, target_language: str,
                                     output_dir: Path) -> str:
        """Generate a translated Word document (same format as the DOCX flow)."""
        try:
            translated_texts = translations.get(target_language, [])

            # Produce a Word document rather than a plain-text file.
            output_filename = f"{self.file_path.stem}_{target_language}_translated.docx"
            output_path = output_dir / output_filename

            # Build the Word document.
            from docx import Document
            from docx.shared import Pt
            from docx.enum.text import WD_PARAGRAPH_ALIGNMENT

            doc = Document()

            # Title page.
            title = doc.add_heading(f"PDF Translation Result - {target_language}", 0)
            title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER

            # Document information.
            info_para = doc.add_paragraph()
            info_para.add_run("Source file: ").bold = True
            info_para.add_run(self.file_path.name)
            info_para.add_run("\nProcessing method: ").bold = True
            info_para.add_run("OCR recognition" if self.is_scanned_pdf() else "Direct text extraction")
            info_para.add_run("\nTarget language: ").bold = True
            info_para.add_run(target_language)
            info_para.add_run("\nTotal segments: ").bold = True
            info_para.add_run(str(len(translated_texts)))

            doc.add_paragraph()  # Blank line.

            # Translated content.
            for i, text in enumerate(translated_texts, 1):
                content_type = self._detect_content_type(text)

                if content_type == 'table':
                    # Try to render an actual table.
                    self._add_table_content(doc, text, i)
                elif content_type == 'heading':
                    self._add_heading_content(doc, text, i)
                elif content_type == 'list':
                    self._add_list_content(doc, text, i)
                else:
                    # Ordinary paragraph.
                    self._add_paragraph_content(doc, text, i)

            # Save the Word document.
            doc.save(output_path)
            logger.info(f"Generated translated PDF Word document: {output_path}")
            return str(output_path)

        except Exception as e:
            logger.error(f"Failed to generate translated Word document: {str(e)}")
            raise FileProcessingError(f"Failed to generate translated Word document: {str(e)}")

    def generate_combined_translated_document(self, all_translations: dict, target_languages: list,
                                              output_dir: Path) -> str:
        """Generate a combined Word document containing every target language
        (translation-1/translation-2 layout)."""
        try:
            # Build the combined document filename.
            languages_suffix = '_'.join(target_languages)
            output_filename = f"{self.file_path.stem}_{languages_suffix}_combined.docx"
            output_path = output_dir / output_filename

            # Build the Word document.
            from docx import Document
            from docx.shared import Pt
            from docx.enum.text import WD_PARAGRAPH_ALIGNMENT

            doc = Document()

            # Title page.
            title = doc.add_heading("PDF Translation Result - Combined Multilingual Document", 0)
            title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER

            # Document information.
            info_para = doc.add_paragraph()
            info_para.add_run("Source file: ").bold = True
            info_para.add_run(self.file_path.name)
            info_para.add_run("\nProcessing method: ").bold = True
            info_para.add_run("OCR recognition" if self.is_scanned_pdf() else "Direct text extraction")
            info_para.add_run("\nTarget languages: ").bold = True
            info_para.add_run(' / '.join(target_languages))

            # Use the first language's translation list as the reference length.
            first_language = target_languages[0]
            segment_count = len(all_translations.get(first_language, []))
            info_para.add_run("\nTotal segments: ").bold = True
            info_para.add_run(str(segment_count))

            doc.add_paragraph()  # Blank line.

            # Translated content in translation-1/translation-2 layout.
            for i in range(segment_count):
                content_para = doc.add_paragraph()

                # Segment number.
                num_run = content_para.add_run(f"{i+1:03d}. ")
                num_run.bold = True
                num_run.font.size = Pt(12)

                # One block per target language.
                for j, target_language in enumerate(target_languages):
                    if i < len(all_translations.get(target_language, [])):
                        translation_text = all_translations[target_language][i]

                        # Spacing between languages.
                        if j > 0:
                            content_para.add_run("\n\n")

                        # Language tag.
                        lang_run = content_para.add_run(f"[{target_language}] ")
                        lang_run.bold = True
                        lang_run.font.size = Pt(11)

                        # The translated text itself.
                        trans_run = content_para.add_run(translation_text)
                        trans_run.font.size = Pt(11)

                # Paragraph spacing.
                content_para.paragraph_format.space_after = Pt(12)

            # Save the Word document.
            doc.save(output_path)
            logger.info(f"Generated combined translated PDF Word document: {output_path}")
            return str(output_path)

        except Exception as e:
            logger.error(f"Failed to generate combined translated Word document: {str(e)}")
            raise FileProcessingError(f"Failed to generate combined translated Word document: {str(e)}")

    def _is_table_component(self, segment: str) -> bool:
        """Check whether a segment is a table component (border, separator line, etc.)."""
        segment = segment.strip()

        # Markdown table separators such as |---|---|---| or |===|===|===|.
        if '|' in segment and ('-' in segment or '=' in segment):
            # After stripping |, -, =, spaces, and colons, a near-empty
            # remainder indicates a separator line.
            clean_segment = segment.replace('|', '').replace('-', '').replace('=', '').replace(' ', '').replace(':', '')
            if len(clean_segment) <= 2:  # Allow a couple of stray characters.
                return True

        # Pure horizontal rules.
        if segment.replace('=', '').replace('-', '').replace(' ', '') == '':
            return True

        return False

    def _is_table_row(self, segment: str) -> bool:
        """Check whether a segment is a table row carrying actual data."""
        segment = segment.strip()

        # A Markdown table row has at least two '|' characters plus real content.
        if segment.count('|') >= 2:
            # Strip the outer pipes and split into cells.
            cells = segment.strip('|').split('|')
            # The row counts only if some cell holds text beyond separator characters.
            has_content = any(
                cell.strip() and
                not cell.replace('-', '').replace('=', '').replace(' ', '').replace(':', '') == ''
                for cell in cells
            )
            if has_content:
                return True

        return False

    def _merge_table_segments(self, segments: List[str], start_idx: int) -> tuple[str, int]:
        """
        Merge consecutive table-related segments.

        Returns:
            (merged_table_content, next_index)
        """
        table_parts = []
        current_idx = start_idx

        # Collect consecutive table-related segments.
        while current_idx < len(segments):
            segment = segments[current_idx].strip()

            if self._is_table_component(segment) or self._is_table_row(segment):
                table_parts.append(segment)
                current_idx += 1
            else:
                break

        # Join the table parts into a single segment.
        merged_table = '\n'.join(table_parts)
        return merged_table, current_idx

    def _merge_short_segments(self, text_segments: List[str], min_length: int = 10) -> List[str]:
        """
        Merge short segments to reduce unnecessary translation calls, with
        special handling for table structures.

        Args:
            text_segments: Original list of text segments.
            min_length: Minimum segment length; shorter segments are merged.

        Returns:
            The merged list of segments.
        """
        if not text_segments:
            return text_segments

        merged_segments = []
        current_merge = ""
        i = 0

        while i < len(text_segments):
            segment = text_segments[i].strip()
            if not segment:  # Skip empty segments.
                i += 1
                continue

            # Table components are handled as a unit.
            if self._is_table_component(segment) or self._is_table_row(segment):
                # Flush any accumulated short segments first.
                if current_merge:
                    merged_segments.append(current_merge.strip())
                    logger.debug(f"Merged short segments before table: '{current_merge[:50]}...'")
                    current_merge = ""

                # Merge the consecutive table-related segments.
                table_content, next_i = self._merge_table_segments(text_segments, i)
                merged_segments.append(table_content)
                logger.debug(f"Merged table content: {next_i - i} segments -> 1 table block")
                i = next_i
                continue

            # Short segments are accumulated for merging.
            if len(segment) < min_length:
                # Skip segments that are pure punctuation or markup (table symbols handled above).
                if segment.replace('*', '').replace('-', '').replace('_', '').replace('#', '').strip() == '':
                    logger.debug(f"Skipping pure symbol segment: '{segment}'")
                    i += 1
                    continue

                if current_merge:
                    current_merge += " " + segment
                else:
                    current_merge = segment

                logger.debug(f"Adding short segment to merge: '{segment}' (length: {len(segment)})")

            else:
                # A long segment: flush accumulated short segments first.
                if current_merge:
                    merged_segments.append(current_merge.strip())
                    logger.debug(f"Merged short segments: '{current_merge[:50]}...' (total length: {len(current_merge)})")
                    current_merge = ""

                # Then keep the long segment as-is.
                merged_segments.append(segment)
                logger.debug(f"Added long segment: '{segment[:50]}...' (length: {len(segment)})")

            i += 1

        # Flush any remaining short segments.
        if current_merge:
            merged_segments.append(current_merge.strip())
            logger.debug(f"Final merged short segments: '{current_merge[:50]}...' (total length: {len(current_merge)})")

        logger.info(f"Segment merging: {len(text_segments)} -> {len(merged_segments)} segments")
        return merged_segments

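    # A worked sketch of the merge behavior (hypothetical segments, default
    # min_length=10): short neighbors are joined, tables stay intact:
    #
    #     _merge_short_segments(["Hi", "there", "A sufficiently long segment.", "| A | B |"])
    #     -> ["Hi there", "A sufficiently long segment.", "| A | B |"]
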
    def _detect_content_type(self, text: str) -> str:
        """Detect the content type of a segment."""
        text_lower = text.lower().strip()

        # Tables: multiple '|' characters or embedded tabs.
        if ('|' in text and text.count('|') >= 2) or '\t' in text:
            return 'table'

        # Headings: chapter/section prefixes or chapter keywords, but only for
        # short text. (Parenthesized so the length check applies to both
        # conditions; the original precedence applied it to the keyword check only.)
        if ((text_lower.startswith(('第', '章', 'chapter', 'section', '#')) or
                any(keyword in text_lower for keyword in ['章', '节', '第'])) and len(text) < 100):
            return 'heading'

        # Lists: bullet or numbered prefixes.
        if (text_lower.startswith(('•', '-', '*', '1.', '2.', '3.', '4.', '5.')) or
                any(text_lower.startswith(f"{i}.") for i in range(1, 20))):
            return 'list'

        return 'paragraph'

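    # Classification sketch (hypothetical inputs):
    #
    #     _detect_content_type("| Name | Qty |")        -> 'table'
    #     _detect_content_type("Chapter 1 Overview")    -> 'heading'
    #     _detect_content_type("- first bullet point")  -> 'list'
    #     _detect_content_type("Plain body text here.") -> 'paragraph'
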
    def _add_table_content(self, doc, text: str, index: int):
        """Add table content to the document."""
        from docx.shared import Pt

        # Table caption.
        title_para = doc.add_paragraph()
        title_run = title_para.add_run(f"Table {index}: ")
        title_run.bold = True
        title_run.font.size = Pt(12)

        # Parse the table.
        if '|' in text:
            # Markdown-style table.
            lines = [line.strip() for line in text.split('\n') if line.strip()]
            rows = []
            for line in lines:
                if line.startswith('|') and line.endswith('|'):
                    cells = [cell.strip() for cell in line.split('|')[1:-1]]
                    if cells:
                        # Skip separator rows such as |---|---|.
                        if not all(cell.replace('-', '').replace(' ', '') == '' for cell in cells):
                            rows.append(cells)

            if rows:
                # Build the table.
                table = doc.add_table(rows=len(rows), cols=len(rows[0]))
                table.style = 'Table Grid'

                for i, row_data in enumerate(rows):
                    for j, cell_data in enumerate(row_data):
                        if j < len(table.rows[i].cells):
                            cell = table.rows[i].cells[j]
                            cell.text = cell_data
                            # Set the font size for every run in the cell.
                            for paragraph in cell.paragraphs:
                                for run in paragraph.runs:
                                    run.font.size = Pt(10)
        else:
            # Tab-separated table: keep it as monospaced text.
            para = doc.add_paragraph()
            content_run = para.add_run(text)
            content_run.font.name = 'Courier New'
            content_run.font.size = Pt(10)

    def _add_heading_content(self, doc, text: str, index: int):
        """Add heading content to the document."""
        from docx.shared import Pt

        # Drop the segment number and add the text directly as a heading.
        clean_text = text.strip()
        if len(clean_text) < 100:
            doc.add_heading(clean_text, level=2)
        else:
            # Long text becomes a normal paragraph styled like a heading.
            para = doc.add_paragraph()
            run = para.add_run(clean_text)
            run.bold = True
            run.font.size = Pt(14)

    def _add_list_content(self, doc, text: str, index: int):
        """Add list content to the document."""
        from docx.shared import Pt

        # Check whether the item is already numbered.
        if any(text.strip().startswith(f"{i}.") for i in range(1, 20)):
            # Numbered list item.
            para = doc.add_paragraph(text.strip(), style='List Number')
        else:
            # Bulleted list item.
            para = doc.add_paragraph(text.strip(), style='List Bullet')

        # Font size.
        for run in para.runs:
            run.font.size = Pt(11)

    def _add_paragraph_content(self, doc, text: str, index: int):
        """Add an ordinary paragraph to the document."""
        from docx.shared import Pt

        para = doc.add_paragraph()

        # Segment number (optional).
        num_run = para.add_run(f"{index:03d}. ")
        num_run.bold = True
        num_run.font.size = Pt(12)

        # Paragraph text.
        content_run = para.add_run(text)
        content_run.font.size = Pt(11)

        # Paragraph spacing.
        para.paragraph_format.space_after = Pt(6)
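

# A minimal end-to-end usage sketch, assuming the surrounding app package is
# configured (DifyClient credentials, OCRCache storage) and that the sample
# paths below exist; the names here are illustrative only.
if __name__ == "__main__":
    parser = EnhancedPdfParser("sample.pdf")

    # Extract segments (OCR-backed for scanned files, direct otherwise).
    segments = parser.extract_text_segments(user_id=1, job_id=1)
    print(f"Extracted {len(segments)} segments")

    # Hypothetical translations dict keyed by target language; in the real
    # pipeline these come from the translation service.
    translations = {"en": [f"[en] {s}" for s in segments]}
    output = parser.generate_translated_document(translations, "en", Path("output"))
    print(f"Wrote {output}")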