Document_Translator/app/services/translation_service.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Translation service
Author: PANJIT IT Team
Created: 2024-01-28
Modified: 2024-01-28
"""
import hashlib
import time
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
from app.utils.logger import get_logger
from app.utils.exceptions import TranslationError, FileProcessingError
from app.services.dify_client import DifyClient
from app.services.document_processor import DocumentProcessor, Segment
from app.models.cache import TranslationCache
from app.models.job import TranslationJob
from app.utils.helpers import generate_filename, create_job_directory
logger = get_logger(__name__)
class DocumentParser:
    """Base class for document parsers."""
def __init__(self, file_path: str):
self.file_path = Path(file_path)
if not self.file_path.exists():
raise FileProcessingError(f"檔案不存在: {file_path}")
    def extract_text_segments(self) -> List[str]:
        """Extract text segments from the document."""
raise NotImplementedError
    def generate_translated_document(self, translations: Dict[str, List[str]],
                                     target_language: str, output_dir: Path) -> str:
        """Generate the translated document."""
raise NotImplementedError
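# Illustrative extension sketch (not part of the service): a new format is supported by
# subclassing DocumentParser, implementing both abstract methods, and registering the
# class in TranslationService.parsers below. The TxtParser shown here is hypothetical.
#
#   class TxtParser(DocumentParser):
#       def extract_text_segments(self) -> List[str]:
#           lines = self.file_path.read_text(encoding='utf-8').splitlines()
#           return [line.strip() for line in lines if line.strip()]
#
#       def generate_translated_document(self, translations, target_language, output_dir):
#           out = output_dir / f"{self.file_path.stem}_{target_language}_translated.txt"
#           out.write_text('\n'.join(translations.get(target_language, [])), encoding='utf-8')
#           return str(out)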
class DocxParser(DocumentParser):
    """DOCX parser - uses the enhanced DocumentProcessor."""
def __init__(self, file_path: str):
super().__init__(file_path)
self.processor = DocumentProcessor()
    def extract_text_segments(self) -> List[str]:
        """Extract text segments from a DOCX file using the enhanced logic."""
        try:
            # Extract segments with the enhanced document processor
            segments = self.processor.extract_docx_segments(str(self.file_path))
            # Convert to a plain list of strings
text_segments = []
for seg in segments:
if seg.text.strip() and len(seg.text.strip()) > 3:
text_segments.append(seg.text)
logger.info(f"Enhanced extraction: {len(text_segments)} text segments from DOCX")
return text_segments
except Exception as e:
logger.error(f"Failed to extract text from DOCX: {str(e)}")
raise FileProcessingError(f"DOCX 文件解析失敗: {str(e)}")
    def extract_segments_with_context(self) -> List[Segment]:
        """Extract segments together with their context information."""
return self.processor.extract_docx_segments(str(self.file_path))
    def generate_translated_document(self, translations: Dict[str, List[str]],
                                     target_language: str, output_dir: Path) -> str:
        """Generate the translated DOCX file using the enhanced insertion logic (translations are read from the cache)."""
try:
from sqlalchemy import text as sql_text
from app import db
            # Build the output filename
output_filename = generate_filename(
self.file_path.name,
'translated',
'translated',
target_language
)
output_path = output_dir / output_filename
            # Extract segment information
            segments = self.extract_segments_with_context()
            # Build the translation map - read from the cache instead of the passed-in translations argument
translation_map = {}
logger.info(f"Building translation map for {len(segments)} segments in language {target_language}")
for seg in segments:
                # Look up each segment's translation in the translation cache
result = db.session.execute(sql_text("""
SELECT translated_text
FROM dt_translation_cache
WHERE source_text = :text AND target_language = :lang
ORDER BY created_at DESC
LIMIT 1
"""), {'text': seg.text, 'lang': target_language})
row = result.fetchone()
if row and row[0]:
translation_map[(target_language, seg.text)] = row[0]
logger.debug(f"Found translation for: {seg.text[:50]}...")
else:
logger.warning(f"No translation found for: {seg.text[:50]}...")
logger.info(f"Translation map built with {len(translation_map)} mappings")
            # Use the enhanced translation-insertion logic
ok_count, skip_count = self.processor.insert_docx_translations(
str(self.file_path),
segments,
translation_map,
[target_language],
str(output_path)
)
logger.info(f"Enhanced translation: Generated {output_path} with {ok_count} insertions, {skip_count} skips")
return str(output_path)
except Exception as e:
logger.error(f"Failed to generate translated DOCX: {str(e)}")
raise FileProcessingError(f"生成翻譯 DOCX 失敗: {str(e)}")
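# Usage sketch for DocxParser (paths are hypothetical; the cache lookups inside
# generate_translated_document need an active Flask app context):
#
#   parser = DocxParser('/tmp/report.docx')
#   plain = parser.extract_text_segments()              # list of strings
#   segments = parser.extract_segments_with_context()   # Segment objects
#   # Translations are read from dt_translation_cache, so the first argument is
#   # effectively ignored for DOCX output.
#   out_path = parser.generate_translated_document({}, 'en', Path('/tmp/out'))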
class DocParser(DocumentParser):
    """DOC parser - the file must be converted to DOCX first."""
    def extract_text_segments(self) -> List[str]:
        """Extract text segments from a DOC file by converting it to DOCX first."""
        try:
            # Check whether Word COM support is available
import tempfile
import os
try:
import win32com.client as win32
import pythoncom
_WIN32COM_AVAILABLE = True
except ImportError:
_WIN32COM_AVAILABLE = False
if not _WIN32COM_AVAILABLE:
raise FileProcessingError("DOC 格式需要 Word COM 支援,請先手動轉換為 DOCX 格式或安裝 Microsoft Office")
            # Create a temporary DOCX file
temp_docx = None
try:
with tempfile.NamedTemporaryFile(suffix='.docx', delete=False) as tmp:
temp_docx = tmp.name
                # Convert DOC to DOCX via Word COM (file format 16)
self._word_convert(str(self.file_path), temp_docx, 16)
                # Process the converted file with the DOCX parser
docx_parser = DocxParser(temp_docx)
segments = docx_parser.extract_text_segments()
logger.info(f"Converted DOC to DOCX and extracted {len(segments)} segments")
return segments
finally:
                # Clean up the temporary file
if temp_docx and os.path.exists(temp_docx):
try:
os.remove(temp_docx)
except Exception:
pass
except Exception as e:
logger.error(f"Failed to extract text from DOC file: {str(e)}")
raise FileProcessingError(f"DOC 文件解析失敗: {str(e)}")
    def _word_convert(self, input_path: str, output_path: str, target_format: int):
        """Convert a document with Word COM (ported from the reference file)."""
        try:
            import os
            import win32com.client as win32
            import pythoncom
            pythoncom.CoInitialize()
            word = None
            try:
                word = win32.Dispatch("Word.Application")
                word.Visible = False
                doc = word.Documents.Open(os.path.abspath(input_path))
                doc.SaveAs2(os.path.abspath(output_path), FileFormat=target_format)
                doc.Close(False)
            finally:
                # Guard against Dispatch() failing before word was assigned
                if word is not None:
                    word.Quit()
                pythoncom.CoUninitialize()
        except Exception as e:
            raise FileProcessingError(f"Word COM 轉換失敗: {str(e)}")
    def generate_translated_document(self, translations: Dict[str, List[str]],
                                     target_language: str, output_dir: Path) -> str:
        """Generate the translated DOC file - converted to DOCX for processing, output stays in DOCX format."""
        try:
            import tempfile
            import os
            # Convert to DOCX first, then reuse the DOCX handling logic
temp_docx = None
try:
with tempfile.NamedTemporaryFile(suffix='.docx', delete=False) as tmp:
temp_docx = tmp.name
                # Convert DOC to DOCX
self._word_convert(str(self.file_path), temp_docx, 16)
                # Generate the translated document with the DOCX parser
                docx_parser = DocxParser(temp_docx)
                # Note: the final output stays in DOCX format because DOC is hard to edit directly.
                # DocxParser names its output after the temporary file, so rename the result
                # so the filename is derived from the original DOC file.
                output_filename = f"{self.file_path.stem}_{target_language}_translated.docx"
                output_path = output_dir / output_filename
                result_path = docx_parser.generate_translated_document(translations, target_language, output_dir)
                Path(result_path).rename(output_path)
                logger.info(f"Generated translated DOC file (as DOCX): {output_path}")
                return str(output_path)
finally:
                # Clean up the temporary file
if temp_docx and os.path.exists(temp_docx):
try:
os.remove(temp_docx)
except Exception:
pass
except Exception as e:
logger.error(f"Failed to generate translated DOC file: {str(e)}")
raise FileProcessingError(f"DOC 翻譯檔生成失敗: {str(e)}")
class ExcelParser(DocumentParser):
    """Excel parser (XLSX/XLS) - ported from the reference file."""
    def extract_text_segments(self) -> List[str]:
        """Extract text segments from an Excel file."""
try:
import openpyxl
from openpyxl.utils.exceptions import InvalidFileException
            # Load the workbook (logic ported from the reference file)
try:
wb = openpyxl.load_workbook(str(self.file_path), data_only=False)
wb_vals = openpyxl.load_workbook(str(self.file_path), data_only=True)
except InvalidFileException:
if self.file_path.suffix.lower() == '.xls':
raise FileProcessingError("XLS 格式需要先轉換為 XLSX 格式")
raise
except Exception:
wb_vals = None
            # Extract text segments (follows the reference file's logic exactly)
segs = []
for ws in wb.worksheets:
ws_vals = wb_vals[ws.title] if wb_vals and ws.title in wb_vals.sheetnames else None
max_row, max_col = ws.max_row, ws.max_column
for r in range(1, max_row + 1):
for c in range(1, max_col + 1):
src_text = self._get_display_text_for_translation(ws, ws_vals, r, c)
if not src_text:
continue
if not self._should_translate(src_text, 'auto'):
continue
segs.append(src_text)
            # De-duplicate while preserving order
unique_segments = []
seen = set()
for seg in segs:
if seg not in seen:
unique_segments.append(seg)
seen.add(seg)
logger.info(f"Extracted {len(unique_segments)} unique text segments from Excel file")
return unique_segments
except Exception as e:
logger.error(f"Failed to extract text from Excel file: {str(e)}")
raise FileProcessingError(f"Excel 文件解析失敗: {str(e)}")
    def _get_display_text_for_translation(self, ws, ws_vals, r: int, c: int) -> Optional[str]:
        """Get a cell's display text for translation (ported verbatim from the reference file)."""
val = ws.cell(row=r, column=c).value
if isinstance(val, str) and val.startswith("="):
if ws_vals is not None:
shown = ws_vals.cell(row=r, column=c).value
return shown if isinstance(shown, str) and shown.strip() else None
return None
if isinstance(val, str) and val.strip():
return val
if ws_vals is not None:
shown = ws_vals.cell(row=r, column=c).value
if isinstance(shown, str) and shown.strip():
return shown
return None
    def _should_translate(self, text: str, src_lang: str) -> bool:
        """Decide whether the text needs translation (anything containing real text is translated)."""
        text = text.strip()
        # Translate anything with content - minimum length is 1
if len(text) < 1:
return False
# Skip pure numbers, dates, etc.
import re
if re.match(r'^[\d\s\.\-\:\/]+$', text):
return False
# For auto-detect, translate if has CJK or meaningful text
if src_lang.lower() in ('auto', 'auto-detect'):
return self._has_cjk(text) or len(text) > 5
return True
    def _has_cjk(self, text: str) -> bool:
        """Check whether the text contains CJK characters (ported from the reference file)."""
        for char in text:
            # CJK Extension B needs \U escapes: a '\u20000' literal parses as '\u2000' + '0'
            if '\u4e00' <= char <= '\u9fff' or \
               '\u3400' <= char <= '\u4dbf' or \
               '\U00020000' <= char <= '\U0002A6DF' or \
               '\u3040' <= char <= '\u309f' or \
               '\u30a0' <= char <= '\u30ff' or \
               '\uac00' <= char <= '\ud7af':
return True
return False
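    # Expected behaviour of the two helpers above (illustrative, derived from the rules
    # implemented in this class, with parser = ExcelParser(...)):
    #
    #   parser._should_translate("2024-01-28", "auto")  -> False  (pure digits/dates are skipped)
    #   parser._should_translate("品質檢驗報告", "auto")  -> True   (contains CJK)
    #   parser._should_translate("OK", "auto")           -> False  (auto mode: no CJK and <= 5 chars)
    #   parser._has_cjk("Hello 世界")                     -> True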
    def generate_translated_document(self, translations: Dict[str, List[str]],
                                     target_language: str, output_dir: Path) -> str:
        """Generate the translated Excel file (uses the translation cache to guarantee correct mapping)."""
try:
import openpyxl
from openpyxl.styles import Alignment
from openpyxl.comments import Comment
from sqlalchemy import text as sql_text
from app import db
            # Load the original workbook
wb = openpyxl.load_workbook(str(self.file_path), data_only=False)
try:
wb_vals = openpyxl.load_workbook(str(self.file_path), data_only=True)
except Exception:
wb_vals = None
            # Build the translation map - query the translation cache to guarantee the correct mapping
original_segments = self.extract_text_segments()
tmap = {}
logger.info(f"Building translation map for {len(original_segments)} segments in language {target_language}")
for original_text in original_segments:
                # Look up each source text's translation in the translation cache.
                # A union query is used, preferring the earliest record (the original Dify translation).
normalized_text = original_text.replace('\n', ' ').replace('\r', ' ').strip()
result = db.session.execute(sql_text("""
SELECT translated_text, created_at, 'exact' as match_type
FROM dt_translation_cache
WHERE source_text = :exact_text AND target_language = :lang
UNION ALL
SELECT translated_text, created_at, 'normalized' as match_type
FROM dt_translation_cache
WHERE REPLACE(REPLACE(TRIM(source_text), '\n', ' '), '\r', ' ') = :norm_text
AND target_language = :lang
AND source_text != :exact_text
ORDER BY created_at ASC
LIMIT 1
"""), {'exact_text': original_text, 'norm_text': normalized_text, 'lang': target_language})
row = result.fetchone()
if row and row[0]:
tmap[original_text] = row[0]
logger.debug(f"Cache hit for Excel: {original_text[:30]}... -> {row[0][:30]}...")
else:
logger.warning(f"No translation found in cache for: {original_text[:50]}...")
logger.info(f"Translation map built with {len(tmap)} mappings from cache")
            # Process each worksheet (with detailed debug logging)
translation_count = 0
skip_count = 0
for ws in wb.worksheets:
logger.info(f"Processing worksheet: {ws.title}")
ws_vals = wb_vals[ws.title] if wb_vals and ws.title in wb_vals.sheetnames else None
max_row, max_col = ws.max_row, ws.max_column
for r in range(1, max_row + 1):
for c in range(1, max_col + 1):
cell_name = f"{openpyxl.utils.get_column_letter(c)}{r}"
src_text = self._get_display_text_for_translation(ws, ws_vals, r, c)
if not src_text:
continue
                        # Check whether the text needs translation
should_translate = self._should_translate(src_text, 'auto')
if not should_translate:
logger.debug(f"Skip {cell_name}: '{src_text[:30]}...' (should not translate)")
skip_count += 1
continue
                        # Check the translation map
if src_text not in tmap:
logger.warning(f"No translation mapping for {cell_name}: '{src_text[:30]}...'")
skip_count += 1
continue
val = ws.cell(row=r, column=c).value
is_formula = isinstance(val, str) and val.startswith("=")
translated_text = tmap[src_text]
cell = ws.cell(row=r, column=c)
if is_formula:
                            # Formula cell: attach the translation as a comment
txt_comment = f"翻譯: {translated_text}"
exist = cell.comment
if not exist or exist.text.strip() != txt_comment:
cell.comment = Comment(txt_comment, "translator")
logger.debug(f"Added comment to {cell_name}: {translated_text[:30]}...")
translation_count += 1
else:
                            # Regular cell: interleave original text and translation
combined = f"{src_text}\n{translated_text}"
                            # Skip if the cell already holds the expected combined format
current_text = str(cell.value) if cell.value else ""
if current_text.strip() == combined.strip():
logger.debug(f"Skip {cell_name}: already translated")
continue
cell.value = combined
logger.info(f"Translated {cell_name}: '{src_text[:20]}...' -> '{translated_text[:20]}...'")
translation_count += 1
                        # Enable word wrap (ported from the reference file)
try:
if cell.alignment:
cell.alignment = Alignment(
horizontal=cell.alignment.horizontal,
vertical=cell.alignment.vertical,
wrap_text=True
)
else:
cell.alignment = Alignment(wrap_text=True)
except Exception:
cell.alignment = Alignment(wrap_text=True)
            # Save the translated file
output_filename = f"{self.file_path.stem}_{target_language}_translated.xlsx"
output_path = output_dir / output_filename
wb.save(str(output_path))
logger.info(f"Excel translation completed: {translation_count} translations, {skip_count} skips")
logger.info(f"Generated translated Excel file: {output_path}")
return str(output_path)
except Exception as e:
logger.error(f"Failed to generate translated Excel file: {str(e)}")
raise FileProcessingError(f"Excel 翻譯檔生成失敗: {str(e)}")
class PdfParser(DocumentParser):
    """PDF parser (read-only)."""
    def extract_text_segments(self) -> List[str]:
        """Extract text segments from a PDF file."""
try:
from PyPDF2 import PdfReader
reader = PdfReader(str(self.file_path))
text_segments = []
for page in reader.pages:
text = page.extract_text()
                # Naive sentence splitting
sentences = text.split('.')
for sentence in sentences:
sentence = sentence.strip()
if sentence and len(sentence) > 10:
text_segments.append(sentence)
logger.info(f"Extracted {len(text_segments)} text segments from PDF")
return text_segments
except Exception as e:
logger.error(f"Failed to extract text from PDF: {str(e)}")
raise FileProcessingError(f"PDF 文件解析失敗: {str(e)}")
    def generate_translated_document(self, translations: Dict[str, List[str]],
                                     target_language: str, output_dir: Path) -> str:
        """Generate a translated plain-text file (PDF does not support direct editing)."""
try:
translated_texts = translations.get(target_language, [])
            # Generate a plain-text file
output_filename = f"{self.file_path.stem}_{target_language}_translated.txt"
output_path = output_dir / output_filename
with open(output_path, 'w', encoding='utf-8') as f:
f.write(f"翻譯結果 - {target_language}\n")
f.write("=" * 50 + "\n\n")
for i, text in enumerate(translated_texts):
f.write(f"{i+1}. {text}\n\n")
logger.info(f"Generated translated text file: {output_path}")
return str(output_path)
except Exception as e:
logger.error(f"Failed to generate translated text file: {str(e)}")
raise FileProcessingError(f"生成翻譯文字檔失敗: {str(e)}")
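# Note on PdfParser: page.extract_text() output is split on '.' only, so CJK sentences
# ending with '。' pass through as whole blocks, and the "translated document" is a plain
# .txt listing rather than an edited PDF. Rough usage sketch (path and texts are illustrative):
#
#   pdf = PdfParser('/tmp/manual.pdf')
#   sentences = pdf.extract_text_segments()
#   txt_path = pdf.generate_translated_document({'en': ['First sentence.', 'Second.']},
#                                               'en', Path('/tmp/out'))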
class TranslationService:
    """Translation service."""
    def __init__(self):
        self.dify_client = DifyClient()
        self.document_processor = DocumentProcessor()
        # Parser mapping by file extension
        self.parsers = {
            '.docx': DocxParser,
            '.doc': DocParser,    # converted to DOCX first via Word COM
            '.xlsx': ExcelParser,
            '.xls': ExcelParser,  # ExcelParser currently asks for manual conversion to XLSX
            '.pdf': PdfParser,
            # more formats can be added later
        }
    def get_document_parser(self, file_path: str) -> DocumentParser:
        """Get the parser for a file based on its extension."""
file_ext = Path(file_path).suffix.lower()
parser_class = self.parsers.get(file_ext)
if not parser_class:
raise FileProcessingError(f"不支援的檔案格式: {file_ext}")
return parser_class(file_path)
    def split_text_into_sentences(self, text: str, language: str = 'auto') -> List[str]:
        """Split text into sentences using the enhanced segmentation logic."""
return self.document_processor.split_text_into_sentences(text, language)
def translate_excel_cell(self, text: str, source_language: str,
target_language: str, user_id: int = None,
job_id: int = None) -> str:
"""
        Translate an Excel cell - the whole cell is translated as one unit, without slicing.
"""
if not text or not text.strip():
return ""
        # Check the cache for the whole cell content
cached_translation = TranslationCache.get_translation(text, source_language, target_language)
if cached_translation:
logger.debug(f"Excel cell cache hit: {text[:30]}...")
return cached_translation
        # Translate the entire cell content directly, without any slicing
try:
result = self.dify_client.translate_text(
text=text,
source_language=source_language,
target_language=target_language,
user_id=user_id,
job_id=job_id
)
translated_text = result['translated_text']
            # Cache the translation of the whole cell
TranslationCache.save_translation(
text, source_language, target_language, translated_text
)
return translated_text
except Exception as e:
logger.error(f"Failed to translate Excel cell: {text[:30]}... Error: {str(e)}")
            # On failure, return a marked fallback string
            return f"【翻譯失敗|{target_language}】{text}"
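    # Usage sketch for whole-cell translation (values are illustrative; requires a Flask
    # app context for TranslationCache and a reachable Dify endpoint):
    #
    #   service = TranslationService()
    #   en = service.translate_excel_cell("不良品數量:5 件", source_language='auto',
    #                                     target_language='en', user_id=1, job_id=42)
    #   # A second call with the same text hits the cache, so no extra Dify request is made.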
def translate_word_table_cell(self, text: str, source_language: str,
target_language: str, user_id: int = None,
job_id: int = None) -> str:
"""
        Translate a Word table cell - the whole cell content is translated as one unit, without paragraph slicing.
"""
if not text or not text.strip():
return ""
        # Check the cache for the whole cell content
cached_translation = TranslationCache.get_translation(text, source_language, target_language)
if cached_translation:
logger.debug(f"Word table cell cache hit: {text[:30]}...")
return cached_translation
        # Translate the entire cell content directly, without paragraph slicing
try:
result = self.dify_client.translate_text(
text=text,
source_language=source_language,
target_language=target_language,
user_id=user_id,
job_id=job_id
)
translated_text = result['translated_text']
            # Cache the translation of the whole cell
TranslationCache.save_translation(
text, source_language, target_language, translated_text
)
return translated_text
except Exception as e:
logger.error(f"Failed to translate Word table cell: {text[:30]}... Error: {str(e)}")
            return f"【翻譯失敗|{target_language}】{text}"
def translate_segment_with_sentences(self, text: str, source_language: str,
target_language: str, user_id: int = None,
job_id: int = None) -> str:
"""
        Translate paragraph by paragraph, following the proven translate_block_sentencewise logic:
        multi-line text is translated line by line and sentence by sentence, then reassembled into a full paragraph.
        Only used for Word documents; Excel should use translate_excel_cell.
"""
if not text or not text.strip():
return ""
        # Check the cache - first try the whole paragraph
cached_whole = TranslationCache.get_translation(text, source_language, target_language)
if cached_whole:
logger.debug(f"Whole paragraph cache hit: {text[:30]}...")
return cached_whole
        # Process line by line
out_lines = []
all_successful = True
for raw_line in text.split('\n'):
if not raw_line.strip():
out_lines.append("")
continue
            # Split the line into sentences
sentences = self.document_processor.split_text_into_sentences(raw_line, source_language)
if not sentences:
sentences = [raw_line]
translated_parts = []
for sentence in sentences:
sentence = sentence.strip()
if not sentence:
continue
                # Check the sentence-level cache
cached_sentence = TranslationCache.get_translation(sentence, source_language, target_language)
if cached_sentence:
translated_parts.append(cached_sentence)
continue
                # Call the Dify API to translate the sentence
try:
result = self.dify_client.translate_text(
text=sentence,
source_language=source_language,
target_language=target_language,
user_id=user_id,
job_id=job_id
)
translated_sentence = result['translated_text']
                    # Save the sentence-level cache entry
TranslationCache.save_translation(
sentence, source_language, target_language, translated_sentence
)
translated_parts.append(translated_sentence)
except Exception as e:
logger.error(f"Failed to translate sentence: {sentence[:30]}... Error: {str(e)}")
                    translated_parts.append(f"【翻譯失敗|{target_language}】{sentence}")
all_successful = False
            # Re-join the sentences into one line
out_lines.append(" ".join(translated_parts))
        # Re-join all lines
final_result = "\n".join(out_lines)
        # If everything succeeded, cache the whole paragraph
if all_successful:
TranslationCache.save_translation(text, source_language, target_language, final_result)
return final_result
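    # How the reassembly above behaves (illustrative): a paragraph such as
    # "本月產量提升。品質穩定。\n下月持續改善。" is split into lines, each line into sentences,
    # each sentence translated (or served from the sentence-level cache), then sentences are
    # re-joined with spaces and lines with '\n'. The whole-paragraph cache entry is written
    # only when every sentence translated successfully.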
    def translate_text_with_cache(self, text: str, source_language: str,
                                  target_language: str, user_id: int = None,
                                  job_id: int = None) -> str:
        """Text translation with caching."""
        # Check the cache
cached_translation = TranslationCache.get_translation(
text, source_language, target_language
)
if cached_translation:
logger.debug(f"Cache hit for translation: {text[:50]}...")
return cached_translation
        # Call the Dify API
try:
result = self.dify_client.translate_text(
text=text,
source_language=source_language,
target_language=target_language,
user_id=user_id,
job_id=job_id
)
translated_text = result['translated_text']
            # Save to the cache
TranslationCache.save_translation(
text, source_language, target_language, translated_text
)
return translated_text
except Exception as e:
logger.error(f"Translation failed for text: {text[:50]}... Error: {str(e)}")
raise TranslationError(f"翻譯失敗: {str(e)}")
    def translate_document(self, job_uuid: str) -> Dict[str, Any]:
        """Translate a document (main entry point) - uses the enhanced document-processing logic."""
        try:
            # Fetch the job record
job = TranslationJob.query.filter_by(job_uuid=job_uuid).first()
if not job:
raise TranslationError(f"找不到任務: {job_uuid}")
logger.info(f"Starting enhanced document translation: {job_uuid}")
            # Update the job status
job.update_status('PROCESSING', progress=0)
            # Extract segments directly with the enhanced document processor
file_ext = Path(job.file_path).suffix.lower()
if file_ext in ['.docx', '.doc']:
                # Use the enhanced DOCX handling logic
segments = self.document_processor.extract_docx_segments(job.file_path)
logger.info(f"Enhanced extraction: Found {len(segments)} segments to translate")
if not segments:
raise TranslationError("文件中未找到可翻譯的文字段落")
                # Use the proven translation logic - translate whole segments without complex splitting
translatable_segments = []
for seg in segments:
if self.document_processor.should_translate_text(seg.text, job.source_language):
translatable_segments.append(seg)
logger.info(f"Found {len(translatable_segments)} segments to translate")
                # Batch translation - translate the original segments directly
                translation_map = {}  # format: (target_language, source_text) -> translated_text
total_segments = len(translatable_segments)
for target_language in job.target_languages:
logger.info(f"Translating to {target_language}")
for i, seg in enumerate(translatable_segments):
try:
                            # Choose the translation method based on the segment type
if seg.kind == "table_cell":
                                # Table cells are translated as whole-cell units
translated = self.translate_word_table_cell(
text=seg.text,
source_language=job.source_language,
target_language=target_language,
user_id=job.user_id,
job_id=job.id
)
else:
                                # Regular paragraphs use the original sentence-splitting method
translated = self.translate_segment_with_sentences(
text=seg.text,
source_language=job.source_language,
target_language=target_language,
user_id=job.user_id,
job_id=job.id
)
                            # Store the result keyed by the original segment text
translation_map[(target_language, seg.text)] = translated
                            # Update progress: each target language owns an equal share of 100%
                            progress = (i + 1) / total_segments * 100
                            current_lang_index = job.target_languages.index(target_language)
                            total_progress = (current_lang_index * 100 + progress) / len(job.target_languages)
job.update_status('PROCESSING', progress=total_progress)
                            # Brief delay to avoid sending requests too quickly
time.sleep(0.1)
except Exception as e:
logger.error(f"Failed to translate segment: {seg.text[:50]}... Error: {str(e)}")
                            # Keep a marked copy of the original text when translation fails
translation_map[(target_language, seg.text)] = f"[翻譯失敗] {seg.text}"
                # Generate the translated documents
logger.info("Generating translated documents with enhanced insertion")
output_dir = Path(job.file_path).parent
output_files = {}
for target_language in job.target_languages:
try:
                        # Build the output filename
output_filename = generate_filename(
Path(job.file_path).name,
'translated',
'translated',
target_language
)
output_path = output_dir / output_filename
                        # Use the enhanced translation-insertion logic
ok_count, skip_count = self.document_processor.insert_docx_translations(
job.file_path,
segments,
translation_map,
[target_language],
str(output_path)
)
output_files[target_language] = str(output_path)
                        # Record the translated file in the database
file_size = Path(output_path).stat().st_size
job.add_translated_file(
language_code=target_language,
filename=Path(output_path).name,
file_path=str(output_path),
file_size=file_size
)
logger.info(f"Generated {target_language}: {ok_count} insertions, {skip_count} skips")
except Exception as e:
logger.error(f"Failed to generate translated document for {target_language}: {str(e)}")
raise TranslationError(f"生成 {target_language} 翻譯文件失敗: {str(e)}")
                # Generate a combined multilingual file containing all translations in one document
if len(job.target_languages) > 1:
try:
                        # Build the combined file's name
combined_filename = generate_filename(
Path(job.file_path).name,
'translated',
'combined',
'multilang'
)
combined_output_path = output_dir / combined_filename
                        # Use the new combined-translation insertion method
combined_ok_count, combined_skip_count = self.document_processor.insert_docx_combined_translations(
job.file_path,
segments,
translation_map,
job.target_languages,
str(combined_output_path)
)
output_files['combined'] = str(combined_output_path)
                        # Record the combined file in the database
file_size = Path(combined_output_path).stat().st_size
job.add_translated_file(
language_code='combined',
filename=Path(combined_output_path).name,
file_path=str(combined_output_path),
file_size=file_size
)
logger.info(f"Generated combined multi-language file: {combined_ok_count} insertions, {combined_skip_count} skips")
except Exception as e:
logger.error(f"Failed to generate combined multi-language document: {str(e)}")
                        # Do not fail the whole job when the combined file fails; just log a warning
logger.warning("Combined multi-language file generation failed, but individual files were successful")
elif file_ext in ['.xlsx', '.xls']:
                # Excel files use cell-level translation logic
logger.info(f"Using cell-based processing for Excel files")
parser = self.get_document_parser(job.file_path)
                # Extract cell text content (no sentence slicing)
cell_segments = parser.extract_text_segments()
if not cell_segments:
raise TranslationError("Excel 文件中未找到可翻譯的文字")
logger.info(f"Found {len(cell_segments)} cell segments to translate")
                # Batch translation - whole cells are the translation unit
translation_results = {}
total_segments = len(cell_segments)
for target_language in job.target_languages:
logger.info(f"Translating Excel cells to {target_language}")
translated_cells = []
for i, cell_text in enumerate(cell_segments):
try:
                            # Use the cell-level translation method (the whole cell as one unit)
translated = self.translate_excel_cell(
text=cell_text,
source_language=job.source_language,
target_language=target_language,
user_id=job.user_id,
job_id=job.id
)
translated_cells.append(translated)
                            # Update progress: each target language owns an equal share of 100%
                            progress = (i + 1) / total_segments * 100
                            current_lang_index = job.target_languages.index(target_language)
                            total_progress = (current_lang_index * 100 + progress) / len(job.target_languages)
job.update_status('PROCESSING', progress=total_progress)
time.sleep(0.1)
except Exception as e:
logger.error(f"Failed to translate Excel cell: {cell_text[:50]}... Error: {str(e)}")
translated_cells.append(f"[翻譯失敗] {cell_text}")
translation_results[target_language] = translated_cells
                # Generate the translated files
output_dir = Path(job.file_path).parent
output_files = {}
for target_language, translations in translation_results.items():
translation_mapping = {target_language: translations}
output_file = parser.generate_translated_document(
translations=translation_mapping,
target_language=target_language,
output_dir=output_dir
)
output_files[target_language] = output_file
file_size = Path(output_file).stat().st_size
job.add_translated_file(
language_code=target_language,
filename=Path(output_file).name,
file_path=output_file,
file_size=file_size
)
                # Generate the combined multilingual Excel file
if len(job.target_languages) > 1:
try:
                        # Build the combined file's name
combined_filename = generate_filename(
Path(job.file_path).name,
'translated',
'combined',
'multilang'
)
combined_output_path = output_dir / combined_filename
                        # Build the translation mapping for the combined Excel file
combined_translation_mapping = {}
for lang in job.target_languages:
combined_translation_mapping[lang] = translation_results[lang]
                        # Use the adapted _generate_combined_excel_document method
combined_output_file = self._generate_combined_excel_document(
parser,
combined_translation_mapping,
job.target_languages,
combined_output_path
)
output_files['combined'] = combined_output_file
                        # Record the combined file in the database
file_size = Path(combined_output_file).stat().st_size
job.add_translated_file(
language_code='combined',
filename=Path(combined_output_file).name,
file_path=combined_output_file,
file_size=file_size
)
logger.info(f"Generated combined multi-language Excel file")
except Exception as e:
logger.error(f"Failed to generate combined multi-language Excel document: {str(e)}")
logger.warning("Combined multi-language Excel file generation failed, but individual files were successful")
else:
                # For other file formats, use the original logic
logger.info(f"Using legacy sentence-based processing for {file_ext} files")
parser = self.get_document_parser(job.file_path)
                # Extract text segments
text_segments = parser.extract_text_segments()
if not text_segments:
raise TranslationError("文件中未找到可翻譯的文字")
                # Split into sentences
all_sentences = []
for segment in text_segments:
sentences = self.split_text_into_sentences(segment, job.source_language)
all_sentences.extend(sentences)
                # De-duplicate
unique_sentences = list(dict.fromkeys(all_sentences))
logger.info(f"Found {len(unique_sentences)} unique sentences to translate")
                # Batch translation
translation_results = {}
total_sentences = len(unique_sentences)
for target_language in job.target_languages:
logger.info(f"Translating to {target_language}")
translated_sentences = []
for i, sentence in enumerate(unique_sentences):
try:
translated = self.translate_text_with_cache(
text=sentence,
source_language=job.source_language,
target_language=target_language,
user_id=job.user_id,
job_id=job.id
)
translated_sentences.append(translated)
                            # Update progress: each target language owns an equal share of 100%
                            progress = (i + 1) / total_sentences * 100
                            current_lang_index = job.target_languages.index(target_language)
                            total_progress = (current_lang_index * 100 + progress) / len(job.target_languages)
job.update_status('PROCESSING', progress=total_progress)
time.sleep(0.1)
except Exception as e:
logger.error(f"Failed to translate sentence: {sentence[:50]}... Error: {str(e)}")
translated_sentences.append(f"[翻譯失敗] {sentence}")
translation_results[target_language] = translated_sentences
                # Generate the translated files
output_dir = Path(job.file_path).parent
output_files = {}
for target_language, translations in translation_results.items():
translation_mapping = {target_language: translations}
output_file = parser.generate_translated_document(
translations=translation_mapping,
target_language=target_language,
output_dir=output_dir
)
output_files[target_language] = output_file
file_size = Path(output_file).stat().st_size
job.add_translated_file(
language_code=target_language,
filename=Path(output_file).name,
file_path=output_file,
file_size=file_size
)
            # Compute the total cost
total_cost = self._calculate_job_cost(job.id)
            # Mark the job as completed
job.update_status('COMPLETED', progress=100)
job.total_cost = total_cost
            # Compute the actual token usage (from the API usage statistics)
from sqlalchemy import func
from app.models.stats import APIUsageStats
from app import db
actual_tokens = db.session.query(
func.sum(APIUsageStats.total_tokens)
).filter_by(job_id=job.id).scalar()
job.total_tokens = int(actual_tokens) if actual_tokens else 0
db.session.commit()
logger.info(f"Enhanced document translation completed: {job_uuid}")
return {
'success': True,
'job_uuid': job_uuid,
'output_files': output_files,
                'total_sentences': len(translatable_segments) if 'translatable_segments' in locals() else len(cell_segments) if 'cell_segments' in locals() else len(unique_sentences) if 'unique_sentences' in locals() else 0,
'total_cost': float(total_cost),
'target_languages': job.target_languages
}
except TranslationError:
raise
except Exception as e:
logger.error(f"Enhanced document translation failed: {job_uuid}. Error: {str(e)}")
raise TranslationError(f"文件翻譯失敗: {str(e)}")
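    # Entry-point sketch (the job_uuid is illustrative; assumes the TranslationJob row and
    # uploaded file already exist and that this runs inside the Flask/worker app context):
    #
    #   service = TranslationService()
    #   result = service.translate_document('3f2b9c1e-aaaa-bbbb-cccc-1234567890ab')
    #   if result['success']:
    #       for lang, path in result['output_files'].items():
    #           print(lang, path)   # per-language files, plus a 'combined' entry when multiple languages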
    def _calculate_job_cost(self, job_id: int) -> float:
        """Compute the total cost of a job."""
from app import db
from sqlalchemy import func
from app.models.stats import APIUsageStats
total_cost = db.session.query(
func.sum(APIUsageStats.cost)
).filter_by(job_id=job_id).scalar()
return float(total_cost) if total_cost else 0.0
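    # Usage sketch: total spend recorded for a job (job_id is illustrative). Returns 0.0
    # when no APIUsageStats rows exist for that job.
    #
    #   total_cost = service._calculate_job_cost(job_id=42)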
    def _generate_combined_excel_document(self, parser, translation_mapping: Dict[str, List[str]],
                                          target_languages: List[str], output_path: Path) -> str:
        """Generate a combined Excel file containing all target-language translations."""
try:
import openpyxl
from openpyxl.styles import Alignment, Font
from sqlalchemy import text as sql_text
from app import db
            # Load the original workbook
wb = openpyxl.load_workbook(str(parser.file_path), data_only=False)
try:
wb_vals = openpyxl.load_workbook(str(parser.file_path), data_only=True)
except Exception:
wb_vals = None
            # Get the original text segments to build the translation map
original_segments = parser.extract_text_segments()
combined_tmap = {}
logger.info(f"Building combined translation map for {len(original_segments)} segments")
for original_text in original_segments:
                # Look up every target language's translation in the cache
for target_lang in target_languages:
result = db.session.execute(sql_text("""
SELECT translated_text
FROM dt_translation_cache
WHERE source_text = :text AND target_language = :lang
ORDER BY created_at ASC
LIMIT 1
"""), {'text': original_text, 'lang': target_lang})
row = result.fetchone()
if row and row[0]:
combined_tmap[(target_lang, original_text)] = row[0]
logger.info(f"Built combined translation map with {len(combined_tmap)} mappings")
            # Process each worksheet and insert the combined translations
for ws in wb.worksheets:
logger.info(f"Processing combined worksheet: {ws.title}")
ws_vals = wb_vals[ws.title] if wb_vals and ws.title in wb_vals.sheetnames else None
max_row, max_col = ws.max_row, ws.max_column
for r in range(1, max_row + 1):
for c in range(1, max_col + 1):
cell = ws.cell(row=r, column=c)
src_text = parser._get_display_text_for_translation(ws, ws_vals, r, c)
if not src_text or not parser._should_translate(src_text, 'auto'):
continue
                        # Collect the translations for all languages
translations = []
for target_lang in target_languages:
if (target_lang, src_text) in combined_tmap:
translations.append(combined_tmap[(target_lang, src_text)])
else:
                                translations.append(f"【翻譯缺失|{target_lang}】")
                        # Combined text: original plus one translation line per language (e.g. original\nEnglish\nVietnamese)
if translations:
combined_text = src_text + '\n' + '\n'.join(translations)
                            # Set the cell value
cell.value = combined_text
cell.alignment = Alignment(wrap_text=True, vertical='top')
cell.font = Font(size=10)
            # Save the combined file
wb.save(str(output_path))
logger.info(f"Generated combined Excel file: {output_path}")
return str(output_path)
except Exception as e:
logger.error(f"Failed to generate combined Excel document: {str(e)}")
raise FileProcessingError(f"組合 Excel 檔案生成失敗: {str(e)}")
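# Combined-file cell layout produced by _generate_combined_excel_document (illustrative,
# for target_languages = ['en', 'vi']): each translatable cell ends up holding the original
# text, then the English translation (or 【翻譯缺失|en】 when the cache has no entry), then
# the Vietnamese translation, all stacked in one cell with wrap_text enabled. The DOCX
# counterpart is produced by DocumentProcessor.insert_docx_combined_translations above.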