2634 lines
123 KiB
Python
2634 lines
123 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
翻譯服務
|
||
|
||
Author: PANJIT IT Team
|
||
Created: 2024-01-28
|
||
Modified: 2024-01-28
|
||
"""
|
||
|
||
import hashlib
|
||
import time
|
||
from pathlib import Path
|
||
from typing import List, Dict, Any, Optional, Tuple
|
||
from app.utils.logger import get_logger
|
||
from app.utils.exceptions import TranslationError, FileProcessingError
|
||
from app.services.dify_client import DifyClient
|
||
from app.services.document_processor import DocumentProcessor, Segment
|
||
from app.models.cache import TranslationCache
|
||
from app.models.job import TranslationJob
|
||
from app.utils.helpers import generate_filename, create_job_directory
|
||
from app import db
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
|
||
class DocumentParser:
|
||
"""文件解析器基類"""
|
||
|
||
def __init__(self, file_path: str):
|
||
self.file_path = Path(file_path)
|
||
|
||
if not self.file_path.exists():
|
||
raise FileProcessingError(f"檔案不存在: {file_path}")
|
||
|
||
def extract_text_segments(self) -> List[str]:
|
||
"""提取文字片段"""
|
||
raise NotImplementedError
|
||
|
||
def generate_translated_document(self, translations: Dict[str, List[str]],
|
||
target_language: str, output_dir: Path) -> str:
|
||
"""生成翻譯後的文件"""
|
||
raise NotImplementedError
|
||
|
||
|
||
class DocxParser(DocumentParser):
|
||
"""DOCX 文件解析器 - 使用增強的 DocumentProcessor"""
|
||
|
||
def __init__(self, file_path: str):
|
||
super().__init__(file_path)
|
||
self.processor = DocumentProcessor()
|
||
|
||
def extract_text_segments(self) -> List[str]:
|
||
"""提取 DOCX 文件的文字片段 - 使用增強邏輯"""
|
||
try:
|
||
# 使用新的文檔處理器提取段落
|
||
segments = self.processor.extract_docx_segments(str(self.file_path))
|
||
|
||
# 轉換為文字列表
|
||
text_segments = []
|
||
for seg in segments:
|
||
if seg.text.strip() and len(seg.text.strip()) > 3:
|
||
text_segments.append(seg.text)
|
||
|
||
logger.info(f"Enhanced extraction: {len(text_segments)} text segments from DOCX")
|
||
return text_segments
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to extract text from DOCX: {str(e)}")
|
||
raise FileProcessingError(f"DOCX 文件解析失敗: {str(e)}")
|
||
|
||
def extract_segments_with_context(self) -> List[Segment]:
|
||
"""提取帶上下文的段落資訊"""
|
||
return self.processor.extract_docx_segments(str(self.file_path))
|
||
|
||
def generate_translated_document(self, translations: Dict[str, List[str]],
|
||
target_language: str, output_dir: Path) -> str:
|
||
"""生成翻譯後的 DOCX 文件 - 使用增強的翻譯插入邏輯(從快取讀取)"""
|
||
try:
|
||
from sqlalchemy import text as sql_text
|
||
from app import db
|
||
|
||
# 生成輸出檔名
|
||
output_filename = generate_filename(
|
||
self.file_path.name,
|
||
'translated',
|
||
'translated',
|
||
target_language
|
||
)
|
||
output_path = output_dir / output_filename
|
||
|
||
# 提取段落資訊
|
||
segments = self.extract_segments_with_context()
|
||
|
||
# 建立翻譯映射 - 從快取讀取而非使用傳入的translations參數
|
||
translation_map = {}
|
||
|
||
logger.info(f"Building translation map for {len(segments)} segments in language {target_language}")
|
||
|
||
for seg in segments:
|
||
# 從翻譯快取中查詢每個段落的翻譯
|
||
result = db.session.execute(sql_text("""
|
||
SELECT translated_text
|
||
FROM dt_translation_cache
|
||
WHERE source_text = :text AND target_language = :lang
|
||
ORDER BY created_at DESC
|
||
LIMIT 1
|
||
"""), {'text': seg.text, 'lang': target_language})
|
||
|
||
row = result.fetchone()
|
||
if row and row[0]:
|
||
translation_map[(target_language, seg.text)] = row[0]
|
||
logger.debug(f"Found translation for: {seg.text[:50]}...")
|
||
else:
|
||
logger.warning(f"No translation found for: {seg.text[:50]}...")
|
||
|
||
logger.info(f"Translation map built with {len(translation_map)} mappings")
|
||
|
||
# 使用增強的翻譯插入邏輯
|
||
ok_count, skip_count = self.processor.insert_docx_translations(
|
||
str(self.file_path),
|
||
segments,
|
||
translation_map,
|
||
[target_language],
|
||
str(output_path)
|
||
)
|
||
|
||
logger.info(f"Enhanced translation: Generated {output_path} with {ok_count} insertions, {skip_count} skips")
|
||
return str(output_path)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to generate translated DOCX: {str(e)}")
|
||
raise FileProcessingError(f"生成翻譯 DOCX 失敗: {str(e)}")
|
||
|
||
|
||
class DocParser(DocumentParser):
|
||
"""DOC 文件解析器 - 需要先轉換為 DOCX"""
|
||
|
||
def extract_text_segments(self) -> List[str]:
|
||
"""提取 DOC 文件的文字片段 - 先轉換為 DOCX 再處理"""
|
||
try:
|
||
# 檢查是否有 Word COM 支援
|
||
import tempfile
|
||
import os
|
||
|
||
try:
|
||
import win32com.client as win32
|
||
import pythoncom
|
||
_WIN32COM_AVAILABLE = True
|
||
except ImportError:
|
||
_WIN32COM_AVAILABLE = False
|
||
|
||
if not _WIN32COM_AVAILABLE:
|
||
raise FileProcessingError("DOC 格式需要 Word COM 支援,請先手動轉換為 DOCX 格式或安裝 Microsoft Office")
|
||
|
||
# 創建臨時 DOCX 文件
|
||
temp_docx = None
|
||
try:
|
||
with tempfile.NamedTemporaryFile(suffix='.docx', delete=False) as tmp:
|
||
temp_docx = tmp.name
|
||
|
||
# 使用 Word COM 轉換 DOC 到 DOCX (格式 16)
|
||
self._word_convert(str(self.file_path), temp_docx, 16)
|
||
|
||
# 使用 DOCX 解析器處理轉換後的文件
|
||
docx_parser = DocxParser(temp_docx)
|
||
segments = docx_parser.extract_text_segments()
|
||
|
||
logger.info(f"Converted DOC to DOCX and extracted {len(segments)} segments")
|
||
return segments
|
||
|
||
finally:
|
||
# 清理臨時文件
|
||
if temp_docx and os.path.exists(temp_docx):
|
||
try:
|
||
os.remove(temp_docx)
|
||
except Exception:
|
||
pass
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to extract text from DOC file: {str(e)}")
|
||
raise FileProcessingError(f"DOC 文件解析失敗: {str(e)}")
|
||
|
||
def _word_convert(self, input_path: str, output_path: str, target_format: int):
|
||
"""使用 Word COM 轉換文件格式(移植自參考檔案)"""
|
||
try:
|
||
import win32com.client as win32
|
||
import pythoncom
|
||
|
||
pythoncom.CoInitialize()
|
||
try:
|
||
word = win32.Dispatch("Word.Application")
|
||
word.Visible = False
|
||
doc = word.Documents.Open(os.path.abspath(input_path))
|
||
doc.SaveAs2(os.path.abspath(output_path), FileFormat=target_format)
|
||
doc.Close(False)
|
||
finally:
|
||
word.Quit()
|
||
pythoncom.CoUninitialize()
|
||
except Exception as e:
|
||
raise FileProcessingError(f"Word COM 轉換失敗: {str(e)}")
|
||
|
||
def generate_translated_document(self, translations: Dict[str, List[str]],
|
||
target_language: str, output_dir: Path) -> str:
|
||
"""生成翻譯後的 DOC 文件 - 先轉為 DOCX 處理後輸出為 DOCX"""
|
||
try:
|
||
import tempfile
|
||
import os
|
||
|
||
# 先轉換為 DOCX,然後使用 DOCX 處理邏輯
|
||
temp_docx = None
|
||
try:
|
||
with tempfile.NamedTemporaryFile(suffix='.docx', delete=False) as tmp:
|
||
temp_docx = tmp.name
|
||
|
||
# 轉換 DOC 到 DOCX
|
||
self._word_convert(str(self.file_path), temp_docx, 16)
|
||
|
||
# 使用 DOCX 解析器生成翻譯文檔
|
||
docx_parser = DocxParser(temp_docx)
|
||
|
||
# 注意:最終輸出為 DOCX 格式,因為 DOC 格式較難直接處理
|
||
output_filename = f"{self.file_path.stem}_{target_language}_translated.docx"
|
||
output_path = output_dir / output_filename
|
||
|
||
result_path = docx_parser.generate_translated_document(translations, target_language, output_dir)
|
||
|
||
logger.info(f"Generated translated DOC file (as DOCX): {result_path}")
|
||
return result_path
|
||
|
||
finally:
|
||
# 清理臨時文件
|
||
if temp_docx and os.path.exists(temp_docx):
|
||
try:
|
||
os.remove(temp_docx)
|
||
except Exception:
|
||
pass
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to generate translated DOC file: {str(e)}")
|
||
raise FileProcessingError(f"DOC 翻譯檔生成失敗: {str(e)}")
|
||
|
||
|
||
class ExcelParser(DocumentParser):
|
||
"""Excel 文件解析器(XLSX/XLS)- 移植自參考檔案"""
|
||
|
||
def extract_text_segments(self) -> List[str]:
|
||
"""提取 Excel 文件的文字片段"""
|
||
try:
|
||
import openpyxl
|
||
from openpyxl.utils.exceptions import InvalidFileException
|
||
|
||
# 載入工作簿(移植自參考檔案邏輯)
|
||
try:
|
||
wb = openpyxl.load_workbook(str(self.file_path), data_only=False)
|
||
wb_vals = openpyxl.load_workbook(str(self.file_path), data_only=True)
|
||
except InvalidFileException:
|
||
if self.file_path.suffix.lower() == '.xls':
|
||
raise FileProcessingError("XLS 格式需要先轉換為 XLSX 格式")
|
||
raise
|
||
except Exception:
|
||
wb_vals = None
|
||
|
||
# 提取文字段落(完全按照參考檔案的邏輯)
|
||
segs = []
|
||
for ws in wb.worksheets:
|
||
ws_vals = wb_vals[ws.title] if wb_vals and ws.title in wb_vals.sheetnames else None
|
||
max_row, max_col = ws.max_row, ws.max_column
|
||
|
||
for r in range(1, max_row + 1):
|
||
for c in range(1, max_col + 1):
|
||
src_text = self._get_display_text_for_translation(ws, ws_vals, r, c)
|
||
if not src_text:
|
||
continue
|
||
if not self._should_translate(src_text, 'auto'):
|
||
continue
|
||
segs.append(src_text)
|
||
|
||
# 去重保持順序
|
||
unique_segments = []
|
||
seen = set()
|
||
for seg in segs:
|
||
if seg not in seen:
|
||
unique_segments.append(seg)
|
||
seen.add(seg)
|
||
|
||
logger.info(f"Extracted {len(unique_segments)} unique text segments from Excel file")
|
||
return unique_segments
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to extract text from Excel file: {str(e)}")
|
||
raise FileProcessingError(f"Excel 文件解析失敗: {str(e)}")
|
||
|
||
def _get_display_text_for_translation(self, ws, ws_vals, r: int, c: int) -> Optional[str]:
|
||
"""取得儲存格用於翻譯的顯示文字(完全移植自參考檔案)"""
|
||
val = ws.cell(row=r, column=c).value
|
||
if isinstance(val, str) and val.startswith("="):
|
||
if ws_vals is not None:
|
||
shown = ws_vals.cell(row=r, column=c).value
|
||
return shown if isinstance(shown, str) and shown.strip() else None
|
||
return None
|
||
if isinstance(val, str) and val.strip():
|
||
return val
|
||
if ws_vals is not None:
|
||
shown = ws_vals.cell(row=r, column=c).value
|
||
if isinstance(shown, str) and shown.strip():
|
||
return shown
|
||
return None
|
||
|
||
def _should_translate(self, text: str, src_lang: str) -> bool:
|
||
"""判斷文字是否需要翻譯(只要有字就翻譯)"""
|
||
text = text.strip()
|
||
|
||
# 只要有字就翻譯 - 最小長度設為1
|
||
if len(text) < 1:
|
||
return False
|
||
|
||
# Skip pure numbers, dates, etc.
|
||
import re
|
||
if re.match(r'^[\d\s\.\-\:\/]+$', text):
|
||
return False
|
||
|
||
# For auto-detect, translate if has CJK or meaningful text
|
||
if src_lang.lower() in ('auto', 'auto-detect'):
|
||
return self._has_cjk(text) or len(text) > 5
|
||
|
||
return True
|
||
|
||
def _has_cjk(self, text: str) -> bool:
|
||
"""檢查是否包含中日韓文字(移植自參考檔案)"""
|
||
for char in text:
|
||
if '\u4e00' <= char <= '\u9fff' or \
|
||
'\u3400' <= char <= '\u4dbf' or \
|
||
'\u20000' <= char <= '\u2a6df' or \
|
||
'\u3040' <= char <= '\u309f' or \
|
||
'\u30a0' <= char <= '\u30ff' or \
|
||
'\uac00' <= char <= '\ud7af':
|
||
return True
|
||
return False
|
||
|
||
def generate_translated_document(self, translations: Dict[str, List[str]],
|
||
target_language: str, output_dir: Path) -> str:
|
||
"""生成翻譯後的 Excel 文件(使用翻譯快取確保正確映射)"""
|
||
try:
|
||
import openpyxl
|
||
from openpyxl.styles import Alignment
|
||
from openpyxl.comments import Comment
|
||
from sqlalchemy import text as sql_text
|
||
from app import db
|
||
|
||
# 載入原始工作簿
|
||
wb = openpyxl.load_workbook(str(self.file_path), data_only=False)
|
||
try:
|
||
wb_vals = openpyxl.load_workbook(str(self.file_path), data_only=True)
|
||
except Exception:
|
||
wb_vals = None
|
||
|
||
# 建立翻譯映射 - 改用翻譯快取查詢,確保正確對應
|
||
original_segments = self.extract_text_segments()
|
||
tmap = {}
|
||
|
||
logger.info(f"Building translation map for {len(original_segments)} segments in language {target_language}")
|
||
|
||
for original_text in original_segments:
|
||
# 從翻譯快取中查詢每個原文的翻譯
|
||
# 使用聯合查詢,優先使用最早的翻譯記錄(原始DIFY翻譯)
|
||
normalized_text = original_text.replace('\n', ' ').replace('\r', ' ').strip()
|
||
result = db.session.execute(sql_text("""
|
||
SELECT translated_text, created_at, 'exact' as match_type
|
||
FROM dt_translation_cache
|
||
WHERE source_text = :exact_text AND target_language = :lang
|
||
|
||
UNION ALL
|
||
|
||
SELECT translated_text, created_at, 'normalized' as match_type
|
||
FROM dt_translation_cache
|
||
WHERE REPLACE(REPLACE(TRIM(source_text), '\n', ' '), '\r', ' ') = :norm_text
|
||
AND target_language = :lang
|
||
AND source_text != :exact_text
|
||
|
||
ORDER BY created_at ASC
|
||
LIMIT 1
|
||
"""), {'exact_text': original_text, 'norm_text': normalized_text, 'lang': target_language})
|
||
|
||
row = result.fetchone()
|
||
if row and row[0]:
|
||
tmap[original_text] = row[0]
|
||
logger.debug(f"Cache hit for Excel: {original_text[:30]}... -> {row[0][:30]}...")
|
||
else:
|
||
logger.warning(f"No translation found in cache for: {original_text[:50]}...")
|
||
|
||
logger.info(f"Translation map built with {len(tmap)} mappings from cache")
|
||
|
||
# 處理每個工作表(加入詳細調試日誌)
|
||
translation_count = 0
|
||
skip_count = 0
|
||
|
||
for ws in wb.worksheets:
|
||
logger.info(f"Processing worksheet: {ws.title}")
|
||
ws_vals = wb_vals[ws.title] if wb_vals and ws.title in wb_vals.sheetnames else None
|
||
max_row, max_col = ws.max_row, ws.max_column
|
||
|
||
for r in range(1, max_row + 1):
|
||
for c in range(1, max_col + 1):
|
||
cell_name = f"{openpyxl.utils.get_column_letter(c)}{r}"
|
||
src_text = self._get_display_text_for_translation(ws, ws_vals, r, c)
|
||
|
||
if not src_text:
|
||
continue
|
||
|
||
# 檢查是否需要翻譯
|
||
should_translate = self._should_translate(src_text, 'auto')
|
||
if not should_translate:
|
||
logger.debug(f"Skip {cell_name}: '{src_text[:30]}...' (should not translate)")
|
||
skip_count += 1
|
||
continue
|
||
|
||
# 檢查翻譯映射
|
||
if src_text not in tmap:
|
||
logger.warning(f"No translation mapping for {cell_name}: '{src_text[:30]}...'")
|
||
skip_count += 1
|
||
continue
|
||
|
||
val = ws.cell(row=r, column=c).value
|
||
is_formula = isinstance(val, str) and val.startswith("=")
|
||
translated_text = tmap[src_text]
|
||
|
||
cell = ws.cell(row=r, column=c)
|
||
|
||
if is_formula:
|
||
# 公式儲存格:添加註解
|
||
txt_comment = f"翻譯: {translated_text}"
|
||
exist = cell.comment
|
||
if not exist or exist.text.strip() != txt_comment:
|
||
cell.comment = Comment(txt_comment, "translator")
|
||
logger.debug(f"Added comment to {cell_name}: {translated_text[:30]}...")
|
||
translation_count += 1
|
||
else:
|
||
# 一般儲存格:單語言檔案只保留翻譯文,不包含原文
|
||
# 檢查是否已經是預期的格式
|
||
current_text = str(cell.value) if cell.value else ""
|
||
if current_text.strip() == translated_text.strip():
|
||
logger.debug(f"Skip {cell_name}: already translated")
|
||
continue
|
||
|
||
cell.value = translated_text # 只保留翻譯文
|
||
logger.info(f"Translated {cell_name}: '{src_text[:20]}...' -> '{translated_text[:20]}...'")
|
||
translation_count += 1
|
||
|
||
# 設定自動換行(移植自參考檔案)
|
||
try:
|
||
if cell.alignment:
|
||
cell.alignment = Alignment(
|
||
horizontal=cell.alignment.horizontal,
|
||
vertical=cell.alignment.vertical,
|
||
wrap_text=True
|
||
)
|
||
else:
|
||
cell.alignment = Alignment(wrap_text=True)
|
||
except Exception:
|
||
cell.alignment = Alignment(wrap_text=True)
|
||
|
||
# 儲存翻譯後的檔案
|
||
output_filename = f"{self.file_path.stem}_{target_language}_translated.xlsx"
|
||
output_path = output_dir / output_filename
|
||
wb.save(str(output_path))
|
||
|
||
logger.info(f"Excel translation completed: {translation_count} translations, {skip_count} skips")
|
||
logger.info(f"Generated translated Excel file: {output_path}")
|
||
return str(output_path)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to generate translated Excel file: {str(e)}")
|
||
raise FileProcessingError(f"Excel 翻譯檔生成失敗: {str(e)}")
|
||
|
||
|
||
class PdfParser(DocumentParser):
|
||
"""PDF 文件解析器 - 支持扫描PDF的OCR处理"""
|
||
|
||
def extract_text_segments(self, user_id: int = None, job_id: int = None) -> List[str]:
|
||
"""提取 PDF 文件的文字片段 - 支持扫描PDF的智能处理"""
|
||
try:
|
||
from app.services.enhanced_pdf_parser import EnhancedPdfParser
|
||
|
||
# 使用增强的PDF解析器
|
||
enhanced_parser = EnhancedPdfParser(str(self.file_path))
|
||
text_segments = enhanced_parser.extract_text_segments(user_id, job_id)
|
||
|
||
logger.info(f"Enhanced PDF extraction: {len(text_segments)} text segments")
|
||
return text_segments
|
||
|
||
except Exception as e:
|
||
logger.error(f"Enhanced PDF extraction failed, falling back to basic extraction: {str(e)}")
|
||
|
||
# 回退到基本文字提取
|
||
try:
|
||
from PyPDF2 import PdfReader
|
||
|
||
reader = PdfReader(str(self.file_path))
|
||
text_segments = []
|
||
|
||
for page in reader.pages:
|
||
text = page.extract_text()
|
||
|
||
# 簡單的句子分割
|
||
sentences = text.split('.')
|
||
for sentence in sentences:
|
||
sentence = sentence.strip()
|
||
if sentence and len(sentence) > 10:
|
||
text_segments.append(sentence)
|
||
|
||
logger.info(f"Basic PDF extraction: {len(text_segments)} text segments")
|
||
return text_segments
|
||
|
||
except Exception as e2:
|
||
logger.error(f"Basic PDF extraction also failed: {str(e2)}")
|
||
raise FileProcessingError(f"PDF 文件解析失敗: {str(e2)}")
|
||
|
||
def generate_translated_document(self, translations: Dict[str, List[str]],
|
||
target_language: str, output_dir: Path) -> str:
|
||
"""生成翻譯文字檔(PDF 不支援直接編輯)"""
|
||
try:
|
||
from app.services.enhanced_pdf_parser import EnhancedPdfParser
|
||
|
||
# 使用增强解析器生成翻译文档
|
||
enhanced_parser = EnhancedPdfParser(str(self.file_path))
|
||
return enhanced_parser.generate_translated_document(translations, target_language, output_dir)
|
||
|
||
except Exception as e:
|
||
# 回退到基本生成方式
|
||
logger.warning(f"Enhanced PDF generation failed, using basic method: {str(e)}")
|
||
|
||
translated_texts = translations.get(target_language, [])
|
||
|
||
# 生成純文字檔案
|
||
output_filename = f"{self.file_path.stem}_{target_language}_translated.txt"
|
||
output_path = output_dir / output_filename
|
||
|
||
with open(output_path, 'w', encoding='utf-8') as f:
|
||
f.write(f"翻譯結果 - {target_language}\n")
|
||
f.write("=" * 50 + "\n\n")
|
||
|
||
for i, text in enumerate(translated_texts):
|
||
f.write(f"{i+1}. {text}\n\n")
|
||
|
||
logger.info(f"Generated translated text file: {output_path}")
|
||
return str(output_path)
|
||
|
||
|
||
class PptxParser(DocumentParser):
|
||
"""PowerPoint 文件解析器"""
|
||
|
||
def extract_text_segments(self) -> List[str]:
|
||
"""提取 PPTX 文件的文字片段(包含表格)"""
|
||
try:
|
||
import pptx
|
||
|
||
prs = pptx.Presentation(str(self.file_path))
|
||
text_segments = []
|
||
|
||
for slide_idx, slide in enumerate(prs.slides, 1):
|
||
for shape_idx, shape in enumerate(slide.shapes, 1):
|
||
shape_processed = False
|
||
|
||
# 處理文字框 - 優先處理,因為大多數文字都在這裡
|
||
if getattr(shape, "has_text_frame", False):
|
||
text_frame = shape.text_frame
|
||
text = self._extract_text_from_frame(text_frame)
|
||
|
||
if text.strip():
|
||
text_segments.append(text)
|
||
logger.debug(f"Extracted text frame from slide {slide_idx}, shape {shape_idx}: {text[:50]}...")
|
||
shape_processed = True
|
||
|
||
# 處理表格
|
||
if getattr(shape, "has_table", False):
|
||
table_texts = self._extract_text_from_table(shape.table, slide_idx, shape_idx)
|
||
text_segments.extend(table_texts)
|
||
if table_texts:
|
||
shape_processed = True
|
||
|
||
# 處理圖表 (Charts)
|
||
if getattr(shape, "has_chart", False):
|
||
chart_texts = self._extract_text_from_chart(shape.chart, slide_idx, shape_idx)
|
||
text_segments.extend(chart_texts)
|
||
if chart_texts:
|
||
shape_processed = True
|
||
|
||
# 處理群組形狀 (Grouped Shapes) - 支援深度嵌套
|
||
if hasattr(shape, 'shapes'):
|
||
group_texts = self._extract_text_from_group(shape.shapes, slide_idx, shape_idx, depth=0)
|
||
text_segments.extend(group_texts)
|
||
if group_texts:
|
||
shape_processed = True
|
||
|
||
# 處理 GraphicFrame (可能包含 SmartArt 等)
|
||
if getattr(shape, "has_smart_art", False):
|
||
smartart_texts = self._extract_text_from_smartart(shape, slide_idx, shape_idx)
|
||
text_segments.extend(smartart_texts)
|
||
if smartart_texts:
|
||
shape_processed = True
|
||
|
||
# 處理基本形狀內的文字 - 作為備用方案,避免重複提取
|
||
if not shape_processed and hasattr(shape, 'text') and shape.text.strip():
|
||
text_segments.append(shape.text)
|
||
logger.debug(f"Extracted shape text from slide {slide_idx}, shape {shape_idx}: {shape.text[:50]}...")
|
||
shape_processed = True
|
||
|
||
# 如果以上都沒有處理到,檢查是否有其他可能的文字內容
|
||
if not shape_processed:
|
||
# 嘗試更深層的文字提取
|
||
fallback_texts = self._extract_fallback_text(shape, slide_idx, shape_idx)
|
||
text_segments.extend(fallback_texts)
|
||
|
||
logger.info(f"PowerPoint extraction: {len(text_segments)} text segments from PPTX (including tables)")
|
||
|
||
# 診斷特定關鍵字 - 增強版
|
||
target_keywords = [
|
||
"檢驗盤剔線作業時缺少線塌防護設計",
|
||
"治工具未標準化管理",
|
||
"彈匣裝載料片間距不足",
|
||
"彈匣未評估防震防傾倒風險",
|
||
"搬運台車選用錯誤"
|
||
]
|
||
|
||
logger.info("=== 關鍵字診斷開始 ===")
|
||
for keyword in target_keywords:
|
||
# 完全匹配
|
||
exact_matches = [seg for seg in text_segments if keyword == seg.strip()]
|
||
# 包含匹配
|
||
contains_matches = [seg for seg in text_segments if keyword in seg]
|
||
# 模糊匹配(去掉空白和換行符)
|
||
normalized_keyword = keyword.replace(' ', '').replace('\n', '').replace('\r', '')
|
||
fuzzy_matches = [seg for seg in text_segments
|
||
if normalized_keyword in seg.replace(' ', '').replace('\n', '').replace('\r', '')]
|
||
|
||
if exact_matches:
|
||
logger.info(f"✅ 完全匹配關鍵字: '{keyword}' 在 {len(exact_matches)} 個文字片段中")
|
||
for i, seg in enumerate(exact_matches):
|
||
logger.info(f" 完全匹配{i+1}: '{seg}'")
|
||
elif contains_matches:
|
||
logger.info(f"🔍 包含關鍵字: '{keyword}' 在 {len(contains_matches)} 個文字片段中")
|
||
for i, seg in enumerate(contains_matches):
|
||
logger.info(f" 包含匹配{i+1}: '{seg}'")
|
||
elif fuzzy_matches:
|
||
logger.info(f"🎯 模糊匹配關鍵字: '{keyword}' 在 {len(fuzzy_matches)} 個文字片段中")
|
||
for i, seg in enumerate(fuzzy_matches):
|
||
logger.info(f" 模糊匹配{i+1}: '{seg}'")
|
||
# 顯示標準化後的比較
|
||
normalized_seg = seg.replace(' ', '').replace('\n', '').replace('\r', '')
|
||
logger.info(f" 標準化後: 關鍵字='{normalized_keyword}' vs 片段='{normalized_seg}'")
|
||
else:
|
||
logger.warning(f"❌ 未找到關鍵字: '{keyword}'")
|
||
# 檢查是否有類似的文字
|
||
similar_segments = []
|
||
for seg in text_segments:
|
||
# 計算相似度(簡單的關鍵詞匹配)
|
||
keyword_chars = set(keyword)
|
||
seg_chars = set(seg)
|
||
intersection = keyword_chars.intersection(seg_chars)
|
||
if len(intersection) >= min(5, len(keyword_chars) * 0.5):
|
||
similar_segments.append(seg)
|
||
|
||
if similar_segments:
|
||
logger.info(f"💡 可能相似的片段 ({len(similar_segments)} 個):")
|
||
for i, seg in enumerate(similar_segments[:3]): # 只顯示前3個
|
||
logger.info(f" 相似{i+1}: '{seg}'")
|
||
|
||
logger.info("=== 關鍵字診斷結束 ===")
|
||
|
||
return text_segments
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to extract text from PPTX: {str(e)}")
|
||
raise FileProcessingError(f"PPTX 文件解析失敗: {str(e)}")
|
||
|
||
def _extract_text_from_frame(self, text_frame) -> str:
|
||
"""從文字框中提取文字內容,包含標準化處理"""
|
||
if not text_frame or not hasattr(text_frame, 'paragraphs'):
|
||
return ""
|
||
|
||
# 收集所有段落文字
|
||
paragraphs = []
|
||
for para in text_frame.paragraphs:
|
||
para_text = para.text
|
||
if para_text and para_text.strip():
|
||
paragraphs.append(para_text.strip())
|
||
|
||
if not paragraphs:
|
||
return ""
|
||
|
||
# 合併段落
|
||
text = "\n".join(paragraphs)
|
||
|
||
# 標準化文字處理
|
||
import re
|
||
# 1. 標準化換行符
|
||
text = text.replace('\r\n', '\n').replace('\r', '\n')
|
||
# 2. 移除末尾的換行符(但保留中間的)
|
||
text = text.rstrip('\n')
|
||
# 3. 標準化多重空白(但保留單個換行符)
|
||
text = re.sub(r'[ \t]+', ' ', text)
|
||
# 4. 移除段落間多餘空行
|
||
text = re.sub(r'\n\s*\n', '\n', text)
|
||
|
||
return text
|
||
|
||
def _extract_text_from_table(self, table, slide_idx: int, shape_idx: int) -> List[str]:
|
||
"""從表格中提取文字內容"""
|
||
table_texts = []
|
||
|
||
try:
|
||
for row_idx, row in enumerate(table.rows):
|
||
for col_idx, cell in enumerate(row.cells):
|
||
cell_text = cell.text_frame.text.strip()
|
||
|
||
if cell_text:
|
||
table_texts.append(cell_text)
|
||
logger.debug(f"Extracted table cell text from slide {slide_idx}, shape {shape_idx}, "
|
||
f"row {row_idx+1}, col {col_idx+1}: {cell_text[:50]}...")
|
||
|
||
logger.info(f"Extracted {len(table_texts)} cells from table on slide {slide_idx}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to extract text from table on slide {slide_idx}: {str(e)}")
|
||
|
||
return table_texts
|
||
|
||
def _extract_text_from_chart(self, chart, slide_idx: int, shape_idx: int) -> List[str]:
|
||
"""從圖表中提取文字內容"""
|
||
chart_texts = []
|
||
|
||
try:
|
||
# 嘗試提取圖表標題
|
||
if hasattr(chart, 'chart_title') and chart.chart_title.has_text_frame:
|
||
title_text = chart.chart_title.text_frame.text.strip()
|
||
if title_text:
|
||
chart_texts.append(title_text)
|
||
logger.debug(f"Extracted chart title from slide {slide_idx}: {title_text[:50]}...")
|
||
|
||
# 嘗試提取其他圖表元素的文字(受限於 python-pptx 支援)
|
||
# 注意:python-pptx 對圖表的支援有限,無法直接存取軸標籤等
|
||
logger.info(f"Extracted {len(chart_texts)} text elements from chart on slide {slide_idx}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to extract text from chart on slide {slide_idx}: {str(e)}")
|
||
|
||
return chart_texts
|
||
|
||
def _extract_text_from_group(self, shapes, slide_idx: int, shape_idx: int, depth: int = 0) -> List[str]:
|
||
"""從群組形狀中提取文字內容 - 支援深度嵌套群組"""
|
||
group_texts = []
|
||
max_depth = 10 # 防止無限遞歸
|
||
|
||
if depth > max_depth:
|
||
logger.warning(f"Group nesting depth exceeded {max_depth} on slide {slide_idx}, skipping deeper levels")
|
||
return group_texts
|
||
|
||
try:
|
||
for sub_shape_idx, sub_shape in enumerate(shapes):
|
||
shape_processed = False
|
||
|
||
# 1. 優先處理嵌套群組(遞歸處理)
|
||
if hasattr(sub_shape, 'shapes') and hasattr(sub_shape, 'shape_type'):
|
||
try:
|
||
# 這是一個嵌套的群組
|
||
nested_texts = self._extract_text_from_group(sub_shape.shapes, slide_idx,
|
||
f"{shape_idx}.{sub_shape_idx}", depth + 1)
|
||
group_texts.extend(nested_texts)
|
||
if nested_texts:
|
||
shape_processed = True
|
||
logger.debug(f"Extracted {len(nested_texts)} texts from nested group "
|
||
f"at slide {slide_idx}, depth {depth + 1}")
|
||
except Exception as e:
|
||
logger.debug(f"Failed to process nested group at slide {slide_idx}, "
|
||
f"depth {depth + 1}: {str(e)}")
|
||
|
||
# 2. 處理文字框
|
||
if getattr(sub_shape, "has_text_frame", False):
|
||
text = self._extract_text_from_frame(sub_shape.text_frame)
|
||
if text.strip():
|
||
group_texts.append(text)
|
||
logger.debug(f"Extracted group text from slide {slide_idx}, group {shape_idx}, "
|
||
f"sub-shape {sub_shape_idx} (depth {depth}): {text[:50]}...")
|
||
shape_processed = True
|
||
|
||
# 3. 處理群組內的表格
|
||
if getattr(sub_shape, "has_table", False):
|
||
sub_table_texts = self._extract_text_from_table(sub_shape.table, slide_idx,
|
||
f"{shape_idx}.{sub_shape_idx}")
|
||
group_texts.extend(sub_table_texts)
|
||
if sub_table_texts:
|
||
shape_processed = True
|
||
|
||
# 4. 處理群組內的圖表
|
||
if getattr(sub_shape, "has_chart", False):
|
||
chart_texts = self._extract_text_from_chart(sub_shape.chart, slide_idx,
|
||
f"{shape_idx}.{sub_shape_idx}")
|
||
group_texts.extend(chart_texts)
|
||
if chart_texts:
|
||
shape_processed = True
|
||
|
||
# 5. 處理基本形狀文字(作為最後的備選方案)
|
||
if not shape_processed and hasattr(sub_shape, 'text') and sub_shape.text.strip():
|
||
group_texts.append(sub_shape.text)
|
||
logger.debug(f"Extracted group shape text from slide {slide_idx} "
|
||
f"(depth {depth}): {sub_shape.text[:50]}...")
|
||
shape_processed = True
|
||
|
||
# 6. 如果仍未處理,使用備用文字提取
|
||
if not shape_processed:
|
||
fallback_texts = self._extract_fallback_text(sub_shape, slide_idx,
|
||
f"{shape_idx}.{sub_shape_idx}")
|
||
group_texts.extend(fallback_texts)
|
||
|
||
logger.info(f"Extracted {len(group_texts)} text elements from grouped shapes "
|
||
f"on slide {slide_idx} (depth {depth})")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to extract text from grouped shapes on slide {slide_idx} "
|
||
f"(depth {depth}): {str(e)}")
|
||
|
||
return group_texts
|
||
|
||
def _extract_text_from_smartart(self, shape, slide_idx: int, shape_idx: int) -> List[str]:
|
||
"""從 SmartArt 中提取文字內容 - 有限支援"""
|
||
smartart_texts = []
|
||
|
||
try:
|
||
# python-pptx 對 SmartArt 支援有限,嘗試透過 XML 提取
|
||
# 這是一個基本實現,可能無法涵蓋所有 SmartArt 類型
|
||
|
||
logger.warning(f"SmartArt detected on slide {slide_idx}, shape {shape_idx} - limited support available")
|
||
logger.info("Consider using alternative libraries like Spire.Presentation for full SmartArt support")
|
||
|
||
# 暫時回傳空列表,避免錯誤
|
||
# 在未來版本中可以考慮整合 Spire.Presentation 或其他支援 SmartArt 的庫
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to extract text from SmartArt on slide {slide_idx}: {str(e)}")
|
||
|
||
return smartart_texts
|
||
|
||
def _extract_fallback_text(self, shape, slide_idx: int, shape_idx: int) -> List[str]:
|
||
"""備用文字提取方法,處理可能遺漏的文字內容,包括深層嵌套結構"""
|
||
fallback_texts = []
|
||
|
||
try:
|
||
# 檢查形狀類型和屬性
|
||
shape_type = getattr(shape, 'shape_type', None)
|
||
logger.debug(f"Fallback extraction for slide {slide_idx}, shape {shape_idx}, type: {shape_type}")
|
||
|
||
# 嘗試透過不同的方式取得文字
|
||
|
||
# 方法 1: 直接檢查 text 屬性(即使之前沒處理到)
|
||
if hasattr(shape, 'text'):
|
||
text = getattr(shape, 'text', '')
|
||
if text and text.strip():
|
||
fallback_texts.append(text)
|
||
logger.debug(f"Fallback: Found direct text - {text[:50]}...")
|
||
|
||
# 方法 2: 檢查是否有 text_frame 但之前沒有正確處理
|
||
try:
|
||
if hasattr(shape, 'text_frame'):
|
||
text_frame = shape.text_frame
|
||
if text_frame and hasattr(text_frame, 'text'):
|
||
text = text_frame.text
|
||
if text and text.strip():
|
||
fallback_texts.append(text)
|
||
logger.debug(f"Fallback: Found text_frame text - {text[:50]}...")
|
||
except:
|
||
pass
|
||
|
||
# 方法 2.5: 深度檢查 text_frame 內的段落結構
|
||
try:
|
||
if hasattr(shape, 'text_frame') and shape.text_frame:
|
||
text_frame = shape.text_frame
|
||
if hasattr(text_frame, 'paragraphs'):
|
||
for para_idx, paragraph in enumerate(text_frame.paragraphs):
|
||
if hasattr(paragraph, 'runs'):
|
||
for run_idx, run in enumerate(paragraph.runs):
|
||
if hasattr(run, 'text') and run.text.strip():
|
||
fallback_texts.append(run.text)
|
||
logger.debug(f"Fallback: Found run text {para_idx}.{run_idx} - {run.text[:30]}...")
|
||
except Exception as e:
|
||
logger.debug(f"Failed to extract paragraph runs: {str(e)}")
|
||
|
||
# 方法 2.6: 如果形狀有嵌套的 shapes,遞歸處理
|
||
if hasattr(shape, 'shapes') and shape.shapes:
|
||
try:
|
||
nested_texts = self._extract_text_from_group(shape.shapes, slide_idx,
|
||
f"fallback_{shape_idx}", depth=0)
|
||
fallback_texts.extend(nested_texts)
|
||
if nested_texts:
|
||
logger.debug(f"Fallback: Found {len(nested_texts)} texts from nested shapes")
|
||
except Exception as e:
|
||
logger.debug(f"Failed to extract from nested shapes: {str(e)}")
|
||
|
||
# 方法 3: 檢查特殊屬性
|
||
special_attrs = ['textFrame', 'text_frame', '_element']
|
||
for attr in special_attrs:
|
||
try:
|
||
if hasattr(shape, attr):
|
||
obj = getattr(shape, attr)
|
||
if hasattr(obj, 'text') and obj.text and obj.text.strip():
|
||
fallback_texts.append(obj.text)
|
||
logger.debug(f"Fallback: Found {attr} text - {obj.text[:30]}...")
|
||
except:
|
||
continue
|
||
|
||
# 方法 3: 如果是 GraphicFrame,嘗試更深入的提取
|
||
if hasattr(shape, 'element'):
|
||
try:
|
||
# 透過 XML 元素搜尋文字節點
|
||
element = shape.element
|
||
|
||
# 搜尋 XML 中的文字內容
|
||
text_elements = []
|
||
|
||
# 搜尋 <a:t> 標籤(文字內容)
|
||
for t_elem in element.iter():
|
||
if t_elem.tag.endswith('}t'): # 匹配 a:t 標籤
|
||
if t_elem.text and t_elem.text.strip():
|
||
text_elements.append(t_elem.text.strip())
|
||
|
||
# 去重並添加
|
||
for text in set(text_elements):
|
||
if text not in [existing_text for existing_text in fallback_texts]:
|
||
fallback_texts.append(text)
|
||
logger.debug(f"Fallback: Found XML text - {text[:50]}...")
|
||
|
||
except Exception as xml_e:
|
||
logger.debug(f"XML extraction failed for shape {shape_idx}: {str(xml_e)}")
|
||
|
||
if fallback_texts:
|
||
logger.info(f"Fallback extraction found {len(fallback_texts)} additional text elements on slide {slide_idx}, shape {shape_idx}")
|
||
else:
|
||
logger.debug(f"No additional text found in fallback for slide {slide_idx}, shape {shape_idx}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Fallback text extraction failed for slide {slide_idx}, shape {shape_idx}: {str(e)}")
|
||
|
||
return fallback_texts
|
||
|
||
def _normalize_text(self, text: str) -> str:
|
||
"""標準化文字用於比較"""
|
||
import re
|
||
return re.sub(r"\s+", " ", (text or "").strip()).lower()
|
||
|
||
def _check_existing_translations(self, text_frame, translations: List[str]) -> bool:
|
||
"""檢查翻譯是否已經存在於文字框末尾"""
|
||
if len(text_frame.paragraphs) < len(translations):
|
||
return False
|
||
|
||
# 檢查末尾的段落是否與翻譯匹配
|
||
tail_paragraphs = text_frame.paragraphs[-len(translations):]
|
||
for para, expected in zip(tail_paragraphs, translations):
|
||
if self._normalize_text(para.text) != self._normalize_text(expected):
|
||
return False
|
||
# 檢查是否為斜體格式(我們添加的翻譯標記)
|
||
if any((r.font.italic is not True) and (r.text or "").strip() for r in para.runs):
|
||
return False
|
||
return True
|
||
|
||
def _append_translation(self, text_frame, text_block: str):
|
||
"""在文字框末尾添加翻譯文字"""
|
||
try:
|
||
from pptx.util import Pt as PPTPt
|
||
|
||
para = text_frame.add_paragraph()
|
||
para.text = text_block
|
||
|
||
# 設定格式:斜體、字體大小
|
||
for run in para.runs:
|
||
run.font.italic = True
|
||
run.font.size = PPTPt(12)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to append translation to text frame: {str(e)}")
|
||
raise
|
||
|
||
def generate_translated_document(self, translations: Dict[str, List[str]],
|
||
target_language: str, output_dir: Path) -> str:
|
||
"""生成翻譯後的 PPTX 文件"""
|
||
try:
|
||
import pptx
|
||
from sqlalchemy import text as sql_text
|
||
from app import db
|
||
|
||
# 載入 PowerPoint 文件
|
||
prs = pptx.Presentation(str(self.file_path))
|
||
|
||
# 生成輸出檔名
|
||
output_filename = generate_filename(
|
||
self.file_path.name,
|
||
'translated',
|
||
'translated',
|
||
target_language
|
||
)
|
||
output_path = output_dir / output_filename
|
||
|
||
# 收集所有文字框
|
||
text_frames = []
|
||
for slide in prs.slides:
|
||
for shape in slide.shapes:
|
||
if getattr(shape, "has_text_frame", False):
|
||
text = self._extract_text_from_frame(shape.text_frame)
|
||
if text.strip():
|
||
text_frames.append((shape.text_frame, text))
|
||
|
||
# 建立翻譯映射 - 從快取讀取
|
||
translation_map = {}
|
||
logger.info(f"Building translation map for {len(text_frames)} text frames in language {target_language}")
|
||
|
||
for text_frame, text in text_frames:
|
||
# 從翻譯快取中查詢翻譯
|
||
result = db.session.execute(sql_text("""
|
||
SELECT translated_text
|
||
FROM dt_translation_cache
|
||
WHERE source_text = :text AND target_language = :lang
|
||
ORDER BY created_at DESC
|
||
LIMIT 1
|
||
"""), {'text': text, 'lang': target_language})
|
||
|
||
row = result.fetchone()
|
||
if row and row[0]:
|
||
translation_map[text] = row[0]
|
||
logger.debug(f"Found translation for PowerPoint text: {text[:50]}...")
|
||
else:
|
||
logger.warning(f"No translation found for PowerPoint text: {text[:50]}...")
|
||
|
||
logger.info(f"Translation map built with {len(translation_map)} mappings")
|
||
|
||
# 插入翻譯
|
||
ok_count = skip_count = 0
|
||
|
||
for text_frame, original_text in text_frames:
|
||
if original_text not in translation_map:
|
||
skip_count += 1
|
||
logger.debug(f"Skip PowerPoint frame: no translation for {original_text[:30]}...")
|
||
continue
|
||
|
||
translated_text = translation_map[original_text]
|
||
translations_to_add = [translated_text] # 單一語言模式
|
||
|
||
# 檢查是否已存在翻譯
|
||
if self._check_existing_translations(text_frame, translations_to_add):
|
||
skip_count += 1
|
||
logger.debug(f"Skip PowerPoint frame: translation already exists for {original_text[:30]}...")
|
||
continue
|
||
|
||
# 添加翻譯
|
||
for translation in translations_to_add:
|
||
self._append_translation(text_frame, translation)
|
||
|
||
ok_count += 1
|
||
logger.debug(f"Added translation to PowerPoint frame: {original_text[:30]}...")
|
||
|
||
# 儲存文件
|
||
prs.save(str(output_path))
|
||
|
||
logger.info(f"PowerPoint translation completed: {ok_count} insertions, {skip_count} skips")
|
||
logger.info(f"Generated translated PowerPoint file: {output_path}")
|
||
return str(output_path)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to generate translated PPTX file: {str(e)}")
|
||
raise FileProcessingError(f"PPTX 翻譯檔生成失敗: {str(e)}")
|
||
|
||
def insert_pptx_translations(self, translation_map: Dict[Tuple[str, str], str],
|
||
target_languages: List[str], output_path: str) -> Tuple[int, int]:
|
||
"""插入翻譯到 PowerPoint 文件 - 單語言模式(僅翻譯文)"""
|
||
try:
|
||
import pptx
|
||
from shutil import copyfile
|
||
|
||
# 複製原始文件
|
||
copyfile(str(self.file_path), output_path)
|
||
|
||
# 載入 PowerPoint 文件
|
||
prs = pptx.Presentation(output_path)
|
||
ok_count = skip_count = 0
|
||
|
||
for slide_idx, slide in enumerate(prs.slides, 1):
|
||
for shape_idx, shape in enumerate(slide.shapes, 1):
|
||
# 使用與提取邏輯相同的處理順序(並行處理)
|
||
|
||
# 處理文字框
|
||
if getattr(shape, "has_text_frame", False):
|
||
text = self._extract_text_from_frame(shape.text_frame)
|
||
if text.strip():
|
||
ok, skip = self._insert_single_language_translation(
|
||
shape.text_frame, text, translation_map, target_languages[0]
|
||
)
|
||
ok_count += ok
|
||
skip_count += skip
|
||
|
||
# 處理表格
|
||
if getattr(shape, "has_table", False):
|
||
table_ok, table_skip = self._insert_table_translations(
|
||
shape.table, translation_map, target_languages[0]
|
||
)
|
||
ok_count += table_ok
|
||
skip_count += table_skip
|
||
|
||
# 處理圖表(並行處理)
|
||
if getattr(shape, "has_chart", False):
|
||
chart_ok, chart_skip = self._insert_chart_translations(
|
||
shape.chart, translation_map, target_languages[0]
|
||
)
|
||
ok_count += chart_ok
|
||
skip_count += chart_skip
|
||
|
||
# 處理群組形狀(並行處理,支援深度嵌套)
|
||
if hasattr(shape, 'shapes'):
|
||
group_ok, group_skip = self._insert_group_translations(
|
||
shape.shapes, translation_map, target_languages[0], slide_idx, shape_idx
|
||
)
|
||
ok_count += group_ok
|
||
skip_count += group_skip
|
||
|
||
# 處理基本形狀文字(並行處理)
|
||
if hasattr(shape, 'text') and shape.text.strip():
|
||
if (target_languages[0], shape.text) in translation_map:
|
||
translated_text = translation_map[(target_languages[0], shape.text)]
|
||
shape.text = translated_text
|
||
ok_count += 1
|
||
logger.debug(f"Inserted basic shape translation on slide {slide_idx}: {shape.text[:30]}...")
|
||
else:
|
||
skip_count += 1
|
||
|
||
# 儲存文件
|
||
prs.save(output_path)
|
||
logger.info(f"Saved PowerPoint file with {ok_count} translations, {skip_count} skips")
|
||
return ok_count, skip_count
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to insert PowerPoint translations: {str(e)}")
|
||
raise FileProcessingError(f"PowerPoint 翻譯插入失敗: {str(e)}")
|
||
|
||
def insert_pptx_combined_translations(self, translation_map: Dict[Tuple[str, str], str],
|
||
target_languages: List[str], output_path: str) -> Tuple[int, int]:
|
||
"""插入翻譯到 PowerPoint 文件 - 組合模式(原文+所有譯文)"""
|
||
try:
|
||
import pptx
|
||
from shutil import copyfile
|
||
|
||
# 複製原始文件
|
||
copyfile(str(self.file_path), output_path)
|
||
|
||
# 載入 PowerPoint 文件
|
||
prs = pptx.Presentation(output_path)
|
||
ok_count = skip_count = 0
|
||
|
||
for slide in prs.slides:
|
||
for shape in slide.shapes:
|
||
# 處理文字框
|
||
if getattr(shape, "has_text_frame", False):
|
||
text = self._extract_text_from_frame(shape.text_frame)
|
||
if text.strip():
|
||
ok, skip = self._insert_combined_language_translation(
|
||
shape.text_frame, text, translation_map, target_languages
|
||
)
|
||
ok_count += ok
|
||
skip_count += skip
|
||
|
||
# 處理表格
|
||
elif getattr(shape, "has_table", False):
|
||
table_ok, table_skip = self._insert_combined_table_translations(
|
||
shape.table, translation_map, target_languages
|
||
)
|
||
ok_count += table_ok
|
||
skip_count += table_skip
|
||
|
||
# 處理圖表
|
||
elif getattr(shape, "has_chart", False):
|
||
chart_ok, chart_skip = self._insert_combined_chart_translations(
|
||
shape.chart, translation_map, target_languages
|
||
)
|
||
ok_count += chart_ok
|
||
skip_count += chart_skip
|
||
|
||
# 處理群組形狀
|
||
elif hasattr(shape, 'shapes'):
|
||
group_ok, group_skip = self._insert_combined_group_translations(
|
||
shape.shapes, translation_map, target_languages
|
||
)
|
||
ok_count += group_ok
|
||
skip_count += group_skip
|
||
|
||
# 處理基本形狀文字
|
||
elif hasattr(shape, 'text') and shape.text.strip():
|
||
# 收集所有語言的翻譯
|
||
translations = []
|
||
for lang in target_languages:
|
||
if (lang, shape.text) in translation_map:
|
||
translations.append(translation_map[(lang, shape.text)])
|
||
else:
|
||
translations.append(f"【翻譯缺失|{lang}】")
|
||
|
||
if translations:
|
||
# 組合原文和所有翻譯
|
||
combined_text = shape.text + '\n' + '\n'.join(translations)
|
||
shape.text = combined_text
|
||
ok_count += 1
|
||
else:
|
||
skip_count += 1
|
||
|
||
# 儲存文件
|
||
prs.save(output_path)
|
||
logger.info(f"Saved combined PowerPoint file with {ok_count} translations, {skip_count} skips")
|
||
return ok_count, skip_count
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to insert combined PowerPoint translations: {str(e)}")
|
||
raise FileProcessingError(f"PowerPoint 組合翻譯插入失敗: {str(e)}")
|
||
|
||
def _insert_single_language_translation(self, text_frame, original_text: str,
|
||
translation_map: Dict[Tuple[str, str], str],
|
||
target_language: str) -> Tuple[int, int]:
|
||
"""插入單語言翻譯到文字框"""
|
||
if (target_language, original_text) not in translation_map:
|
||
return 0, 1
|
||
|
||
translated_text = translation_map[(target_language, original_text)]
|
||
|
||
# 檢查是否已存在翻譯
|
||
if self._check_existing_translations(text_frame, [translated_text]):
|
||
return 0, 1
|
||
|
||
# 清除現有內容,只保留翻譯
|
||
text_frame.clear()
|
||
para = text_frame.add_paragraph()
|
||
para.text = translated_text
|
||
|
||
# 設定格式
|
||
for run in para.runs:
|
||
run.font.italic = True
|
||
try:
|
||
from pptx.util import Pt as PPTPt
|
||
run.font.size = PPTPt(12)
|
||
except:
|
||
pass
|
||
|
||
return 1, 0
|
||
|
||
def _insert_combined_language_translation(self, text_frame, original_text: str,
|
||
translation_map: Dict[Tuple[str, str], str],
|
||
target_languages: List[str]) -> Tuple[int, int]:
|
||
"""插入組合語言翻譯到文字框(原文+所有譯文)"""
|
||
translations = []
|
||
for lang in target_languages:
|
||
if (lang, original_text) in translation_map:
|
||
translations.append(translation_map[(lang, original_text)])
|
||
else:
|
||
translations.append(f"【翻譯缺失|{lang}】")
|
||
|
||
if not any(trans for trans in translations if not trans.startswith("【翻譯缺失")):
|
||
return 0, 1
|
||
|
||
# 檢查是否已存在翻譯
|
||
combined_translations = [original_text] + translations
|
||
if self._check_existing_translations(text_frame, combined_translations):
|
||
return 0, 1
|
||
|
||
# 添加所有翻譯
|
||
for translation in translations:
|
||
self._append_translation(text_frame, translation)
|
||
|
||
return 1, 0
|
||
|
||
def _insert_table_translations(self, table, translation_map: Dict[Tuple[str, str], str],
|
||
target_language: str) -> Tuple[int, int]:
|
||
"""插入翻譯到表格 - 單語言模式"""
|
||
ok_count = skip_count = 0
|
||
|
||
for row in table.rows:
|
||
for cell in row.cells:
|
||
cell_text = cell.text_frame.text.strip()
|
||
if not cell_text:
|
||
continue
|
||
|
||
if (target_language, cell_text) in translation_map:
|
||
translated_text = translation_map[(target_language, cell_text)]
|
||
|
||
# 替換儲存格內容為翻譯文
|
||
cell.text_frame.clear()
|
||
para = cell.text_frame.add_paragraph()
|
||
para.text = translated_text
|
||
|
||
# 設定格式
|
||
for run in para.runs:
|
||
run.font.italic = True
|
||
try:
|
||
from pptx.util import Pt as PPTPt
|
||
run.font.size = PPTPt(10)
|
||
except:
|
||
pass
|
||
|
||
ok_count += 1
|
||
else:
|
||
skip_count += 1
|
||
|
||
return ok_count, skip_count
|
||
|
||
def _insert_combined_table_translations(self, table, translation_map: Dict[Tuple[str, str], str],
|
||
target_languages: List[str]) -> Tuple[int, int]:
|
||
"""插入翻譯到表格 - 組合模式"""
|
||
ok_count = skip_count = 0
|
||
|
||
for row in table.rows:
|
||
for cell in row.cells:
|
||
cell_text = cell.text_frame.text.strip()
|
||
if not cell_text:
|
||
continue
|
||
|
||
# 收集所有語言的翻譯
|
||
translations = []
|
||
for lang in target_languages:
|
||
if (lang, cell_text) in translation_map:
|
||
translations.append(translation_map[(lang, cell_text)])
|
||
else:
|
||
translations.append(f"【翻譯缺失|{lang}】")
|
||
|
||
if translations:
|
||
# 組合原文和所有翻譯
|
||
combined_text = cell_text + '\n' + '\n'.join(translations)
|
||
|
||
# 替換儲存格內容
|
||
cell.text_frame.clear()
|
||
para = cell.text_frame.add_paragraph()
|
||
para.text = combined_text
|
||
|
||
# 設定格式
|
||
for run in para.runs:
|
||
try:
|
||
from pptx.util import Pt as PPTPt
|
||
run.font.size = PPTPt(9)
|
||
except:
|
||
pass
|
||
|
||
ok_count += 1
|
||
else:
|
||
skip_count += 1
|
||
|
||
return ok_count, skip_count
|
||
|
||
def _insert_chart_translations(self, chart, translation_map: Dict[Tuple[str, str], str],
|
||
target_language: str) -> Tuple[int, int]:
|
||
"""插入翻譯到圖表 - 有限支援"""
|
||
ok_count = skip_count = 0
|
||
|
||
try:
|
||
# 處理圖表標題
|
||
if hasattr(chart, 'chart_title') and chart.chart_title.has_text_frame:
|
||
title_text = chart.chart_title.text_frame.text.strip()
|
||
if title_text and (target_language, title_text) in translation_map:
|
||
translated_title = translation_map[(target_language, title_text)]
|
||
chart.chart_title.text_frame.text = translated_title
|
||
ok_count += 1
|
||
logger.debug(f"Translated chart title: {title_text[:30]} -> {translated_title[:30]}")
|
||
else:
|
||
skip_count += 1
|
||
|
||
# 注意:python-pptx 對圖表軸標籤等的支援非常有限
|
||
logger.info(f"Chart translation: {ok_count} successful, {skip_count} skipped (limited support)")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to insert chart translations: {str(e)}")
|
||
skip_count += 1
|
||
|
||
return ok_count, skip_count
|
||
|
||
def _insert_group_translations(self, shapes, translation_map: Dict[Tuple[str, str], str],
|
||
target_language: str, slide_idx: int = 0, shape_idx: int = 0, depth: int = 0) -> Tuple[int, int]:
|
||
"""插入翻譯到群組形狀 - 支援深度嵌套,與提取邏輯保持一致"""
|
||
ok_count = skip_count = 0
|
||
max_depth = 10 # 防止無限遞歸
|
||
|
||
if depth > max_depth:
|
||
logger.warning(f"Group nesting depth exceeded {max_depth} on slide {slide_idx}, skipping deeper levels")
|
||
return ok_count, skip_count
|
||
|
||
try:
|
||
for sub_shape_idx, sub_shape in enumerate(shapes):
|
||
shape_processed = False
|
||
|
||
# 1. 優先處理嵌套群組(遞歸處理)
|
||
if hasattr(sub_shape, 'shapes') and hasattr(sub_shape, 'shape_type'):
|
||
try:
|
||
nested_ok, nested_skip = self._insert_group_translations(
|
||
sub_shape.shapes, translation_map, target_language,
|
||
slide_idx, f"{shape_idx}.{sub_shape_idx}", depth + 1
|
||
)
|
||
ok_count += nested_ok
|
||
skip_count += nested_skip
|
||
if nested_ok > 0:
|
||
shape_processed = True
|
||
logger.debug(f"Inserted {nested_ok} nested group translations at depth {depth + 1}")
|
||
except Exception as e:
|
||
logger.debug(f"Failed to process nested group at depth {depth + 1}: {str(e)}")
|
||
|
||
# 2. 處理群組內的文字框(並行處理)
|
||
if getattr(sub_shape, "has_text_frame", False):
|
||
text = self._extract_text_from_frame(sub_shape.text_frame)
|
||
if text.strip():
|
||
if (target_language, text) in translation_map:
|
||
translated_text = translation_map[(target_language, text)]
|
||
# 使用更安全的文字替換方法
|
||
try:
|
||
# 清除並重新設置文字
|
||
sub_shape.text_frame.clear()
|
||
para = sub_shape.text_frame.add_paragraph()
|
||
para.text = translated_text
|
||
ok_count += 1
|
||
shape_processed = True
|
||
logger.debug(f"Inserted group text frame translation: {text[:30]}... -> {translated_text[:30]}...")
|
||
except Exception as e:
|
||
logger.warning(f"Failed to replace text frame content: {str(e)}")
|
||
skip_count += 1
|
||
else:
|
||
skip_count += 1
|
||
|
||
# 3. 處理群組內的表格(並行處理)
|
||
if getattr(sub_shape, "has_table", False):
|
||
table_ok, table_skip = self._insert_table_translations(
|
||
sub_shape.table, translation_map, target_language
|
||
)
|
||
ok_count += table_ok
|
||
skip_count += table_skip
|
||
if table_ok > 0:
|
||
shape_processed = True
|
||
|
||
# 4. 處理群組內的圖表(並行處理)
|
||
if getattr(sub_shape, "has_chart", False):
|
||
chart_ok, chart_skip = self._insert_chart_translations(
|
||
sub_shape.chart, translation_map, target_language
|
||
)
|
||
ok_count += chart_ok
|
||
skip_count += chart_skip
|
||
if chart_ok > 0:
|
||
shape_processed = True
|
||
|
||
# 5. 處理基本形狀文字(作為備選方案)
|
||
if not shape_processed and hasattr(sub_shape, 'text') and sub_shape.text.strip():
|
||
if (target_language, sub_shape.text) in translation_map:
|
||
translated_text = translation_map[(target_language, sub_shape.text)]
|
||
sub_shape.text = translated_text
|
||
ok_count += 1
|
||
logger.debug(f"Inserted basic group shape translation: {sub_shape.text[:30]}...")
|
||
shape_processed = True
|
||
else:
|
||
skip_count += 1
|
||
|
||
logger.debug(f"Group translation at depth {depth}: {ok_count} successful, {skip_count} skipped")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to insert group translations at depth {depth}: {str(e)}")
|
||
|
||
return ok_count, skip_count
|
||
|
||
def _insert_combined_chart_translations(self, chart, translation_map: Dict[Tuple[str, str], str],
|
||
target_languages: List[str]) -> Tuple[int, int]:
|
||
"""插入組合翻譯到圖表 - 有限支援"""
|
||
ok_count = skip_count = 0
|
||
|
||
try:
|
||
# 處理圖表標題
|
||
if hasattr(chart, 'chart_title') and chart.chart_title.has_text_frame:
|
||
title_text = chart.chart_title.text_frame.text.strip()
|
||
if title_text:
|
||
# 收集所有語言的翻譯
|
||
translations = []
|
||
for lang in target_languages:
|
||
if (lang, title_text) in translation_map:
|
||
translations.append(translation_map[(lang, title_text)])
|
||
else:
|
||
translations.append(f"【翻譯缺失|{lang}】")
|
||
|
||
if any(trans for trans in translations if not trans.startswith("【翻譯缺失")):
|
||
# 組合原文和所有翻譯
|
||
combined_text = title_text + '\n' + '\n'.join(translations)
|
||
chart.chart_title.text_frame.text = combined_text
|
||
ok_count += 1
|
||
else:
|
||
skip_count += 1
|
||
else:
|
||
skip_count += 1
|
||
|
||
# 注意:python-pptx 對圖表軸標籤等的支援非常有限
|
||
logger.info(f"Combined chart translation: {ok_count} successful, {skip_count} skipped (limited support)")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to insert combined chart translations: {str(e)}")
|
||
skip_count += 1
|
||
|
||
return ok_count, skip_count
|
||
|
||
def _insert_combined_group_translations(self, shapes, translation_map: Dict[Tuple[str, str], str],
|
||
target_languages: List[str]) -> Tuple[int, int]:
|
||
"""插入組合翻譯到群組形狀"""
|
||
ok_count = skip_count = 0
|
||
|
||
try:
|
||
for sub_shape in shapes:
|
||
# 處理群組內的文字框
|
||
if getattr(sub_shape, "has_text_frame", False):
|
||
text = self._extract_text_from_frame(sub_shape.text_frame)
|
||
if text.strip():
|
||
# 收集所有語言的翻譯
|
||
translations = []
|
||
for lang in target_languages:
|
||
if (lang, text) in translation_map:
|
||
translations.append(translation_map[(lang, text)])
|
||
else:
|
||
translations.append(f"【翻譯缺失|{lang}】")
|
||
|
||
if any(trans for trans in translations if not trans.startswith("【翻譯缺失")):
|
||
# 添加所有翻譯
|
||
for translation in translations:
|
||
self._append_translation(sub_shape.text_frame, translation)
|
||
ok_count += 1
|
||
else:
|
||
skip_count += 1
|
||
else:
|
||
skip_count += 1
|
||
|
||
# 處理群組內的表格
|
||
elif getattr(sub_shape, "has_table", False):
|
||
table_ok, table_skip = self._insert_combined_table_translations(
|
||
sub_shape.table, translation_map, target_languages
|
||
)
|
||
ok_count += table_ok
|
||
skip_count += table_skip
|
||
|
||
# 處理群組內的基本形狀文字
|
||
elif hasattr(sub_shape, 'text') and sub_shape.text.strip():
|
||
# 收集所有語言的翻譯
|
||
translations = []
|
||
for lang in target_languages:
|
||
if (lang, sub_shape.text) in translation_map:
|
||
translations.append(translation_map[(lang, sub_shape.text)])
|
||
else:
|
||
translations.append(f"【翻譯缺失|{lang}】")
|
||
|
||
if translations:
|
||
# 組合原文和所有翻譯
|
||
combined_text = sub_shape.text + '\n' + '\n'.join(translations)
|
||
sub_shape.text = combined_text
|
||
ok_count += 1
|
||
else:
|
||
skip_count += 1
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to insert combined group translations: {str(e)}")
|
||
|
||
return ok_count, skip_count
|
||
|
||
|
||
class TranslationService:
|
||
"""翻譯服務"""
|
||
|
||
def __init__(self):
|
||
self.dify_client = DifyClient()
|
||
self.document_processor = DocumentProcessor()
|
||
|
||
# 文件解析器映射
|
||
self.parsers = {
|
||
'.docx': DocxParser,
|
||
'.doc': DocParser, # 需要先轉換為 DOCX
|
||
'.pptx': PptxParser, # PowerPoint 簡報支援
|
||
'.xlsx': ExcelParser,
|
||
'.xls': ExcelParser, # Excel 處理器會自動處理 XLS 轉換
|
||
'.pdf': PdfParser,
|
||
# 其他格式可以稍後添加
|
||
}
|
||
|
||
def get_document_parser(self, file_path: str) -> DocumentParser:
|
||
"""取得文件解析器"""
|
||
file_ext = Path(file_path).suffix.lower()
|
||
|
||
parser_class = self.parsers.get(file_ext)
|
||
if not parser_class:
|
||
raise FileProcessingError(f"不支援的檔案格式: {file_ext}")
|
||
|
||
return parser_class(file_path)
|
||
|
||
def split_text_into_sentences(self, text: str, language: str = 'auto') -> List[str]:
|
||
"""將文字分割成句子 - 使用增強的分句邏輯"""
|
||
return self.document_processor.split_text_into_sentences(text, language)
|
||
|
||
def translate_excel_cell(self, text: str, source_language: str,
|
||
target_language: str, user_id: int = None,
|
||
job_id: int = None, conversation_id: str = None) -> Dict[str, Any]:
|
||
"""
|
||
Excel儲存格翻譯 - 整個儲存格作為一個單位翻譯,不進行切片
|
||
返回 dict 包含 translated_text 和 conversation_id
|
||
"""
|
||
if not text or not text.strip():
|
||
return {"translated_text": "", "conversation_id": conversation_id}
|
||
|
||
# 檢查快取 - 整個儲存格內容
|
||
cached_translation = TranslationCache.get_translation(text, source_language, target_language)
|
||
if cached_translation:
|
||
logger.debug(f"Excel cell cache hit: {text[:30]}...")
|
||
return {"translated_text": cached_translation, "conversation_id": conversation_id}
|
||
|
||
# 直接翻譯整個儲存格內容,不進行任何切片
|
||
try:
|
||
result = self.dify_client.translate_text(
|
||
text=text,
|
||
source_language=source_language,
|
||
target_language=target_language,
|
||
user_id=user_id,
|
||
job_id=job_id,
|
||
conversation_id=conversation_id # 傳遞 conversation_id
|
||
)
|
||
|
||
translated_text = result['translated_text']
|
||
|
||
# 儲存整個儲存格的翻譯到快取
|
||
TranslationCache.save_translation(
|
||
text, source_language, target_language, translated_text
|
||
)
|
||
|
||
return result # 返回包含 conversation_id 的完整結果
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to translate Excel cell: {text[:30]}... Error: {str(e)}")
|
||
# 翻譯失敗時返回失敗標記
|
||
return f"【翻譯失敗|{target_language}】{text}"
|
||
|
||
def translate_word_table_cell(self, text: str, source_language: str,
|
||
target_language: str, user_id: int = None,
|
||
job_id: int = None) -> str:
|
||
"""
|
||
Word表格儲存格翻譯 - 整個儲存格內容作為一個單位翻譯,不進行段落切片
|
||
"""
|
||
if not text or not text.strip():
|
||
return ""
|
||
|
||
# 檢查快取 - 整個儲存格內容
|
||
cached_translation = TranslationCache.get_translation(text, source_language, target_language)
|
||
if cached_translation:
|
||
logger.debug(f"Word table cell cache hit: {text[:30]}...")
|
||
return cached_translation
|
||
|
||
# 直接翻譯整個儲存格內容,不進行任何段落切片
|
||
try:
|
||
result = self.dify_client.translate_text(
|
||
text=text,
|
||
source_language=source_language,
|
||
target_language=target_language,
|
||
user_id=user_id,
|
||
job_id=job_id
|
||
)
|
||
|
||
translated_text = result['translated_text']
|
||
|
||
# 儲存整個儲存格的翻譯到快取
|
||
TranslationCache.save_translation(
|
||
text, source_language, target_language, translated_text
|
||
)
|
||
|
||
return translated_text
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to translate Word table cell: {text[:30]}... Error: {str(e)}")
|
||
return f"【翻譯失敗|{target_language}】{text}"
|
||
|
||
def translate_segment_with_sentences(self, text: str, source_language: str,
|
||
target_language: str, user_id: int = None,
|
||
job_id: int = None, conversation_id: str = None) -> Dict[str, Any]:
|
||
"""
|
||
按段落翻譯,模仿成功版本的 translate_block_sentencewise 邏輯
|
||
對多行文字進行逐行、逐句翻譯,並重新組合成完整段落
|
||
僅用於Word文檔,Excel請使用 translate_excel_cell
|
||
"""
|
||
if not text or not text.strip():
|
||
return ""
|
||
|
||
# 檢查快取 - 先檢查整個段落的快取
|
||
cached_whole = TranslationCache.get_translation(text, source_language, target_language)
|
||
if cached_whole:
|
||
logger.debug(f"Whole paragraph cache hit: {text[:30]}...")
|
||
return cached_whole
|
||
|
||
# 按行處理
|
||
out_lines = []
|
||
all_successful = True
|
||
current_conversation_id = conversation_id
|
||
|
||
for raw_line in text.split('\n'):
|
||
if not raw_line.strip():
|
||
out_lines.append("")
|
||
continue
|
||
|
||
# 分句處理
|
||
sentences = self.document_processor.split_text_into_sentences(raw_line, source_language)
|
||
if not sentences:
|
||
sentences = [raw_line]
|
||
|
||
translated_parts = []
|
||
for sentence in sentences:
|
||
sentence = sentence.strip()
|
||
if not sentence:
|
||
continue
|
||
|
||
# 檢查句子級快取
|
||
cached_sentence = TranslationCache.get_translation(sentence, source_language, target_language)
|
||
if cached_sentence:
|
||
translated_parts.append(cached_sentence)
|
||
continue
|
||
|
||
# 呼叫 Dify API 翻譯句子
|
||
try:
|
||
result = self.dify_client.translate_text(
|
||
text=sentence,
|
||
source_language=source_language,
|
||
target_language=target_language,
|
||
user_id=user_id,
|
||
job_id=job_id,
|
||
conversation_id=current_conversation_id
|
||
)
|
||
|
||
translated_sentence = result['translated_text']
|
||
|
||
# 更新對話ID以保持上下文連續性
|
||
if result.get('conversation_id'):
|
||
current_conversation_id = result['conversation_id']
|
||
|
||
# 儲存句子級快取
|
||
TranslationCache.save_translation(
|
||
sentence, source_language, target_language, translated_sentence
|
||
)
|
||
|
||
translated_parts.append(translated_sentence)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to translate sentence: {sentence[:30]}... Error: {str(e)}")
|
||
translated_parts.append(f"【翻譯失敗|{target_language}】{sentence}")
|
||
all_successful = False
|
||
|
||
# 重新組合句子為一行
|
||
out_lines.append(" ".join(translated_parts))
|
||
|
||
# 重新組合所有行
|
||
final_result = "\n".join(out_lines)
|
||
|
||
# 如果全部成功,儲存整個段落的快取
|
||
if all_successful:
|
||
TranslationCache.save_translation(text, source_language, target_language, final_result)
|
||
|
||
return {
|
||
'translated_text': final_result,
|
||
'conversation_id': current_conversation_id
|
||
}
|
||
|
||
def translate_text_with_cache(self, text: str, source_language: str,
|
||
target_language: str, user_id: int = None,
|
||
job_id: int = None, conversation_id: str = None) -> Dict[str, Any]:
|
||
"""帶快取的文字翻譯"""
|
||
|
||
# 檢查快取
|
||
cached_translation = TranslationCache.get_translation(
|
||
text, source_language, target_language
|
||
)
|
||
|
||
if cached_translation:
|
||
logger.debug(f"Cache hit for translation: {text[:50]}...")
|
||
return {
|
||
'translated_text': cached_translation,
|
||
'conversation_id': conversation_id, # 保持原有的conversation_id
|
||
'from_cache': True
|
||
}
|
||
|
||
# 呼叫 Dify API
|
||
try:
|
||
result = self.dify_client.translate_text(
|
||
text=text,
|
||
source_language=source_language,
|
||
target_language=target_language,
|
||
user_id=user_id,
|
||
job_id=job_id,
|
||
conversation_id=conversation_id
|
||
)
|
||
|
||
translated_text = result['translated_text']
|
||
new_conversation_id = result.get('conversation_id')
|
||
|
||
# 儲存到快取
|
||
TranslationCache.save_translation(
|
||
text, source_language, target_language, translated_text
|
||
)
|
||
|
||
return {
|
||
'translated_text': translated_text,
|
||
'conversation_id': new_conversation_id,
|
||
'from_cache': False
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"Translation failed for text: {text[:50]}... Error: {str(e)}")
|
||
raise TranslationError(f"翻譯失敗: {str(e)}")
|
||
|
||
def translate_document(self, job_uuid: str) -> Dict[str, Any]:
|
||
"""翻譯文件(主要入口點)- 使用增強的文檔處理邏輯"""
|
||
try:
|
||
# 取得任務資訊
|
||
job = TranslationJob.query.filter_by(job_uuid=job_uuid).first()
|
||
if not job:
|
||
raise TranslationError(f"找不到任務: {job_uuid}")
|
||
|
||
logger.info(f"Starting enhanced document translation: {job_uuid}")
|
||
|
||
# 更新任務狀態
|
||
job.update_status('PROCESSING', progress=0)
|
||
|
||
# 使用增強的文檔處理器直接提取段落
|
||
file_ext = Path(job.file_path).suffix.lower()
|
||
|
||
if file_ext in ['.docx', '.doc']:
|
||
# 使用增強的 DOCX 處理邏輯
|
||
segments = self.document_processor.extract_docx_segments(job.file_path)
|
||
logger.info(f"Enhanced extraction: Found {len(segments)} segments to translate")
|
||
|
||
if not segments:
|
||
raise TranslationError("文件中未找到可翻譯的文字段落")
|
||
|
||
# 使用成功版本的翻譯邏輯 - 直接按段落翻譯,不做複雜分割
|
||
translatable_segments = []
|
||
for seg in segments:
|
||
if self.document_processor.should_translate_text(seg.text, job.source_language):
|
||
translatable_segments.append(seg)
|
||
|
||
logger.info(f"Found {len(translatable_segments)} segments to translate")
|
||
|
||
# 批次翻譯 - 直接按原始段落翻譯
|
||
translation_map = {} # 格式: (target_language, source_text) -> translated_text
|
||
total_segments = len(translatable_segments)
|
||
|
||
for target_language in job.target_languages:
|
||
logger.info(f"Translating to {target_language}")
|
||
|
||
# 每個目標語言使用獨立的對話ID以保持該語言的翻譯一致性
|
||
current_conversation_id = None
|
||
|
||
for i, seg in enumerate(translatable_segments):
|
||
try:
|
||
# 根據段落類型選擇適當的翻譯方法
|
||
if seg.kind == "table_cell":
|
||
# 表格儲存格使用整個儲存格為單位的翻譯方法
|
||
translated = self.translate_word_table_cell(
|
||
text=seg.text,
|
||
source_language=job.source_language,
|
||
target_language=target_language,
|
||
user_id=job.user_id,
|
||
job_id=job.id
|
||
)
|
||
else:
|
||
# 一般段落使用原有的句子切片方法
|
||
translation_result = self.translate_segment_with_sentences(
|
||
text=seg.text,
|
||
source_language=job.source_language,
|
||
target_language=target_language,
|
||
user_id=job.user_id,
|
||
job_id=job.id,
|
||
conversation_id=current_conversation_id
|
||
)
|
||
|
||
translated = translation_result['translated_text']
|
||
# 更新當前對話ID以保持上下文連續性
|
||
if translation_result.get('conversation_id'):
|
||
current_conversation_id = translation_result['conversation_id']
|
||
|
||
# 直接以原始段落文字為鍵儲存翻譯結果
|
||
translation_map[(target_language, seg.text)] = translated
|
||
|
||
# 更新進度
|
||
progress = (i + 1) / total_segments * 100 / len(job.target_languages)
|
||
current_lang_index = job.target_languages.index(target_language)
|
||
total_progress = (current_lang_index * 100 + progress) / len(job.target_languages)
|
||
job.update_status('PROCESSING', progress=total_progress)
|
||
|
||
# 短暫延遲避免過快請求
|
||
time.sleep(0.1)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to translate segment: {seg.text[:50]}... Error: {str(e)}")
|
||
# 翻譯失敗時保留原文
|
||
translation_map[(target_language, seg.text)] = f"[翻譯失敗] {seg.text}"
|
||
|
||
# 保存該語言的對話ID到任務記錄中(用於後續重試等場景)
|
||
if current_conversation_id and not job.conversation_id:
|
||
job.conversation_id = current_conversation_id
|
||
db.session.commit()
|
||
logger.info(f"Saved conversation_id {current_conversation_id} for job {job.job_uuid}")
|
||
|
||
# 生成翻譯文件
|
||
logger.info("Generating translated documents with enhanced insertion")
|
||
output_dir = Path(job.file_path).parent
|
||
output_files = {}
|
||
|
||
for target_language in job.target_languages:
|
||
try:
|
||
# 生成輸出檔名
|
||
output_filename = generate_filename(
|
||
Path(job.file_path).name,
|
||
'translated',
|
||
'translated',
|
||
target_language
|
||
)
|
||
output_path = output_dir / output_filename
|
||
|
||
# 使用增強的翻譯插入邏輯
|
||
ok_count, skip_count = self.document_processor.insert_docx_translations(
|
||
job.file_path,
|
||
segments,
|
||
translation_map,
|
||
[target_language],
|
||
str(output_path)
|
||
)
|
||
|
||
output_files[target_language] = str(output_path)
|
||
|
||
# 記錄翻譯檔案到資料庫
|
||
file_size = Path(output_path).stat().st_size
|
||
job.add_translated_file(
|
||
language_code=target_language,
|
||
filename=Path(output_path).name,
|
||
file_path=str(output_path),
|
||
file_size=file_size
|
||
)
|
||
|
||
logger.info(f"Generated {target_language}: {ok_count} insertions, {skip_count} skips")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to generate translated document for {target_language}: {str(e)}")
|
||
raise TranslationError(f"生成 {target_language} 翻譯文件失敗: {str(e)}")
|
||
|
||
# 生成組合多語言檔案 - 包含所有翻譯在一個文件中
|
||
if len(job.target_languages) > 1:
|
||
try:
|
||
# 生成組合檔案的檔名
|
||
combined_filename = generate_filename(
|
||
Path(job.file_path).name,
|
||
'translated',
|
||
'combined',
|
||
'multilang'
|
||
)
|
||
combined_output_path = output_dir / combined_filename
|
||
|
||
# 使用新的組合翻譯插入方法
|
||
combined_ok_count, combined_skip_count = self.document_processor.insert_docx_combined_translations(
|
||
job.file_path,
|
||
segments,
|
||
translation_map,
|
||
job.target_languages,
|
||
str(combined_output_path)
|
||
)
|
||
|
||
output_files['combined'] = str(combined_output_path)
|
||
|
||
# 記錄組合翻譯檔案到資料庫
|
||
file_size = Path(combined_output_path).stat().st_size
|
||
job.add_translated_file(
|
||
language_code='combined',
|
||
filename=Path(combined_output_path).name,
|
||
file_path=str(combined_output_path),
|
||
file_size=file_size
|
||
)
|
||
|
||
logger.info(f"Generated combined multi-language file: {combined_ok_count} insertions, {combined_skip_count} skips")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to generate combined multi-language document: {str(e)}")
|
||
# 不要因為組合檔案失敗而讓整個任務失敗,只記錄警告
|
||
logger.warning("Combined multi-language file generation failed, but individual files were successful")
|
||
|
||
elif file_ext in ['.xlsx', '.xls']:
|
||
# Excel 文件使用儲存格為單位的翻譯邏輯
|
||
logger.info(f"Using cell-based processing for Excel files")
|
||
parser = self.get_document_parser(job.file_path)
|
||
|
||
# 提取儲存格文字內容(不進行句子切片)
|
||
cell_segments = parser.extract_text_segments()
|
||
|
||
if not cell_segments:
|
||
raise TranslationError("Excel 文件中未找到可翻譯的文字")
|
||
|
||
logger.info(f"Found {len(cell_segments)} cell segments to translate")
|
||
|
||
# 批次翻譯 - 使用儲存格為單位的翻譯方法
|
||
translation_results = {}
|
||
total_segments = len(cell_segments)
|
||
|
||
for target_language in job.target_languages:
|
||
logger.info(f"Translating Excel cells to {target_language}")
|
||
translated_cells = []
|
||
current_conversation_id = job.conversation_id # 維持上下文連貫性
|
||
|
||
for i, cell_text in enumerate(cell_segments):
|
||
try:
|
||
# 使用新的儲存格翻譯方法(整個儲存格作為單位)
|
||
translated = self.translate_excel_cell(
|
||
text=cell_text,
|
||
source_language=job.source_language,
|
||
target_language=target_language,
|
||
user_id=job.user_id,
|
||
job_id=job.id,
|
||
conversation_id=current_conversation_id # 傳遞 conversation_id
|
||
)
|
||
# 提取翻譯文字(translate_excel_cell 現在返回 dict)
|
||
translated_text = translated["translated_text"] if isinstance(translated, dict) else translated
|
||
translated_cells.append(translated_text)
|
||
|
||
# 更新 conversation_id 以維持連續對話上下文
|
||
if isinstance(translated, dict) and translated.get("conversation_id"):
|
||
current_conversation_id = translated["conversation_id"]
|
||
|
||
# 更新進度
|
||
progress = (i + 1) / total_segments * 100 / len(job.target_languages)
|
||
current_lang_index = job.target_languages.index(target_language)
|
||
total_progress = (current_lang_index * 100 + progress) / len(job.target_languages)
|
||
job.update_status('PROCESSING', progress=total_progress)
|
||
|
||
time.sleep(0.1)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to translate Excel cell: {cell_text[:50]}... Error: {str(e)}")
|
||
translated_cells.append(f"[翻譯失敗] {cell_text}")
|
||
|
||
translation_results[target_language] = translated_cells
|
||
|
||
# 生成翻譯文件
|
||
output_dir = Path(job.file_path).parent
|
||
output_files = {}
|
||
|
||
for target_language, translations in translation_results.items():
|
||
translation_mapping = {target_language: translations}
|
||
|
||
output_file = parser.generate_translated_document(
|
||
translations=translation_mapping,
|
||
target_language=target_language,
|
||
output_dir=output_dir
|
||
)
|
||
|
||
output_files[target_language] = output_file
|
||
|
||
file_size = Path(output_file).stat().st_size
|
||
job.add_translated_file(
|
||
language_code=target_language,
|
||
filename=Path(output_file).name,
|
||
file_path=output_file,
|
||
file_size=file_size
|
||
)
|
||
|
||
# 生成組合多語言Excel檔案
|
||
if len(job.target_languages) > 1:
|
||
try:
|
||
# 生成組合檔案的檔名
|
||
combined_filename = generate_filename(
|
||
Path(job.file_path).name,
|
||
'translated',
|
||
'combined',
|
||
'multilang'
|
||
)
|
||
combined_output_path = output_dir / combined_filename
|
||
|
||
# 為Excel組合檔案建立翻譯映射
|
||
combined_translation_mapping = {}
|
||
for lang in job.target_languages:
|
||
combined_translation_mapping[lang] = translation_results[lang]
|
||
|
||
# 使用修改過的generate_combined_excel_document方法
|
||
combined_output_file = self._generate_combined_excel_document(
|
||
parser,
|
||
combined_translation_mapping,
|
||
job.target_languages,
|
||
combined_output_path
|
||
)
|
||
|
||
output_files['combined'] = combined_output_file
|
||
|
||
# 記錄組合翻譯檔案到資料庫
|
||
file_size = Path(combined_output_file).stat().st_size
|
||
job.add_translated_file(
|
||
language_code='combined',
|
||
filename=Path(combined_output_file).name,
|
||
file_path=combined_output_file,
|
||
file_size=file_size
|
||
)
|
||
|
||
logger.info(f"Generated combined multi-language Excel file")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to generate combined multi-language Excel document: {str(e)}")
|
||
logger.warning("Combined multi-language Excel file generation failed, but individual files were successful")
|
||
|
||
elif file_ext == '.pptx':
|
||
# PowerPoint 文件使用增強的處理邏輯,仿照 DOCX 處理方式
|
||
logger.info(f"Using enhanced PowerPoint processing for {job_uuid}")
|
||
parser = self.get_document_parser(job.file_path)
|
||
|
||
# 提取文字段落和表格內容
|
||
text_segments = parser.extract_text_segments()
|
||
|
||
if not text_segments:
|
||
raise TranslationError("PowerPoint 文件中未找到可翻譯的文字")
|
||
|
||
logger.info(f"Found {len(text_segments)} PowerPoint text segments to translate")
|
||
|
||
# 批次翻譯 - 建立翻譯映射
|
||
translation_map = {} # 格式: (target_language, source_text) -> translated_text
|
||
total_segments = len(text_segments)
|
||
|
||
for target_language in job.target_languages:
|
||
logger.info(f"Translating PowerPoint segments to {target_language}")
|
||
translated_segments = []
|
||
current_conversation_id = job.conversation_id # 維持上下文連貫性
|
||
|
||
for i, segment_text in enumerate(text_segments):
|
||
try:
|
||
# 對於 PowerPoint 文字框和表格,使用段落級別的翻譯
|
||
translated = self.translate_segment_with_sentences(
|
||
text=segment_text,
|
||
source_language=job.source_language,
|
||
target_language=target_language,
|
||
user_id=job.user_id,
|
||
job_id=job.id,
|
||
conversation_id=current_conversation_id # 傳遞 conversation_id
|
||
)
|
||
|
||
# 使用與 DOCX 相同的格式儲存翻譯結果
|
||
translation_map[(target_language, segment_text)] = translated
|
||
|
||
# 更新 conversation_id 以維持連續對話上下文
|
||
if isinstance(translated, dict) and translated.get("conversation_id"):
|
||
current_conversation_id = translated["conversation_id"]
|
||
|
||
# 更新進度
|
||
progress = (i + 1) / total_segments * 100 / len(job.target_languages)
|
||
current_lang_index = job.target_languages.index(target_language)
|
||
total_progress = (current_lang_index * 100 + progress) / len(job.target_languages)
|
||
job.update_status('PROCESSING', progress=total_progress)
|
||
|
||
time.sleep(0.1)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to translate PowerPoint segment: {segment_text[:50]}... Error: {str(e)}")
|
||
# 翻譯失敗時保留原文
|
||
translation_map[(target_language, segment_text)] = f"[翻譯失敗] {segment_text}"
|
||
|
||
# 生成翻譯文件 - 仿照 DOCX 的方式
|
||
logger.info("Generating translated PowerPoint documents with enhanced insertion")
|
||
output_dir = Path(job.file_path).parent
|
||
output_files = {}
|
||
|
||
# 生成單語言文件
|
||
for target_language in job.target_languages:
|
||
try:
|
||
# 生成輸出檔名
|
||
output_filename = generate_filename(
|
||
Path(job.file_path).name,
|
||
'translated',
|
||
'translated',
|
||
target_language
|
||
)
|
||
output_path = output_dir / output_filename
|
||
|
||
# 使用增強的翻譯插入邏輯
|
||
ok_count, skip_count = parser.insert_pptx_translations(
|
||
translation_map,
|
||
[target_language],
|
||
str(output_path)
|
||
)
|
||
|
||
output_files[target_language] = str(output_path)
|
||
|
||
# 記錄翻譯檔案到資料庫
|
||
file_size = Path(output_path).stat().st_size
|
||
job.add_translated_file(
|
||
language_code=target_language,
|
||
filename=Path(output_path).name,
|
||
file_path=str(output_path),
|
||
file_size=file_size
|
||
)
|
||
|
||
logger.info(f"Generated {target_language}: {ok_count} insertions, {skip_count} skips")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to generate translated PowerPoint document for {target_language}: {str(e)}")
|
||
raise TranslationError(f"生成 {target_language} PowerPoint 翻譯文件失敗: {str(e)}")
|
||
|
||
# 生成組合多語言檔案 - 包含所有翻譯在一個文件中
|
||
if len(job.target_languages) > 1:
|
||
try:
|
||
# 生成組合檔案的檔名
|
||
combined_filename = generate_filename(
|
||
Path(job.file_path).name,
|
||
'translated',
|
||
'combined',
|
||
'multilang'
|
||
)
|
||
combined_output_path = output_dir / combined_filename
|
||
|
||
# 使用組合翻譯插入方法
|
||
combined_ok_count, combined_skip_count = parser.insert_pptx_combined_translations(
|
||
translation_map,
|
||
job.target_languages,
|
||
str(combined_output_path)
|
||
)
|
||
|
||
output_files['combined'] = str(combined_output_path)
|
||
|
||
# 記錄組合翻譯檔案到資料庫
|
||
file_size = Path(combined_output_path).stat().st_size
|
||
job.add_translated_file(
|
||
language_code='combined',
|
||
filename=Path(combined_output_path).name,
|
||
file_path=str(combined_output_path),
|
||
file_size=file_size
|
||
)
|
||
|
||
logger.info(f"Generated combined multi-language PowerPoint file: {combined_ok_count} insertions, {combined_skip_count} skips")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to generate combined multi-language PowerPoint document: {str(e)}")
|
||
# 不要因為組合檔案失敗而讓整個任務失敗,只記錄警告
|
||
logger.warning("Combined multi-language PowerPoint file generation failed, but individual files were successful")
|
||
|
||
elif file_ext == '.pdf':
|
||
# PDF 文件使用增強的OCR處理邏輯(避免重複OCR)
|
||
logger.info(f"Using enhanced PDF processing for {job_uuid}")
|
||
|
||
from app.services.enhanced_pdf_parser import EnhancedPdfParser
|
||
enhanced_parser = EnhancedPdfParser(job.file_path)
|
||
|
||
# 提取文字片段(會使用OCR快取避免重複處理)
|
||
text_segments = enhanced_parser.extract_text_segments(user_id=job.user_id, job_id=job.id)
|
||
|
||
if not text_segments:
|
||
raise TranslationError("PDF文件中未找到可翻譯的文字")
|
||
|
||
logger.info(f"Found {len(text_segments)} PDF text segments to translate")
|
||
|
||
# 批次翻譯PDF文字段落
|
||
translation_results = {}
|
||
total_segments = len(text_segments)
|
||
|
||
for target_language in job.target_languages:
|
||
logger.info(f"Translating PDF segments to {target_language}")
|
||
translated_segments = []
|
||
current_conversation_id = job.conversation_id # 維持上下文連貫性
|
||
|
||
for i, segment_text in enumerate(text_segments):
|
||
try:
|
||
# 對於PDF段落,使用段落級別的翻譯(保留段落結構)
|
||
translated = self.translate_segment_with_sentences(
|
||
text=segment_text,
|
||
source_language=job.source_language,
|
||
target_language=target_language,
|
||
user_id=job.user_id,
|
||
job_id=job.id,
|
||
conversation_id=current_conversation_id # 傳遞 conversation_id
|
||
)
|
||
# 提取翻譯文字(translate_segment_with_sentences 返回 dict)
|
||
translated_text = translated['translated_text'] if isinstance(translated, dict) else translated
|
||
translated_segments.append(translated_text)
|
||
|
||
# 更新 conversation_id 以維持連續對話上下文
|
||
if isinstance(translated, dict) and translated.get('conversation_id'):
|
||
current_conversation_id = translated['conversation_id']
|
||
|
||
# 更新進度
|
||
progress = (i + 1) / total_segments * 100 / len(job.target_languages)
|
||
current_lang_index = job.target_languages.index(target_language)
|
||
total_progress = (current_lang_index * 100 + progress) / len(job.target_languages)
|
||
job.update_status('PROCESSING', progress=total_progress)
|
||
|
||
time.sleep(0.1)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to translate PDF segment: {segment_text[:50]}... Error: {str(e)}")
|
||
translated_segments.append(f"[翻譯失敗] {segment_text}")
|
||
|
||
translation_results[target_language] = translated_segments
|
||
|
||
# 生成翻譯Word文件
|
||
logger.info("Generating translated Word documents from PDF")
|
||
output_dir = Path(job.file_path).parent
|
||
output_files = {}
|
||
|
||
for target_language, translations in translation_results.items():
|
||
try:
|
||
# 使用增強PDF解析器生成Word文檔
|
||
output_file = enhanced_parser.generate_translated_document(
|
||
translations={target_language: translations},
|
||
target_language=target_language,
|
||
output_dir=output_dir
|
||
)
|
||
|
||
output_files[target_language] = output_file
|
||
|
||
# 記錄翻譯檔案到資料庫
|
||
file_size = Path(output_file).stat().st_size
|
||
job.add_translated_file(
|
||
language_code=target_language,
|
||
filename=Path(output_file).name,
|
||
file_path=output_file,
|
||
file_size=file_size
|
||
)
|
||
|
||
logger.info(f"Generated PDF translation for {target_language}: {output_file}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to generate PDF translated document for {target_language}: {str(e)}")
|
||
raise TranslationError(f"生成PDF {target_language} 翻譯文件失敗: {str(e)}")
|
||
|
||
# 生成組合多語言文檔 - 譯文1/譯文2格式(當有多個目標語言時)
|
||
if len(job.target_languages) > 1:
|
||
try:
|
||
logger.info("Generating combined multi-language PDF document")
|
||
combined_output_file = enhanced_parser.generate_combined_translated_document(
|
||
all_translations=translation_results,
|
||
target_languages=job.target_languages,
|
||
output_dir=output_dir
|
||
)
|
||
|
||
output_files['combined'] = combined_output_file
|
||
|
||
# 記錄組合翻譯檔案到資料庫
|
||
file_size = Path(combined_output_file).stat().st_size
|
||
job.add_translated_file(
|
||
language_code='combined',
|
||
filename=Path(combined_output_file).name,
|
||
file_path=combined_output_file,
|
||
file_size=file_size
|
||
)
|
||
|
||
logger.info(f"Generated combined multi-language PDF file: {combined_output_file}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to generate combined multi-language PDF document: {str(e)}")
|
||
# 不要因為組合檔案失敗而讓整個任務失敗,只記錄警告
|
||
logger.warning("Combined multi-language PDF file generation failed, but individual files were successful")
|
||
|
||
else:
|
||
# 對於其他文件格式,使用原有邏輯
|
||
logger.info(f"Using legacy sentence-based processing for {file_ext} files")
|
||
parser = self.get_document_parser(job.file_path)
|
||
|
||
# 提取文字片段 - 对PDF传递user_id和job_id以支持OCR
|
||
if file_ext == '.pdf':
|
||
text_segments = parser.extract_text_segments(user_id=job.user_id, job_id=job.id)
|
||
else:
|
||
text_segments = parser.extract_text_segments()
|
||
|
||
if not text_segments:
|
||
raise TranslationError("文件中未找到可翻譯的文字")
|
||
|
||
# 分割成句子
|
||
all_sentences = []
|
||
for segment in text_segments:
|
||
sentences = self.split_text_into_sentences(segment, job.source_language)
|
||
all_sentences.extend(sentences)
|
||
|
||
# 去重複
|
||
unique_sentences = list(dict.fromkeys(all_sentences))
|
||
logger.info(f"Found {len(unique_sentences)} unique sentences to translate")
|
||
|
||
# 批次翻譯
|
||
translation_results = {}
|
||
total_sentences = len(unique_sentences)
|
||
|
||
for target_language in job.target_languages:
|
||
logger.info(f"Translating to {target_language}")
|
||
translated_sentences = []
|
||
current_conversation_id = job.conversation_id # 維持上下文連貫性
|
||
|
||
for i, sentence in enumerate(unique_sentences):
|
||
try:
|
||
translation_result = self.translate_text_with_cache(
|
||
text=sentence,
|
||
source_language=job.source_language,
|
||
target_language=target_language,
|
||
user_id=job.user_id,
|
||
job_id=job.id,
|
||
conversation_id=current_conversation_id # 傳遞 conversation_id
|
||
)
|
||
translated_sentences.append(translation_result['translated_text'])
|
||
|
||
# 更新 conversation_id 以維持連續對話上下文
|
||
if translation_result.get("conversation_id"):
|
||
current_conversation_id = translation_result["conversation_id"]
|
||
|
||
# 更新進度
|
||
progress = (i + 1) / total_sentences * 100 / len(job.target_languages)
|
||
current_lang_index = job.target_languages.index(target_language)
|
||
total_progress = (current_lang_index * 100 + progress) / len(job.target_languages)
|
||
job.update_status('PROCESSING', progress=total_progress)
|
||
|
||
time.sleep(0.1)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to translate sentence: {sentence[:50]}... Error: {str(e)}")
|
||
translated_sentences.append(f"[翻譯失敗] {sentence}")
|
||
|
||
translation_results[target_language] = translated_sentences
|
||
|
||
# 生成翻譯文件
|
||
output_dir = Path(job.file_path).parent
|
||
output_files = {}
|
||
|
||
for target_language, translations in translation_results.items():
|
||
translation_mapping = {target_language: translations}
|
||
|
||
output_file = parser.generate_translated_document(
|
||
translations=translation_mapping,
|
||
target_language=target_language,
|
||
output_dir=output_dir
|
||
)
|
||
|
||
output_files[target_language] = output_file
|
||
|
||
file_size = Path(output_file).stat().st_size
|
||
job.add_translated_file(
|
||
language_code=target_language,
|
||
filename=Path(output_file).name,
|
||
file_path=output_file,
|
||
file_size=file_size
|
||
)
|
||
|
||
# 計算總成本
|
||
total_cost = self._calculate_job_cost(job.id)
|
||
|
||
# 更新任務狀態為完成
|
||
job.update_status('COMPLETED', progress=100)
|
||
job.total_cost = total_cost
|
||
# 計算實際使用的 token 數(從 API 使用統計中獲取)
|
||
from sqlalchemy import func
|
||
from app.models.stats import APIUsageStats
|
||
from app import db
|
||
|
||
actual_tokens = db.session.query(
|
||
func.sum(APIUsageStats.total_tokens)
|
||
).filter_by(job_id=job.id).scalar()
|
||
|
||
job.total_tokens = int(actual_tokens) if actual_tokens else 0
|
||
|
||
db.session.commit()
|
||
|
||
logger.info(f"Enhanced document translation completed: {job_uuid}")
|
||
|
||
return {
|
||
'success': True,
|
||
'job_uuid': job_uuid,
|
||
'output_files': output_files,
|
||
'total_sentences': len(texts_to_translate) if 'texts_to_translate' in locals() else len(unique_sentences) if 'unique_sentences' in locals() else 0,
|
||
'total_cost': float(total_cost),
|
||
'target_languages': job.target_languages
|
||
}
|
||
|
||
except TranslationError:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"Enhanced document translation failed: {job_uuid}. Error: {str(e)}")
|
||
raise TranslationError(f"文件翻譯失敗: {str(e)}")
|
||
|
||
def _calculate_job_cost(self, job_id: int) -> float:
|
||
"""計算任務總成本"""
|
||
from app import db
|
||
from sqlalchemy import func
|
||
from app.models.stats import APIUsageStats
|
||
|
||
total_cost = db.session.query(
|
||
func.sum(APIUsageStats.cost)
|
||
).filter_by(job_id=job_id).scalar()
|
||
|
||
return float(total_cost) if total_cost else 0.0
|
||
|
||
def _generate_combined_excel_document(self, parser, translation_mapping: Dict[str, List[str]],
|
||
target_languages: List[str], output_path: Path) -> str:
|
||
"""生成包含所有翻譯語言的組合Excel檔案"""
|
||
try:
|
||
import openpyxl
|
||
from openpyxl.styles import Alignment, Font
|
||
from sqlalchemy import text as sql_text
|
||
from app import db
|
||
|
||
# 載入原始工作簿
|
||
wb = openpyxl.load_workbook(str(parser.file_path), data_only=False)
|
||
try:
|
||
wb_vals = openpyxl.load_workbook(str(parser.file_path), data_only=True)
|
||
except Exception:
|
||
wb_vals = None
|
||
|
||
# 取得原始文字段落以建立翻譯映射
|
||
original_segments = parser.extract_text_segments()
|
||
combined_tmap = {}
|
||
|
||
logger.info(f"Building combined translation map for {len(original_segments)} segments")
|
||
|
||
for original_text in original_segments:
|
||
# 從翻譯快取中查詢所有語言的翻譯
|
||
for target_lang in target_languages:
|
||
result = db.session.execute(sql_text("""
|
||
SELECT translated_text
|
||
FROM dt_translation_cache
|
||
WHERE source_text = :text AND target_language = :lang
|
||
ORDER BY created_at ASC
|
||
LIMIT 1
|
||
"""), {'text': original_text, 'lang': target_lang})
|
||
|
||
row = result.fetchone()
|
||
if row and row[0]:
|
||
combined_tmap[(target_lang, original_text)] = row[0]
|
||
|
||
logger.info(f"Built combined translation map with {len(combined_tmap)} mappings")
|
||
|
||
# 處理每個工作表,插入組合翻譯
|
||
for ws in wb.worksheets:
|
||
logger.info(f"Processing combined worksheet: {ws.title}")
|
||
ws_vals = wb_vals[ws.title] if wb_vals and ws.title in wb_vals.sheetnames else None
|
||
max_row, max_col = ws.max_row, ws.max_column
|
||
|
||
for r in range(1, max_row + 1):
|
||
for c in range(1, max_col + 1):
|
||
cell = ws.cell(row=r, column=c)
|
||
src_text = parser._get_display_text_for_translation(ws, ws_vals, r, c)
|
||
|
||
if not src_text or not parser._should_translate(src_text, 'auto'):
|
||
continue
|
||
|
||
# 收集所有語言的翻譯
|
||
translations = []
|
||
for target_lang in target_languages:
|
||
if (target_lang, src_text) in combined_tmap:
|
||
translations.append(combined_tmap[(target_lang, src_text)])
|
||
else:
|
||
translations.append(f"【翻譯缺失|{target_lang}】")
|
||
|
||
# 組合翻譯文字:原文\n英文\n越南文
|
||
if translations:
|
||
combined_text = src_text + '\n' + '\n'.join(translations)
|
||
|
||
# 設置儲存格值
|
||
cell.value = combined_text
|
||
cell.alignment = Alignment(wrap_text=True, vertical='top')
|
||
cell.font = Font(size=10)
|
||
|
||
# 儲存組合檔案
|
||
wb.save(str(output_path))
|
||
|
||
logger.info(f"Generated combined Excel file: {output_path}")
|
||
return str(output_path)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to generate combined Excel document: {str(e)}")
|
||
raise FileProcessingError(f"組合 Excel 檔案生成失敗: {str(e)}")
|
||
|
||
def _generate_combined_pptx_document(self, parser, translation_results: Dict[str, List[str]],
|
||
target_languages: List[str], output_path: Path) -> str:
|
||
"""生成包含所有翻譯語言的組合PowerPoint檔案"""
|
||
try:
|
||
import pptx
|
||
from sqlalchemy import text as sql_text
|
||
from app import db
|
||
|
||
# 載入原始 PowerPoint 文件
|
||
prs = pptx.Presentation(str(parser.file_path))
|
||
|
||
# 收集所有文字框和原始文字
|
||
text_frames_data = []
|
||
for slide in prs.slides:
|
||
for shape in slide.shapes:
|
||
if getattr(shape, "has_text_frame", False):
|
||
text = parser._extract_text_from_frame(shape.text_frame)
|
||
if text.strip():
|
||
text_frames_data.append((shape.text_frame, text))
|
||
|
||
# 建立組合翻譯映射 - 從快取讀取所有語言的翻譯
|
||
combined_translation_map = {}
|
||
logger.info(f"Building combined PowerPoint translation map for {len(text_frames_data)} text frames")
|
||
|
||
for text_frame, original_text in text_frames_data:
|
||
# 從翻譯快取中查詢所有語言的翻譯
|
||
for target_lang in target_languages:
|
||
result = db.session.execute(sql_text("""
|
||
SELECT translated_text
|
||
FROM dt_translation_cache
|
||
WHERE source_text = :text AND target_language = :lang
|
||
ORDER BY created_at ASC
|
||
LIMIT 1
|
||
"""), {'text': original_text, 'lang': target_lang})
|
||
|
||
row = result.fetchone()
|
||
if row and row[0]:
|
||
combined_translation_map[(target_lang, original_text)] = row[0]
|
||
|
||
logger.info(f"Built combined PowerPoint translation map with {len(combined_translation_map)} mappings")
|
||
|
||
# 處理每個文字框,插入組合翻譯
|
||
ok_count = skip_count = 0
|
||
|
||
for text_frame, original_text in text_frames_data:
|
||
# 收集所有語言的翻譯
|
||
translations = []
|
||
for target_lang in target_languages:
|
||
if (target_lang, original_text) in combined_translation_map:
|
||
translations.append(combined_translation_map[(target_lang, original_text)])
|
||
else:
|
||
translations.append(f"【翻譯缺失|{target_lang}】")
|
||
|
||
# 檢查是否已存在翻譯
|
||
if parser._check_existing_translations(text_frame, translations):
|
||
skip_count += 1
|
||
continue
|
||
|
||
# 添加所有語言的翻譯
|
||
for translation in translations:
|
||
parser._append_translation(text_frame, translation)
|
||
|
||
ok_count += 1
|
||
|
||
# 儲存組合檔案
|
||
prs.save(str(output_path))
|
||
|
||
logger.info(f"Generated combined PowerPoint file: {output_path} with {ok_count} frames, {skip_count} skips")
|
||
return str(output_path)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to generate combined PowerPoint document: {str(e)}")
|
||
raise FileProcessingError(f"組合 PowerPoint 檔案生成失敗: {str(e)}") |