Files
Document_translator_1panel/app/services/enhanced_pdf_parser.py
beabigegg 6599716481 1panel
2025-10-03 08:19:40 +08:00

700 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
增强的PDF解析器 - 支持扫描PDF的OCR处理
Author: PANJIT IT Team
Created: 2024-09-23
Modified: 2024-09-23
"""
import io
from pathlib import Path
from typing import List, Optional
from PyPDF2 import PdfReader
from app.utils.logger import get_logger
from app.utils.exceptions import FileProcessingError
from app.services.dify_client import DifyClient
from app.services.ocr_cache import OCRCache
from app.utils.image_preprocessor import ImagePreprocessor
logger = get_logger(__name__)
# 检查PyMuPDF依赖
try:
import fitz # PyMuPDF
_HAS_PYMUPDF = True
except ImportError:
_HAS_PYMUPDF = False
logger.warning("PyMuPDF not available. Scanned PDF processing will be disabled.")
class EnhancedPdfParser:
"""支持扫描PDF的增强解析器"""
def __init__(self, file_path: str):
    """Bind the parser to a PDF on disk and create its OCR collaborators.

    Args:
        file_path: Location of the PDF file to parse.

    Raises:
        FileProcessingError: If ``file_path`` does not exist.
    """
    self.file_path = Path(file_path)
    # Fail fast: check the path before building the OCR client, cache and
    # image preprocessor, so a missing file does not pay for their setup.
    if not self.file_path.exists():
        raise FileProcessingError(f"PDF文件不存在: {file_path}")
    self.dify_client = DifyClient()
    self.ocr_cache = OCRCache()
    self.image_preprocessor = ImagePreprocessor(use_opencv=True)
def is_scanned_pdf(self) -> bool:
    """Guess whether the PDF is a scanned (image-only) document.

    Text is extracted from at most the first three pages. When the
    stripped text is shorter than 100 characters the file is treated as
    scanned.

    Returns:
        True if the PDF looks scanned, or if the analysis itself failed
        (any exception is logged and the scanned path is the fallback);
        False if enough embedded text was found.
    """
    try:
        pdf = PdfReader(str(self.file_path))
        # Sample only the leading pages.
        sample_size = min(3, len(pdf.pages))
        sampled_text = "".join(
            pdf.pages[idx].extract_text() for idx in range(sample_size)
        )
        char_count = len(sampled_text.strip())
        logger.info(f"PDF text extraction found {char_count} characters in first {sample_size} pages")
        # Fewer than 100 characters is taken to mean "scanned".
        if char_count < 100:
            logger.info("PDF detected as scanned document, will use OCR processing")
            return True
        logger.info("PDF detected as text-based document, will use direct text extraction")
        return False
    except Exception as e:
        logger.warning(f"Failed to analyze PDF type: {e}, treating as scanned document")
        # Default to the scanned path when the type cannot be determined.
        return True
def extract_text_segments(self, user_id: Optional[int] = None, job_id: Optional[int] = None) -> List[str]:
    """Extract text segments, choosing direct extraction or OCR automatically.

    Args:
        user_id: Forwarded to the OCR path; unused for text-based PDFs.
        job_id: Forwarded to the OCR path; unused for text-based PDFs.

    Returns:
        The list of text segments produced by the selected extraction path.

    Raises:
        FileProcessingError: If extraction fails, or if the PDF is scanned
            and PyMuPDF is not installed. The original exception is kept
            as ``__cause__``.
    """
    try:
        # Prefer direct text extraction when the PDF has embedded text.
        if not self.is_scanned_pdf():
            return self._extract_from_text_pdf()
        # Scanned PDFs are rendered to images and sent through Dify OCR,
        # which requires PyMuPDF for rendering.
        if not _HAS_PYMUPDF:
            raise FileProcessingError("处理扫描PDF需要PyMuPDF库请安装: pip install PyMuPDF")
        return self._extract_from_scanned_pdf(user_id, job_id)
    except Exception as e:
        logger.error(f"PDF文字提取失败: {str(e)}")
        # Chain explicitly so the root cause stays attached to the wrapper.
        raise FileProcessingError(f"PDF文件解析失败: {str(e)}") from e
def _extract_from_text_pdf(self) -> List[str]:
    """Extract text segments from a PDF that carries embedded text.

    Each page is split into sentences, sentences of 10 characters or
    fewer (after stripping) are dropped, and the remainder is passed
    through ``_merge_short_segments``.

    Returns:
        The merged list of text segments.

    Raises:
        FileProcessingError: If reading or splitting fails; the original
            exception is kept as ``__cause__``.
    """
    try:
        reader = PdfReader(str(self.file_path))
        text_segments = []
        for page_num, page in enumerate(reader.pages, 1):
            page_text = page.extract_text()
            if not page_text.strip():
                continue
            sentences = self._split_text_into_sentences(page_text)
            # Drop fragments too short to be worth translating.
            valid_sentences = [s for s in sentences if len(s.strip()) > 10]
            text_segments.extend(valid_sentences)
            logger.debug(f"Page {page_num}: extracted {len(valid_sentences)} sentences")
        logger.info(f"Text PDF extraction completed: {len(text_segments)} segments")
        # Merge short segments to reduce the number of translation calls.
        return self._merge_short_segments(text_segments)
    except Exception as e:
        logger.error(f"Text PDF extraction failed: {str(e)}")
        raise FileProcessingError(f"文字PDF提取失败: {str(e)}") from e
def _extract_from_scanned_pdf(self, user_id: Optional[int] = None, job_id: Optional[int] = None) -> List[str]:
    """Extract text segments from a scanned PDF via Dify OCR.

    Every page is rendered to a PNG at 2x zoom, preprocessed, and then
    either served from the OCR cache or sent to Dify OCR. Non-empty OCR
    results are stored in the cache. A failure on one page is logged and
    that page is skipped; the remaining pages are still processed.

    Args:
        user_id: Passed to the OCR client and recorded in cache metadata.
        job_id: Passed to the OCR client and recorded in cache metadata.

    Returns:
        The merged list of text segments from all pages.

    Raises:
        FileProcessingError: If the document cannot be processed or no
            text was extracted from any page; the original exception is
            kept as ``__cause__``.
    """
    try:
        doc = fitz.open(str(self.file_path))
        try:
            text_segments = []
            total_pages = doc.page_count
            logger.info(f"Processing scanned PDF with {total_pages} pages using Dify OCR")
            # Render at 2x zoom to improve OCR accuracy. The matrix does
            # not depend on the page, so it is built once.
            zoom = 2.0
            mat = fitz.Matrix(zoom, zoom)
            for page_num in range(total_pages):
                page_no = page_num + 1  # 1-based number used in logs and keys
                try:
                    logger.info(f"[PDF-OCR] Processing page {page_no}/{total_pages}")
                    page = doc[page_num]
                    pix = page.get_pixmap(matrix=mat, alpha=False)
                    # Convert to PNG, then preprocess the image before OCR.
                    img_data_raw = pix.tobytes("png")
                    img_data = self.image_preprocessor.preprocess_smart(img_data_raw)
                    logger.debug(f"[PDF-OCR] Page {page_no}: Image preprocessed ({len(img_data_raw)} -> {len(img_data)} bytes)")
                    filename = f"page_{page_no}.png"
                    logger.info(f"[PDF-OCR] Page {page_no}: Converted to image ({len(img_data)} bytes)")
                    logger.debug(f"[PDF-OCR] Page {page_no}: Image zoom={zoom}, format=PNG")
                    # Look the page up in the OCR cache first.
                    cache_key_info = f"{self.file_path.name}_page_{page_no}_zoom_{zoom}"
                    cached_text = self.ocr_cache.get_cached_text(
                        file_data=img_data,
                        filename=filename,
                        additional_info=cache_key_info
                    )
                    if cached_text:
                        logger.info(f"[PDF-OCR] Page {page_no}: ✓ 使用快取的OCR結果 (節省AI流量)")
                        ocr_text = cached_text
                    else:
                        # Cache miss: recognise the text with Dify OCR.
                        logger.info(f"[PDF-OCR] Page {page_no}: Starting OCR recognition...")
                        ocr_text = self.dify_client.ocr_image_with_dify(
                            image_data=img_data,
                            filename=filename,
                            user_id=user_id,
                            job_id=job_id
                        )
                        # Only non-empty results are worth caching.
                        if ocr_text.strip():
                            self.ocr_cache.save_cached_text(
                                file_data=img_data,
                                extracted_text=ocr_text,
                                filename=filename,
                                additional_info=cache_key_info,
                                metadata={
                                    'source_file': str(self.file_path),
                                    'page_number': page_no,
                                    'total_pages': total_pages,
                                    'zoom_level': zoom,
                                    'image_size_bytes': len(img_data),
                                    'user_id': user_id,
                                    'job_id': job_id
                                }
                            )
                            logger.info(f"[PDF-OCR] Page {page_no}: ✓ OCR結果已保存到快取")
                    logger.info(f"[PDF-OCR] Page {page_no}: OCR completed")
                    logger.debug(f"[PDF-OCR] Page {page_no}: Raw OCR result length: {len(ocr_text)}")
                    if ocr_text.strip():
                        # Split the OCR output into sentence-like segments.
                        logger.debug(f"[PDF-OCR] Page {page_no}: Splitting OCR text into sentences...")
                        sentences = self._split_ocr_text(ocr_text)
                        valid_sentences = [s for s in sentences if len(s.strip()) > 5]
                        text_segments.extend(valid_sentences)
                        logger.info(f"[PDF-OCR] Page {page_no}: ✓ Extracted {len(valid_sentences)} valid sentences")
                        logger.debug(f"[PDF-OCR] Page {page_no}: Total sentences before filter: {len(sentences)}")
                        # Log the first 50 characters as a debugging aid.
                        if valid_sentences:
                            preview = valid_sentences[0][:50] + "..." if len(valid_sentences[0]) > 50 else valid_sentences[0]
                            logger.debug(f"[PDF-OCR] Page {page_no}: First sentence preview: {preview}")
                    else:
                        logger.warning(f"[PDF-OCR] Page {page_no}: ⚠ OCR returned empty result")
                except Exception as e:
                    # Best effort: log and move on to the next page rather
                    # than aborting the whole document.
                    logger.error(f"[PDF-OCR] Page {page_no}: ✗ Processing failed: {str(e)}")
                    logger.error(f"[PDF-OCR] Page {page_no}: Exception type: {type(e).__name__}")
        finally:
            # Release the document even if something above raised.
            doc.close()
        logger.info(f"[PDF-OCR] OCR processing completed for all {total_pages} pages")
        logger.info(f"[PDF-OCR] Total text segments extracted: {len(text_segments)}")
        if not text_segments:
            logger.error("[PDF-OCR] ✗ No text content extracted from any page")
            raise FileProcessingError("OCR处理完成但未提取到任何文字内容")
        logger.info("[PDF-OCR] ✓ Scanned PDF processing completed successfully")
        logger.info(f"[PDF-OCR] Final result: {len(text_segments)} text segments extracted")
        # Merge short segments to reduce the number of translation calls.
        merged_segments = self._merge_short_segments(text_segments)
        logger.info(f"[PDF-OCR] After merging: {len(merged_segments)} segments ready for translation")
        return merged_segments
    except Exception as e:
        logger.error(f"Scanned PDF processing failed: {str(e)}")
        raise FileProcessingError(f"扫描PDF处理失败: {str(e)}") from e
def _split_text_into_sentences(self, text: str) -> List[str]:
"""将文字分割成句子"""
if not text.strip():
return []
# 简单的分句逻辑
sentences = []
separators = ['. ', '', '', '', '!', '?', '\n\n']
current_sentences = [text]
for sep in separators:
new_sentences = []
for sentence in current_sentences:
parts = sentence.split(sep)
if len(parts) > 1:
# 保留分隔符
for i, part in enumerate(parts[:-1]):
if part.strip():
new_sentences.append(part.strip() + sep.rstrip())
# 最后一部分
if parts[-1].strip():
new_sentences.append(parts[-1].strip())
else:
new_sentences.append(sentence)
current_sentences = new_sentences
# 过滤掉太短的句子
valid_sentences = [s for s in current_sentences if len(s.strip()) > 3]
return valid_sentences
def _split_ocr_text(self, ocr_text: str) -> List[str]:
"""分割OCR识别的文字"""
if not ocr_text.strip():
return []
# OCR结果可能包含表格或特殊格式需要特殊处理
lines = ocr_text.split('\n')
sentences = []
current_paragraph = []
for line in lines:
line = line.strip()
if not line:
# 空行表示段落结束
if current_paragraph:
paragraph_text = ' '.join(current_paragraph)
if len(paragraph_text) > 10:
sentences.append(paragraph_text)
current_paragraph = []
continue
# 检查是否是表格行(包含|或多个制表符)
if '|' in line or '\t' in line:
# 表格行单独处理
if current_paragraph:
paragraph_text = ' '.join(current_paragraph)
if len(paragraph_text) > 10:
sentences.append(paragraph_text)
current_paragraph = []
if len(line) > 10:
sentences.append(line)
else:
# 普通文字行
current_paragraph.append(line)
# 处理最后的段落
if current_paragraph:
paragraph_text = ' '.join(current_paragraph)
if len(paragraph_text) > 10:
sentences.append(paragraph_text)
return sentences
def generate_translated_document(self, translations: dict, target_language: str,
                                 output_dir: Path) -> str:
    """Write the translations for one language to a Word document.

    The document gets a centred title, an information paragraph (source
    file name, processing method, language, segment count) and then one
    element per translated segment, rendered as a table, heading, list
    item or numbered paragraph according to ``_detect_content_type``.

    Args:
        translations: Mapping from language to its list of translated
            segments; a missing language yields an empty document body.
        target_language: Language whose translations are written.
        output_dir: Directory in which the ``.docx`` file is saved.

    Returns:
        The path of the saved document, as a string.

    Raises:
        FileProcessingError: If building or saving the document fails;
            the original exception is kept as ``__cause__``.
    """
    try:
        from docx import Document
        from docx.enum.text import WD_PARAGRAPH_ALIGNMENT

        translated_texts = translations.get(target_language, [])
        output_filename = f"{self.file_path.stem}_{target_language}_translated.docx"
        output_path = output_dir / output_filename

        doc = Document()
        # Title.
        title = doc.add_heading(f"PDF翻译结果 - {target_language}", 0)
        title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
        # Document information block.
        info_para = doc.add_paragraph()
        info_para.add_run("原始文件: ").bold = True
        info_para.add_run(self.file_path.name)
        info_para.add_run("\n处理方式: ").bold = True
        info_para.add_run("OCR识别" if self.is_scanned_pdf() else "直接文字提取")
        info_para.add_run("\n翻译语言: ").bold = True
        info_para.add_run(target_language)
        info_para.add_run("\n总段落数: ").bold = True
        info_para.add_run(str(len(translated_texts)))
        doc.add_paragraph()  # spacer

        # One writer per detected content type; plain paragraph otherwise.
        writers = {
            'table': self._add_table_content,
            'heading': self._add_heading_content,
            'list': self._add_list_content,
        }
        for i, text in enumerate(translated_texts, 1):
            writer = writers.get(self._detect_content_type(text), self._add_paragraph_content)
            writer(doc, text, i)

        doc.save(output_path)
        logger.info(f"Generated translated PDF Word document: {output_path}")
        return str(output_path)
    except Exception as e:
        logger.error(f"Failed to generate translated Word document: {str(e)}")
        raise FileProcessingError(f"生成翻译Word文档失败: {str(e)}") from e
def generate_combined_translated_document(self, all_translations: dict, target_languages: list,
                                          output_dir: Path) -> str:
    """Write all target languages side by side into one Word document.

    Each numbered paragraph holds segment ``i`` of every language that
    has one, each prefixed with ``[language]`` and separated by a blank
    line. The number of segments is taken from the first language in
    ``target_languages``.

    Args:
        all_translations: Mapping from language to its list of translated
            segments.
        target_languages: Languages to include, in output order. Must not
            be empty.
        output_dir: Directory in which the ``.docx`` file is saved.

    Returns:
        The path of the saved document, as a string.

    Raises:
        FileProcessingError: If ``target_languages`` is empty or building
            or saving the document fails; the original exception is kept
            as ``__cause__``.
    """
    try:
        from docx import Document
        from docx.shared import Pt
        from docx.enum.text import WD_PARAGRAPH_ALIGNMENT

        # Reject an empty language list up front with a clear message
        # instead of failing later with "list index out of range".
        if not target_languages:
            raise ValueError("target_languages must not be empty")

        languages_suffix = '_'.join(target_languages)
        output_filename = f"{self.file_path.stem}_{languages_suffix}_combined.docx"
        output_path = output_dir / output_filename

        # Resolve each language's segment list once, not per segment.
        per_language = [
            (language, all_translations.get(language, []))
            for language in target_languages
        ]
        # The first language defines how many segments are written.
        segment_count = len(per_language[0][1])

        doc = Document()
        # Title.
        title = doc.add_heading("PDF翻译結果 - 多語言組合文檔", 0)
        title.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
        # Document information block.
        info_para = doc.add_paragraph()
        info_para.add_run("原始文件: ").bold = True
        info_para.add_run(self.file_path.name)
        info_para.add_run("\n处理方式: ").bold = True
        info_para.add_run("OCR识别" if self.is_scanned_pdf() else "直接文字提取")
        info_para.add_run("\n翻译语言: ").bold = True
        info_para.add_run(' / '.join(target_languages))
        info_para.add_run("\n总段落数: ").bold = True
        info_para.add_run(str(segment_count))
        doc.add_paragraph()  # spacer

        for i in range(segment_count):
            content_para = doc.add_paragraph()
            # Segment number.
            num_run = content_para.add_run(f"{i+1:03d}. ")
            num_run.bold = True
            num_run.font.size = Pt(12)
            for j, (target_language, texts) in enumerate(per_language):
                if i >= len(texts):
                    # This language has no segment at this position.
                    continue
                if j > 0:
                    content_para.add_run("\n\n")  # gap between translations
                # Language tag.
                lang_run = content_para.add_run(f"[{target_language}] ")
                lang_run.bold = True
                lang_run.font.size = Pt(11)
                # Translated text.
                trans_run = content_para.add_run(texts[i])
                trans_run.font.size = Pt(11)
            content_para.paragraph_format.space_after = Pt(12)

        doc.save(output_path)
        logger.info(f"Generated combined translated PDF Word document: {output_path}")
        return str(output_path)
    except Exception as e:
        logger.error(f"Failed to generate combined translated Word document: {str(e)}")
        raise FileProcessingError(f"生成組合翻译Word文档失败: {str(e)}") from e
def _is_table_component(self, segment: str) -> bool:
"""檢查段落是否為表格組件(表格邊界、分隔線等)"""
segment = segment.strip()
# Markdown表格分隔線如 |---|---|---| 或 |===|===|===|
if '|' in segment and ('-' in segment or '=' in segment):
# 移除 | 和 - = 後,如果剩餘內容很少,則判斷為表格分隔線
clean_segment = segment.replace('|', '').replace('-', '').replace('=', '').replace(' ', '').replace(':', '')
if len(clean_segment) <= 2: # 允許少量其他字符
return True
# 純分隔線
if segment.replace('=', '').replace('-', '').replace(' ', '') == '':
return True
return False
def _is_table_row(self, segment: str) -> bool:
"""檢查段落是否為表格行(包含實際數據的表格行)"""
segment = segment.strip()
# Markdown表格行至少包含兩個 | 符號,且有實際內容
if segment.count('|') >= 2:
# 移除首尾的 | 並分割為單元格
cells = segment.strip('|').split('|')
# 檢查是否有實際的文字內容(不只是分隔符號)
has_content = any(
cell.strip() and
not cell.replace('-', '').replace('=', '').replace(' ', '').replace(':', '') == ''
for cell in cells
)
if has_content:
return True
return False
def _merge_table_segments(self, segments: List[str], start_idx: int) -> tuple[str, int]:
"""
合併表格相關的段落
Returns:
(merged_table_content, next_index)
"""
table_parts = []
current_idx = start_idx
# 收集連續的表格相關段落
while current_idx < len(segments):
segment = segments[current_idx].strip()
if self._is_table_component(segment) or self._is_table_row(segment):
table_parts.append(segment)
current_idx += 1
else:
break
# 將表格部分合併為一個段落
merged_table = '\n'.join(table_parts)
return merged_table, current_idx
def _merge_short_segments(self, text_segments: List[str], min_length: int = 10) -> List[str]:
"""
合併短段落以減少不必要的翻譯調用,特別處理表格結構
Args:
text_segments: 原始文字段落列表
min_length: 最小段落長度閾值,短於此長度的段落將被合併
Returns:
合併後的段落列表
"""
if not text_segments:
return text_segments
merged_segments = []
current_merge = ""
i = 0
while i < len(text_segments):
segment = text_segments[i].strip()
if not segment: # 跳過空段落
i += 1
continue
# 檢查是否為表格組件
if self._is_table_component(segment) or self._is_table_row(segment):
# 先處理之前積累的短段落
if current_merge:
merged_segments.append(current_merge.strip())
logger.debug(f"Merged short segments before table: '{current_merge[:50]}...'")
current_merge = ""
# 合併表格相關段落
table_content, next_i = self._merge_table_segments(text_segments, i)
merged_segments.append(table_content)
logger.debug(f"Merged table content: {next_i - i} segments -> 1 table block")
i = next_i
continue
# 檢查是否為短段落
if len(segment) < min_length:
# 檢查是否為純標點符號或數字(排除表格符號)
if segment.replace('*', '').replace('-', '').replace('_', '').replace('#', '').strip() == '':
logger.debug(f"Skipping pure symbol segment: '{segment}'")
i += 1
continue
# 短段落需要合併
if current_merge:
current_merge += " " + segment
else:
current_merge = segment
logger.debug(f"Adding short segment to merge: '{segment}' (length: {len(segment)})")
else:
# 長段落,先處理之前積累的短段落
if current_merge:
merged_segments.append(current_merge.strip())
logger.debug(f"Merged short segments: '{current_merge[:50]}...' (total length: {len(current_merge)})")
current_merge = ""
# 添加當前長段落
merged_segments.append(segment)
logger.debug(f"Added long segment: '{segment[:50]}...' (length: {len(segment)})")
i += 1
# 處理最後剩餘的短段落
if current_merge:
merged_segments.append(current_merge.strip())
logger.debug(f"Final merged short segments: '{current_merge[:50]}...' (total length: {len(current_merge)})")
logger.info(f"Segment merging: {len(text_segments)} -> {len(merged_segments)} segments")
return merged_segments
def _detect_content_type(self, text: str) -> str:
"""检测内容类型"""
text_lower = text.lower().strip()
# 检测表格(包含多个|或制表符)
if ('|' in text and text.count('|') >= 2) or '\t' in text:
return 'table'
# 检测标题
if (text_lower.startswith(('', '', 'chapter', 'section', '#')) or
any(keyword in text_lower for keyword in ['', '', '']) and len(text) < 100):
return 'heading'
# 检测列表
if (text_lower.startswith(('', '-', '*', '1.', '2.', '3.', '4.', '5.')) or
any(text_lower.startswith(f"{i}.") for i in range(1, 20))):
return 'list'
return 'paragraph'
def _add_table_content(self, doc, text: str, index: int):
    """Append a captioned table (or a monospaced fallback) to the document.

    Lines of the form ``|cell|cell|`` are parsed as table rows;
    separator rows such as ``|---|:--:|`` are skipped. If at least one
    row is found, a 'Table Grid' table is created that is as wide as the
    widest row. Otherwise the raw text is written as a 'Courier New'
    paragraph, so the content is never dropped.

    Args:
        doc: python-docx document to append to.
        text: Table text (Markdown-style pipes or tab separated).
        index: 1-based segment number, shown in the caption.
    """
    from docx.shared import Pt

    # Caption.
    title_para = doc.add_paragraph()
    title_run = title_para.add_run(f"表格 {index}: ")
    title_run.bold = True
    title_run.font.size = Pt(12)

    rows = []
    if '|' in text:
        # Markdown-style table: keep only lines delimited by outer pipes.
        for line in (raw.strip() for raw in text.split('\n')):
            if not (line.startswith('|') and line.endswith('|')):
                continue
            cells = [cell.strip() for cell in line.split('|')[1:-1]]
            if not cells:
                continue
            # Skip separator rows, including ':' alignment and '=' rules,
            # matching what _is_table_component treats as a separator.
            if all(not cell.strip('-=: ') for cell in cells):
                continue
            rows.append(cells)

    if rows:
        # Size the table to the widest row so no cell is silently lost.
        col_count = max(len(row) for row in rows)
        table = doc.add_table(rows=len(rows), cols=col_count)
        table.style = 'Table Grid'
        for row_idx, row_data in enumerate(rows):
            row_cells = table.rows[row_idx].cells
            for col_idx, cell_data in enumerate(row_data):
                cell = row_cells[col_idx]
                cell.text = cell_data
                for paragraph in cell.paragraphs:
                    for run in paragraph.runs:
                        run.font.size = Pt(10)
        return

    # Tab-separated text, or pipe text with no parsable row: keep the
    # content verbatim in a monospaced paragraph.
    para = doc.add_paragraph()
    content_run = para.add_run(text)
    content_run.font.name = 'Courier New'
    content_run.font.size = Pt(10)
def _add_heading_content(self, doc, text: str, index: int):
    """Append a heading-like segment to the document.

    Stripped text shorter than 100 characters becomes a level-2 heading;
    longer text is written as a bold 14pt paragraph instead.

    Args:
        doc: python-docx document to append to.
        text: Heading text.
        index: Segment number (not used by this renderer).
    """
    from docx.shared import Pt

    heading_text = text.strip()
    if len(heading_text) >= 100:
        # Too long for a real heading: emphasised paragraph instead.
        emphasised = doc.add_paragraph().add_run(heading_text)
        emphasised.bold = True
        emphasised.font.size = Pt(14)
        return
    doc.add_heading(heading_text, level=2)
def _add_list_content(self, doc, text: str, index: int):
    """Append a list item to the document.

    Text that starts with ``1.`` .. ``19.`` uses the 'List Number'
    style; anything else uses 'List Bullet'. Runs are set to 11pt.

    Args:
        doc: python-docx document to append to.
        text: List item text.
        index: Segment number (not used by this renderer).
    """
    from docx.shared import Pt

    item_text = text.strip()
    number_prefixes = tuple(f"{n}." for n in range(1, 20))
    style_name = 'List Number' if item_text.startswith(number_prefixes) else 'List Bullet'
    item_para = doc.add_paragraph(item_text, style=style_name)
    for item_run in item_para.runs:
        item_run.font.size = Pt(11)
def _add_paragraph_content(self, doc, text: str, index: int):
    """Append an ordinary numbered paragraph to the document.

    The paragraph starts with a bold 12pt ``NNN. `` number followed by
    the text at 11pt, with 6pt of spacing after it.

    Args:
        doc: python-docx document to append to.
        text: Paragraph text.
        index: Segment number, rendered zero-padded to three digits.
    """
    from docx.shared import Pt

    body_para = doc.add_paragraph()

    # Leading segment number.
    label = body_para.add_run(f"{index:03d}. ")
    label.bold = True
    label.font.size = Pt(12)

    # Paragraph text.
    body_para.add_run(text).font.size = Pt(11)

    # Space after the paragraph.
    body_para.paragraph_format.space_after = Pt(6)