#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 翻譯服務 Author: PANJIT IT Team Created: 2024-01-28 Modified: 2024-01-28 """ import hashlib import time from pathlib import Path from typing import List, Dict, Any, Optional, Tuple from app.utils.logger import get_logger from app.utils.exceptions import TranslationError, FileProcessingError from app.services.dify_client import DifyClient from app.services.document_processor import DocumentProcessor, Segment from app.models.cache import TranslationCache from app.models.job import TranslationJob from app.utils.helpers import generate_filename, create_job_directory logger = get_logger(__name__) class DocumentParser: """文件解析器基類""" def __init__(self, file_path: str): self.file_path = Path(file_path) if not self.file_path.exists(): raise FileProcessingError(f"檔案不存在: {file_path}") def extract_text_segments(self) -> List[str]: """提取文字片段""" raise NotImplementedError def generate_translated_document(self, translations: Dict[str, List[str]], target_language: str, output_dir: Path) -> str: """生成翻譯後的文件""" raise NotImplementedError class DocxParser(DocumentParser): """DOCX 文件解析器 - 使用增強的 DocumentProcessor""" def __init__(self, file_path: str): super().__init__(file_path) self.processor = DocumentProcessor() def extract_text_segments(self) -> List[str]: """提取 DOCX 文件的文字片段 - 使用增強邏輯""" try: # 使用新的文檔處理器提取段落 segments = self.processor.extract_docx_segments(str(self.file_path)) # 轉換為文字列表 text_segments = [] for seg in segments: if seg.text.strip() and len(seg.text.strip()) > 3: text_segments.append(seg.text) logger.info(f"Enhanced extraction: {len(text_segments)} text segments from DOCX") return text_segments except Exception as e: logger.error(f"Failed to extract text from DOCX: {str(e)}") raise FileProcessingError(f"DOCX 文件解析失敗: {str(e)}") def extract_segments_with_context(self) -> List[Segment]: """提取帶上下文的段落資訊""" return self.processor.extract_docx_segments(str(self.file_path)) def generate_translated_document(self, translations: Dict[str, List[str]], target_language: str, output_dir: Path) -> str: """生成翻譯後的 DOCX 文件 - 使用增強的翻譯插入邏輯(從快取讀取)""" try: from sqlalchemy import text as sql_text from app import db # 生成輸出檔名 output_filename = generate_filename( self.file_path.name, 'translated', 'translated', target_language ) output_path = output_dir / output_filename # 提取段落資訊 segments = self.extract_segments_with_context() # 建立翻譯映射 - 從快取讀取而非使用傳入的translations參數 translation_map = {} logger.info(f"Building translation map for {len(segments)} segments in language {target_language}") for seg in segments: # 從翻譯快取中查詢每個段落的翻譯 result = db.session.execute(sql_text(""" SELECT translated_text FROM dt_translation_cache WHERE source_text = :text AND target_language = :lang ORDER BY created_at DESC LIMIT 1 """), {'text': seg.text, 'lang': target_language}) row = result.fetchone() if row and row[0]: translation_map[(target_language, seg.text)] = row[0] logger.debug(f"Found translation for: {seg.text[:50]}...") else: logger.warning(f"No translation found for: {seg.text[:50]}...") logger.info(f"Translation map built with {len(translation_map)} mappings") # 使用增強的翻譯插入邏輯 ok_count, skip_count = self.processor.insert_docx_translations( str(self.file_path), segments, translation_map, [target_language], str(output_path) ) logger.info(f"Enhanced translation: Generated {output_path} with {ok_count} insertions, {skip_count} skips") return str(output_path) except Exception as e: logger.error(f"Failed to generate translated DOCX: {str(e)}") raise FileProcessingError(f"生成翻譯 DOCX 失敗: {str(e)}") class DocParser(DocumentParser): """DOC 文件解析器 - 需要先轉換為 DOCX""" def extract_text_segments(self) -> List[str]: """提取 DOC 文件的文字片段 - 先轉換為 DOCX 再處理""" try: # 檢查是否有 Word COM 支援 import tempfile import os try: import win32com.client as win32 import pythoncom _WIN32COM_AVAILABLE = True except ImportError: _WIN32COM_AVAILABLE = False if not _WIN32COM_AVAILABLE: raise FileProcessingError("DOC 格式需要 Word COM 支援,請先手動轉換為 DOCX 格式或安裝 Microsoft Office") # 創建臨時 DOCX 文件 temp_docx = None try: with tempfile.NamedTemporaryFile(suffix='.docx', delete=False) as tmp: temp_docx = tmp.name # 使用 Word COM 轉換 DOC 到 DOCX (格式 16) self._word_convert(str(self.file_path), temp_docx, 16) # 使用 DOCX 解析器處理轉換後的文件 docx_parser = DocxParser(temp_docx) segments = docx_parser.extract_text_segments() logger.info(f"Converted DOC to DOCX and extracted {len(segments)} segments") return segments finally: # 清理臨時文件 if temp_docx and os.path.exists(temp_docx): try: os.remove(temp_docx) except Exception: pass except Exception as e: logger.error(f"Failed to extract text from DOC file: {str(e)}") raise FileProcessingError(f"DOC 文件解析失敗: {str(e)}") def _word_convert(self, input_path: str, output_path: str, target_format: int): """使用 Word COM 轉換文件格式(移植自參考檔案)""" try: import win32com.client as win32 import pythoncom pythoncom.CoInitialize() try: word = win32.Dispatch("Word.Application") word.Visible = False doc = word.Documents.Open(os.path.abspath(input_path)) doc.SaveAs2(os.path.abspath(output_path), FileFormat=target_format) doc.Close(False) finally: word.Quit() pythoncom.CoUninitialize() except Exception as e: raise FileProcessingError(f"Word COM 轉換失敗: {str(e)}") def generate_translated_document(self, translations: Dict[str, List[str]], target_language: str, output_dir: Path) -> str: """生成翻譯後的 DOC 文件 - 先轉為 DOCX 處理後輸出為 DOCX""" try: import tempfile import os # 先轉換為 DOCX,然後使用 DOCX 處理邏輯 temp_docx = None try: with tempfile.NamedTemporaryFile(suffix='.docx', delete=False) as tmp: temp_docx = tmp.name # 轉換 DOC 到 DOCX self._word_convert(str(self.file_path), temp_docx, 16) # 使用 DOCX 解析器生成翻譯文檔 docx_parser = DocxParser(temp_docx) # 注意:最終輸出為 DOCX 格式,因為 DOC 格式較難直接處理 output_filename = f"{self.file_path.stem}_{target_language}_translated.docx" output_path = output_dir / output_filename result_path = docx_parser.generate_translated_document(translations, target_language, output_dir) logger.info(f"Generated translated DOC file (as DOCX): {result_path}") return result_path finally: # 清理臨時文件 if temp_docx and os.path.exists(temp_docx): try: os.remove(temp_docx) except Exception: pass except Exception as e: logger.error(f"Failed to generate translated DOC file: {str(e)}") raise FileProcessingError(f"DOC 翻譯檔生成失敗: {str(e)}") class ExcelParser(DocumentParser): """Excel 文件解析器(XLSX/XLS)- 移植自參考檔案""" def extract_text_segments(self) -> List[str]: """提取 Excel 文件的文字片段""" try: import openpyxl from openpyxl.utils.exceptions import InvalidFileException # 載入工作簿(移植自參考檔案邏輯) try: wb = openpyxl.load_workbook(str(self.file_path), data_only=False) wb_vals = openpyxl.load_workbook(str(self.file_path), data_only=True) except InvalidFileException: if self.file_path.suffix.lower() == '.xls': raise FileProcessingError("XLS 格式需要先轉換為 XLSX 格式") raise except Exception: wb_vals = None # 提取文字段落(完全按照參考檔案的邏輯) segs = [] for ws in wb.worksheets: ws_vals = wb_vals[ws.title] if wb_vals and ws.title in wb_vals.sheetnames else None max_row, max_col = ws.max_row, ws.max_column for r in range(1, max_row + 1): for c in range(1, max_col + 1): src_text = self._get_display_text_for_translation(ws, ws_vals, r, c) if not src_text: continue if not self._should_translate(src_text, 'auto'): continue segs.append(src_text) # 去重保持順序 unique_segments = [] seen = set() for seg in segs: if seg not in seen: unique_segments.append(seg) seen.add(seg) logger.info(f"Extracted {len(unique_segments)} unique text segments from Excel file") return unique_segments except Exception as e: logger.error(f"Failed to extract text from Excel file: {str(e)}") raise FileProcessingError(f"Excel 文件解析失敗: {str(e)}") def _get_display_text_for_translation(self, ws, ws_vals, r: int, c: int) -> Optional[str]: """取得儲存格用於翻譯的顯示文字(完全移植自參考檔案)""" val = ws.cell(row=r, column=c).value if isinstance(val, str) and val.startswith("="): if ws_vals is not None: shown = ws_vals.cell(row=r, column=c).value return shown if isinstance(shown, str) and shown.strip() else None return None if isinstance(val, str) and val.strip(): return val if ws_vals is not None: shown = ws_vals.cell(row=r, column=c).value if isinstance(shown, str) and shown.strip(): return shown return None def _should_translate(self, text: str, src_lang: str) -> bool: """判斷文字是否需要翻譯(修正中文長度判斷)""" text = text.strip() # 檢查是否包含中日韓文字 has_cjk = self._has_cjk(text) # 對於包含CJK字符的文字,放寬長度限制為2個字符 min_length = 2 if has_cjk else 3 if len(text) < min_length: return False # Skip pure numbers, dates, etc. import re if re.match(r'^[\d\s\.\-\:\/]+$', text): return False # For auto-detect, translate if has CJK or meaningful text if src_lang.lower() in ('auto', 'auto-detect'): return has_cjk or len(text) > 5 return True def _has_cjk(self, text: str) -> bool: """檢查是否包含中日韓文字(移植自參考檔案)""" for char in text: if '\u4e00' <= char <= '\u9fff' or \ '\u3400' <= char <= '\u4dbf' or \ '\u20000' <= char <= '\u2a6df' or \ '\u3040' <= char <= '\u309f' or \ '\u30a0' <= char <= '\u30ff' or \ '\uac00' <= char <= '\ud7af': return True return False def generate_translated_document(self, translations: Dict[str, List[str]], target_language: str, output_dir: Path) -> str: """生成翻譯後的 Excel 文件(使用翻譯快取確保正確映射)""" try: import openpyxl from openpyxl.styles import Alignment from openpyxl.comments import Comment from sqlalchemy import text as sql_text from app import db # 載入原始工作簿 wb = openpyxl.load_workbook(str(self.file_path), data_only=False) try: wb_vals = openpyxl.load_workbook(str(self.file_path), data_only=True) except Exception: wb_vals = None # 建立翻譯映射 - 改用翻譯快取查詢,確保正確對應 original_segments = self.extract_text_segments() tmap = {} logger.info(f"Building translation map for {len(original_segments)} segments in language {target_language}") for original_text in original_segments: # 從翻譯快取中查詢每個原文的翻譯 # 使用聯合查詢,優先使用最早的翻譯記錄(原始DIFY翻譯) normalized_text = original_text.replace('\n', ' ').replace('\r', ' ').strip() result = db.session.execute(sql_text(""" SELECT translated_text, created_at, 'exact' as match_type FROM dt_translation_cache WHERE source_text = :exact_text AND target_language = :lang UNION ALL SELECT translated_text, created_at, 'normalized' as match_type FROM dt_translation_cache WHERE REPLACE(REPLACE(TRIM(source_text), '\n', ' '), '\r', ' ') = :norm_text AND target_language = :lang AND source_text != :exact_text ORDER BY created_at ASC LIMIT 1 """), {'exact_text': original_text, 'norm_text': normalized_text, 'lang': target_language}) row = result.fetchone() if row and row[0]: tmap[original_text] = row[0] logger.debug(f"Cache hit for Excel: {original_text[:30]}... -> {row[0][:30]}...") else: logger.warning(f"No translation found in cache for: {original_text[:50]}...") logger.info(f"Translation map built with {len(tmap)} mappings from cache") # 處理每個工作表(加入詳細調試日誌) translation_count = 0 skip_count = 0 for ws in wb.worksheets: logger.info(f"Processing worksheet: {ws.title}") ws_vals = wb_vals[ws.title] if wb_vals and ws.title in wb_vals.sheetnames else None max_row, max_col = ws.max_row, ws.max_column for r in range(1, max_row + 1): for c in range(1, max_col + 1): cell_name = f"{openpyxl.utils.get_column_letter(c)}{r}" src_text = self._get_display_text_for_translation(ws, ws_vals, r, c) if not src_text: continue # 檢查是否需要翻譯 should_translate = self._should_translate(src_text, 'auto') if not should_translate: logger.debug(f"Skip {cell_name}: '{src_text[:30]}...' (should not translate)") skip_count += 1 continue # 檢查翻譯映射 if src_text not in tmap: logger.warning(f"No translation mapping for {cell_name}: '{src_text[:30]}...'") skip_count += 1 continue val = ws.cell(row=r, column=c).value is_formula = isinstance(val, str) and val.startswith("=") translated_text = tmap[src_text] cell = ws.cell(row=r, column=c) if is_formula: # 公式儲存格:添加註解 txt_comment = f"翻譯: {translated_text}" exist = cell.comment if not exist or exist.text.strip() != txt_comment: cell.comment = Comment(txt_comment, "translator") logger.debug(f"Added comment to {cell_name}: {translated_text[:30]}...") translation_count += 1 else: # 一般儲存格:使用交錯格式(原文+翻譯) combined = f"{src_text}\n{translated_text}" # 檢查是否已經是預期的格式 current_text = str(cell.value) if cell.value else "" if current_text.strip() == combined.strip(): logger.debug(f"Skip {cell_name}: already translated") continue cell.value = combined logger.info(f"Translated {cell_name}: '{src_text[:20]}...' -> '{translated_text[:20]}...'") translation_count += 1 # 設定自動換行(移植自參考檔案) try: if cell.alignment: cell.alignment = Alignment( horizontal=cell.alignment.horizontal, vertical=cell.alignment.vertical, wrap_text=True ) else: cell.alignment = Alignment(wrap_text=True) except Exception: cell.alignment = Alignment(wrap_text=True) # 儲存翻譯後的檔案 output_filename = f"{self.file_path.stem}_{target_language}_translated.xlsx" output_path = output_dir / output_filename wb.save(str(output_path)) logger.info(f"Excel translation completed: {translation_count} translations, {skip_count} skips") logger.info(f"Generated translated Excel file: {output_path}") return str(output_path) except Exception as e: logger.error(f"Failed to generate translated Excel file: {str(e)}") raise FileProcessingError(f"Excel 翻譯檔生成失敗: {str(e)}") class PdfParser(DocumentParser): """PDF 文件解析器(只讀)""" def extract_text_segments(self) -> List[str]: """提取 PDF 文件的文字片段""" try: from PyPDF2 import PdfReader reader = PdfReader(str(self.file_path)) text_segments = [] for page in reader.pages: text = page.extract_text() # 簡單的句子分割 sentences = text.split('.') for sentence in sentences: sentence = sentence.strip() if sentence and len(sentence) > 10: text_segments.append(sentence) logger.info(f"Extracted {len(text_segments)} text segments from PDF") return text_segments except Exception as e: logger.error(f"Failed to extract text from PDF: {str(e)}") raise FileProcessingError(f"PDF 文件解析失敗: {str(e)}") def generate_translated_document(self, translations: Dict[str, List[str]], target_language: str, output_dir: Path) -> str: """生成翻譯文字檔(PDF 不支援直接編輯)""" try: translated_texts = translations.get(target_language, []) # 生成純文字檔案 output_filename = f"{self.file_path.stem}_{target_language}_translated.txt" output_path = output_dir / output_filename with open(output_path, 'w', encoding='utf-8') as f: f.write(f"翻譯結果 - {target_language}\n") f.write("=" * 50 + "\n\n") for i, text in enumerate(translated_texts): f.write(f"{i+1}. {text}\n\n") logger.info(f"Generated translated text file: {output_path}") return str(output_path) except Exception as e: logger.error(f"Failed to generate translated text file: {str(e)}") raise FileProcessingError(f"生成翻譯文字檔失敗: {str(e)}") class TranslationService: """翻譯服務""" def __init__(self): self.dify_client = DifyClient() self.document_processor = DocumentProcessor() # 文件解析器映射 self.parsers = { '.docx': DocxParser, '.doc': DocParser, # 需要先轉換為 DOCX '.xlsx': ExcelParser, '.xls': ExcelParser, # Excel 處理器會自動處理 XLS 轉換 '.pdf': PdfParser, # 其他格式可以稍後添加 } def get_document_parser(self, file_path: str) -> DocumentParser: """取得文件解析器""" file_ext = Path(file_path).suffix.lower() parser_class = self.parsers.get(file_ext) if not parser_class: raise FileProcessingError(f"不支援的檔案格式: {file_ext}") return parser_class(file_path) def split_text_into_sentences(self, text: str, language: str = 'auto') -> List[str]: """將文字分割成句子 - 使用增強的分句邏輯""" return self.document_processor.split_text_into_sentences(text, language) def translate_excel_cell(self, text: str, source_language: str, target_language: str, user_id: int = None, job_id: int = None) -> str: """ Excel儲存格翻譯 - 整個儲存格作為一個單位翻譯,不進行切片 """ if not text or not text.strip(): return "" # 檢查快取 - 整個儲存格內容 cached_translation = TranslationCache.get_translation(text, source_language, target_language) if cached_translation: logger.debug(f"Excel cell cache hit: {text[:30]}...") return cached_translation # 直接翻譯整個儲存格內容,不進行任何切片 try: result = self.dify_client.translate_text( text=text, source_language=source_language, target_language=target_language, user_id=user_id, job_id=job_id ) translated_text = result['translated_text'] # 儲存整個儲存格的翻譯到快取 TranslationCache.save_translation( text, source_language, target_language, translated_text ) return translated_text except Exception as e: logger.error(f"Failed to translate Excel cell: {text[:30]}... Error: {str(e)}") # 翻譯失敗時返回失敗標記 return f"【翻譯失敗|{target_language}】{text}" def translate_word_table_cell(self, text: str, source_language: str, target_language: str, user_id: int = None, job_id: int = None) -> str: """ Word表格儲存格翻譯 - 整個儲存格內容作為一個單位翻譯,不進行段落切片 """ if not text or not text.strip(): return "" # 檢查快取 - 整個儲存格內容 cached_translation = TranslationCache.get_translation(text, source_language, target_language) if cached_translation: logger.debug(f"Word table cell cache hit: {text[:30]}...") return cached_translation # 直接翻譯整個儲存格內容,不進行任何段落切片 try: result = self.dify_client.translate_text( text=text, source_language=source_language, target_language=target_language, user_id=user_id, job_id=job_id ) translated_text = result['translated_text'] # 儲存整個儲存格的翻譯到快取 TranslationCache.save_translation( text, source_language, target_language, translated_text ) return translated_text except Exception as e: logger.error(f"Failed to translate Word table cell: {text[:30]}... Error: {str(e)}") return f"【翻譯失敗|{target_language}】{text}" def translate_segment_with_sentences(self, text: str, source_language: str, target_language: str, user_id: int = None, job_id: int = None) -> str: """ 按段落翻譯,模仿成功版本的 translate_block_sentencewise 邏輯 對多行文字進行逐行、逐句翻譯,並重新組合成完整段落 僅用於Word文檔,Excel請使用 translate_excel_cell """ if not text or not text.strip(): return "" # 檢查快取 - 先檢查整個段落的快取 cached_whole = TranslationCache.get_translation(text, source_language, target_language) if cached_whole: logger.debug(f"Whole paragraph cache hit: {text[:30]}...") return cached_whole # 按行處理 out_lines = [] all_successful = True for raw_line in text.split('\n'): if not raw_line.strip(): out_lines.append("") continue # 分句處理 sentences = self.document_processor.split_text_into_sentences(raw_line, source_language) if not sentences: sentences = [raw_line] translated_parts = [] for sentence in sentences: sentence = sentence.strip() if not sentence: continue # 檢查句子級快取 cached_sentence = TranslationCache.get_translation(sentence, source_language, target_language) if cached_sentence: translated_parts.append(cached_sentence) continue # 呼叫 Dify API 翻譯句子 try: result = self.dify_client.translate_text( text=sentence, source_language=source_language, target_language=target_language, user_id=user_id, job_id=job_id ) translated_sentence = result['translated_text'] # 儲存句子級快取 TranslationCache.save_translation( sentence, source_language, target_language, translated_sentence ) translated_parts.append(translated_sentence) except Exception as e: logger.error(f"Failed to translate sentence: {sentence[:30]}... Error: {str(e)}") translated_parts.append(f"【翻譯失敗|{target_language}】{sentence}") all_successful = False # 重新組合句子為一行 out_lines.append(" ".join(translated_parts)) # 重新組合所有行 final_result = "\n".join(out_lines) # 如果全部成功,儲存整個段落的快取 if all_successful: TranslationCache.save_translation(text, source_language, target_language, final_result) return final_result def translate_text_with_cache(self, text: str, source_language: str, target_language: str, user_id: int = None, job_id: int = None) -> str: """帶快取的文字翻譯""" # 檢查快取 cached_translation = TranslationCache.get_translation( text, source_language, target_language ) if cached_translation: logger.debug(f"Cache hit for translation: {text[:50]}...") return cached_translation # 呼叫 Dify API try: result = self.dify_client.translate_text( text=text, source_language=source_language, target_language=target_language, user_id=user_id, job_id=job_id ) translated_text = result['translated_text'] # 儲存到快取 TranslationCache.save_translation( text, source_language, target_language, translated_text ) return translated_text except Exception as e: logger.error(f"Translation failed for text: {text[:50]}... Error: {str(e)}") raise TranslationError(f"翻譯失敗: {str(e)}") def translate_document(self, job_uuid: str) -> Dict[str, Any]: """翻譯文件(主要入口點)- 使用增強的文檔處理邏輯""" try: # 取得任務資訊 job = TranslationJob.query.filter_by(job_uuid=job_uuid).first() if not job: raise TranslationError(f"找不到任務: {job_uuid}") logger.info(f"Starting enhanced document translation: {job_uuid}") # 更新任務狀態 job.update_status('PROCESSING', progress=0) # 使用增強的文檔處理器直接提取段落 file_ext = Path(job.file_path).suffix.lower() if file_ext in ['.docx', '.doc']: # 使用增強的 DOCX 處理邏輯 segments = self.document_processor.extract_docx_segments(job.file_path) logger.info(f"Enhanced extraction: Found {len(segments)} segments to translate") if not segments: raise TranslationError("文件中未找到可翻譯的文字段落") # 使用成功版本的翻譯邏輯 - 直接按段落翻譯,不做複雜分割 translatable_segments = [] for seg in segments: if self.document_processor.should_translate_text(seg.text, job.source_language): translatable_segments.append(seg) logger.info(f"Found {len(translatable_segments)} segments to translate") # 批次翻譯 - 直接按原始段落翻譯 translation_map = {} # 格式: (target_language, source_text) -> translated_text total_segments = len(translatable_segments) for target_language in job.target_languages: logger.info(f"Translating to {target_language}") for i, seg in enumerate(translatable_segments): try: # 根據段落類型選擇適當的翻譯方法 if seg.kind == "table_cell": # 表格儲存格使用整個儲存格為單位的翻譯方法 translated = self.translate_word_table_cell( text=seg.text, source_language=job.source_language, target_language=target_language, user_id=job.user_id, job_id=job.id ) else: # 一般段落使用原有的句子切片方法 translated = self.translate_segment_with_sentences( text=seg.text, source_language=job.source_language, target_language=target_language, user_id=job.user_id, job_id=job.id ) # 直接以原始段落文字為鍵儲存翻譯結果 translation_map[(target_language, seg.text)] = translated # 更新進度 progress = (i + 1) / total_segments * 100 / len(job.target_languages) current_lang_index = job.target_languages.index(target_language) total_progress = (current_lang_index * 100 + progress) / len(job.target_languages) job.update_status('PROCESSING', progress=total_progress) # 短暫延遲避免過快請求 time.sleep(0.1) except Exception as e: logger.error(f"Failed to translate segment: {seg.text[:50]}... Error: {str(e)}") # 翻譯失敗時保留原文 translation_map[(target_language, seg.text)] = f"[翻譯失敗] {seg.text}" # 生成翻譯文件 logger.info("Generating translated documents with enhanced insertion") output_dir = Path(job.file_path).parent output_files = {} for target_language in job.target_languages: try: # 生成輸出檔名 output_filename = generate_filename( Path(job.file_path).name, 'translated', 'translated', target_language ) output_path = output_dir / output_filename # 使用增強的翻譯插入邏輯 ok_count, skip_count = self.document_processor.insert_docx_translations( job.file_path, segments, translation_map, [target_language], str(output_path) ) output_files[target_language] = str(output_path) # 記錄翻譯檔案到資料庫 file_size = Path(output_path).stat().st_size job.add_translated_file( language_code=target_language, filename=Path(output_path).name, file_path=str(output_path), file_size=file_size ) logger.info(f"Generated {target_language}: {ok_count} insertions, {skip_count} skips") except Exception as e: logger.error(f"Failed to generate translated document for {target_language}: {str(e)}") raise TranslationError(f"生成 {target_language} 翻譯文件失敗: {str(e)}") elif file_ext in ['.xlsx', '.xls']: # Excel 文件使用儲存格為單位的翻譯邏輯 logger.info(f"Using cell-based processing for Excel files") parser = self.get_document_parser(job.file_path) # 提取儲存格文字內容(不進行句子切片) cell_segments = parser.extract_text_segments() if not cell_segments: raise TranslationError("Excel 文件中未找到可翻譯的文字") logger.info(f"Found {len(cell_segments)} cell segments to translate") # 批次翻譯 - 使用儲存格為單位的翻譯方法 translation_results = {} total_segments = len(cell_segments) for target_language in job.target_languages: logger.info(f"Translating Excel cells to {target_language}") translated_cells = [] for i, cell_text in enumerate(cell_segments): try: # 使用新的儲存格翻譯方法(整個儲存格作為單位) translated = self.translate_excel_cell( text=cell_text, source_language=job.source_language, target_language=target_language, user_id=job.user_id, job_id=job.id ) translated_cells.append(translated) # 更新進度 progress = (i + 1) / total_segments * 100 / len(job.target_languages) current_lang_index = job.target_languages.index(target_language) total_progress = (current_lang_index * 100 + progress) / len(job.target_languages) job.update_status('PROCESSING', progress=total_progress) time.sleep(0.1) except Exception as e: logger.error(f"Failed to translate Excel cell: {cell_text[:50]}... Error: {str(e)}") translated_cells.append(f"[翻譯失敗] {cell_text}") translation_results[target_language] = translated_cells # 生成翻譯文件 output_dir = Path(job.file_path).parent output_files = {} for target_language, translations in translation_results.items(): translation_mapping = {target_language: translations} output_file = parser.generate_translated_document( translations=translation_mapping, target_language=target_language, output_dir=output_dir ) output_files[target_language] = output_file file_size = Path(output_file).stat().st_size job.add_translated_file( language_code=target_language, filename=Path(output_file).name, file_path=output_file, file_size=file_size ) else: # 對於其他文件格式,使用原有邏輯 logger.info(f"Using legacy sentence-based processing for {file_ext} files") parser = self.get_document_parser(job.file_path) # 提取文字片段 text_segments = parser.extract_text_segments() if not text_segments: raise TranslationError("文件中未找到可翻譯的文字") # 分割成句子 all_sentences = [] for segment in text_segments: sentences = self.split_text_into_sentences(segment, job.source_language) all_sentences.extend(sentences) # 去重複 unique_sentences = list(dict.fromkeys(all_sentences)) logger.info(f"Found {len(unique_sentences)} unique sentences to translate") # 批次翻譯 translation_results = {} total_sentences = len(unique_sentences) for target_language in job.target_languages: logger.info(f"Translating to {target_language}") translated_sentences = [] for i, sentence in enumerate(unique_sentences): try: translated = self.translate_text_with_cache( text=sentence, source_language=job.source_language, target_language=target_language, user_id=job.user_id, job_id=job.id ) translated_sentences.append(translated) # 更新進度 progress = (i + 1) / total_sentences * 100 / len(job.target_languages) current_lang_index = job.target_languages.index(target_language) total_progress = (current_lang_index * 100 + progress) / len(job.target_languages) job.update_status('PROCESSING', progress=total_progress) time.sleep(0.1) except Exception as e: logger.error(f"Failed to translate sentence: {sentence[:50]}... Error: {str(e)}") translated_sentences.append(f"[翻譯失敗] {sentence}") translation_results[target_language] = translated_sentences # 生成翻譯文件 output_dir = Path(job.file_path).parent output_files = {} for target_language, translations in translation_results.items(): translation_mapping = {target_language: translations} output_file = parser.generate_translated_document( translations=translation_mapping, target_language=target_language, output_dir=output_dir ) output_files[target_language] = output_file file_size = Path(output_file).stat().st_size job.add_translated_file( language_code=target_language, filename=Path(output_file).name, file_path=output_file, file_size=file_size ) # 計算總成本 total_cost = self._calculate_job_cost(job.id) # 更新任務狀態為完成 job.update_status('COMPLETED', progress=100) job.total_cost = total_cost # 計算實際使用的 token 數(從 API 使用統計中獲取) from sqlalchemy import func from app.models.stats import APIUsageStats from app import db actual_tokens = db.session.query( func.sum(APIUsageStats.total_tokens) ).filter_by(job_id=job.id).scalar() job.total_tokens = int(actual_tokens) if actual_tokens else 0 db.session.commit() logger.info(f"Enhanced document translation completed: {job_uuid}") return { 'success': True, 'job_uuid': job_uuid, 'output_files': output_files, 'total_sentences': len(texts_to_translate) if 'texts_to_translate' in locals() else len(unique_sentences) if 'unique_sentences' in locals() else 0, 'total_cost': float(total_cost), 'target_languages': job.target_languages } except TranslationError: raise except Exception as e: logger.error(f"Enhanced document translation failed: {job_uuid}. Error: {str(e)}") raise TranslationError(f"文件翻譯失敗: {str(e)}") def _calculate_job_cost(self, job_id: int) -> float: """計算任務總成本""" from app import db from sqlalchemy import func from app.models.stats import APIUsageStats total_cost = db.session.query( func.sum(APIUsageStats.cost) ).filter_by(job_id=job_id).scalar() return float(total_cost) if total_cost else 0.0