#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Core document processing logic (ported from the best-performing version).

Provides complete DOCX text extraction and translation insertion:
plain paragraphs, tables, textboxes (w:txbxContent) and SDT content
controls. Inserted translations are tagged with a zero-width space
(U+200B) marker so they can be recognized and skipped on re-runs.

Author: PANJIT IT Team
Created: 2024-09-02
Modified: 2024-09-02
"""

import re
import sys
import time
from pathlib import Path
from typing import List, Dict, Tuple, Optional, Any

from docx.text.paragraph import Paragraph
from docx.table import Table, _Cell
from docx.shared import Pt
from docx.oxml import OxmlElement
from docx.oxml.ns import qn, nsdecls
import docx

from app.utils.logger import get_logger
from app.utils.exceptions import FileProcessingError

logger = get_logger(__name__)

# ---------- Constants ----------
INSERT_FONT_SIZE_PT = 10
SENTENCE_MODE = True

# ---------- Optional dependencies detection ----------
try:
    import blingfire
    _HAS_BLINGFIRE = True
except ImportError:
    _HAS_BLINGFIRE = False

try:
    import pysbd
    _HAS_PYSBD = True
except ImportError:
    _HAS_PYSBD = False


# ---------- Helper functions ----------
def _has_cjk(text: str) -> bool:
    """Check if text contains CJK (Chinese/Japanese/Korean) characters.

    BUG FIX: the original compared against '\\u20000'/'\\u2a6df', which are
    TWO-character strings ('\\u2000' + '0'), so ordinary punctuation such as
    U+2022 (bullet) was misclassified as CJK. We now compare code points
    against the real CJK Extension B range U+20000..U+2A6DF.
    """
    for char in text:
        cp = ord(char)
        if (0x4E00 <= cp <= 0x9FFF or       # CJK Unified Ideographs
                0x3400 <= cp <= 0x4DBF or   # CJK Extension A
                0x20000 <= cp <= 0x2A6DF or # CJK Extension B (fixed range)
                0x3040 <= cp <= 0x309F or   # Hiragana
                0x30A0 <= cp <= 0x30FF or   # Katakana
                0xAC00 <= cp <= 0xD7AF):    # Hangul syllables
            return True
    return False


def _normalize_text(text: str) -> str:
    """Normalize text for comparison: trim, lowercase, collapse whitespace."""
    return re.sub(r'\s+', ' ', text.strip().lower())


def _append_after(p: Paragraph, text_block: str, italic: bool = True,
                  font_size_pt: int = INSERT_FONT_SIZE_PT) -> Paragraph:
    """Insert a new paragraph after p, return the new paragraph (for chain insert).

    Each line of text_block becomes a run followed by a soft break; a final
    zero-width-space run marks the paragraph as one of our insertions.
    """
    new_p = OxmlElement("w:p")
    p._p.addnext(new_p)
    np = Paragraph(new_p, p._parent)
    lines = text_block.split("\n")
    for i, line in enumerate(lines):
        run = np.add_run(line)
        if italic:
            run.italic = True
        if font_size_pt:
            run.font.size = Pt(font_size_pt)
        if i < len(lines) - 1:
            run.add_break()
    # Zero-width-space marker run identifies our inserted paragraphs.
    tag = np.add_run("\u200b")
    if italic:
        tag.italic = True
    if font_size_pt:
        tag.font.size = Pt(font_size_pt)
    return np


def _p_text_with_breaks(p: Paragraph) -> str:
    """Extract text from paragraph with line breaks and tabs preserved."""
    parts = []
    for node in p._element.xpath(".//*[local-name()='t' or local-name()='br' or local-name()='tab']"):
        tag = node.tag.split('}', 1)[-1]
        if tag == "t":
            parts.append(node.text or "")
        elif tag == "br":
            parts.append("\n")
        elif tag == "tab":
            parts.append("\t")
    return "".join(parts)


def _is_our_insert_block(p: Paragraph) -> bool:
    """Check if paragraph is our inserted translation (zero-width-space marker).

    NOTE: the original file defined this function twice; this text-based
    variant shadowed the earlier run-based one and is the behavior every
    caller actually got, so the duplicate was removed.
    """
    return "\u200b" in _p_text_with_breaks(p)


def _find_last_inserted_after(p: Paragraph, limit: int = 8) -> Optional[Paragraph]:
    """Find the last paragraph that was inserted after p (up to limit paragraphs)."""
    try:
        # Get all paragraphs in the parent container.
        if hasattr(p._parent, 'paragraphs'):
            all_paras = list(p._parent.paragraphs)
        else:
            # Parent without .paragraphs (e.g. some table-cell wrappers).
            return None

        # Locate p within its container.
        p_index = -1
        for i, para in enumerate(all_paras):
            if para._element == p._element:
                p_index = i
                break
        if p_index == -1:
            return None

        # Walk forward over consecutive inserted paragraphs.
        last_found = None
        for i in range(p_index + 1, min(p_index + 1 + limit, len(all_paras))):
            if _is_our_insert_block(all_paras[i]):
                last_found = all_paras[i]
            else:
                break  # Stop at first non-inserted paragraph
    except Exception:
        return None
    return last_found


def should_translate(text: str, src_lang: str) -> bool:
    """Determine if text should be translated based on content and source language."""
    text = text.strip()
    if len(text) < 3:
        return False

    # Skip pure numbers, dates, etc.
    if re.match(r'^[\d\s\.\-\:\/]+$', text):
        return False

    # For auto-detect, translate if has CJK or meaningful text.
    if src_lang.lower() in ('auto', 'auto-detect'):
        return _has_cjk(text) or len(text) > 5

    return True


def _split_sentences(text: str, lang: str = 'auto') -> List[str]:
    """Split text into sentences using the best available library.

    Preference order: blingfire, pysbd, then a naive separator-based
    fallback. Sentences shorter than 4 characters are dropped.
    """
    if not text.strip():
        return []

    # Try blingfire first.
    if _HAS_BLINGFIRE and SENTENCE_MODE:
        try:
            sentences = blingfire.text_to_sentences(text).split('\n')
            sentences = [s.strip() for s in sentences if s.strip()]
            if sentences:
                return sentences
        except Exception as e:
            logger.warning(f"Blingfire failed: {e}")

    # Try pysbd.
    if _HAS_PYSBD and SENTENCE_MODE:
        try:
            seg = pysbd.Segmenter(language="en" if lang == "auto" else lang)
            sentences = seg.segment(text)
            sentences = [s.strip() for s in sentences if s.strip()]
            if sentences:
                return sentences
        except Exception as e:
            logger.warning(f"PySBD failed: {e}")

    # Fallback to simple splitting (ASCII + fullwidth terminators).
    separators = ['. ', '。', '!', '?', '!', '?', '\n']
    sentences = [text]
    for sep in separators:
        new_sentences = []
        for s in sentences:
            parts = s.split(sep)
            if len(parts) > 1:
                new_sentences.extend([p.strip() + sep.rstrip() for p in parts[:-1] if p.strip()])
                if parts[-1].strip():
                    new_sentences.append(parts[-1].strip())
            else:
                new_sentences.append(s)
        sentences = new_sentences

    return [s for s in sentences if len(s.strip()) > 3]


# ---------- Segment class ----------
class Segment:
    """Represents a translatable text segment in a document."""

    def __init__(self, kind: str, ref: Any, ctx: str, text: str):
        self.kind = kind  # 'para' | 'txbx'
        self.ref = ref    # Reference to original document element
        self.ctx = ctx    # Context information
        self.text = text  # Text content


# ---------- TextBox helpers ----------
def _txbx_iter_texts(doc: docx.Document):
    """
    Yield (txbxContent_element, joined_source_text).

    - Deeply collects all descendant paragraphs under txbxContent.
    - Skips our inserted translations (zero-width marker).
    - NOTE(review): the all-italic flag is computed but never used in the
      filter below, despite the original docstring claiming it is — kept
      as-is to preserve behavior; confirm intent before changing.
    """
    def _p_text_flags(p_el):
        parts = []
        for node in p_el.xpath(".//*[local-name()='t' or local-name()='br' or local-name()='tab']"):
            tag = node.tag.split('}', 1)[-1]
            if tag == "t":
                parts.append(node.text or "")
            elif tag == "br":
                parts.append("\n")
            else:
                parts.append(" ")
        text = "".join(parts)
        has_zero = ("\u200b" in text)
        runs = p_el.xpath(".//*[local-name()='r']")
        vis, ital = [], []
        for r in runs:
            rt = "".join([(t.text or "") for t in r.xpath(".//*[local-name()='t']")])
            if (rt or "").strip():
                vis.append(rt)
                ital.append(bool(r.xpath(".//*[local-name()='i']")))
        all_italic = (len(vis) > 0 and all(ital))
        return text, has_zero, all_italic

    for tx in doc._element.xpath(".//*[local-name()='txbxContent']"):
        kept = []
        for p in tx.xpath(".//*[local-name()='p']"):  # all descendant paragraphs
            text, has_zero, all_italic = _p_text_flags(p)
            if not (text or "").strip():
                continue
            if has_zero:
                continue  # our inserted
            for line in text.split("\n"):
                if line.strip():
                    kept.append(line.strip())
        if kept:
            joined = "\n".join(kept)
            yield tx, joined


def _txbx_append_paragraph(tx, text_block: str, italic: bool = True,
                           font_size_pt: int = INSERT_FONT_SIZE_PT):
    """Append a paragraph to textbox content, tagged with our marker."""
    p = OxmlElement("w:p")
    r = OxmlElement("w:r")
    rPr = OxmlElement("w:rPr")
    if italic:
        rPr.append(OxmlElement("w:i"))
    if font_size_pt:
        sz = OxmlElement("w:sz")
        # w:sz is measured in half-points.
        sz.set(qn("w:val"), str(int(font_size_pt * 2)))
        rPr.append(sz)
    r.append(rPr)

    lines = text_block.split("\n")
    for i, line in enumerate(lines):
        if i > 0:
            r.append(OxmlElement("w:br"))
        t = OxmlElement("w:t")
        t.set(qn("xml:space"), "preserve")
        t.text = line
        r.append(t)

    # Zero-width-space marker so re-runs can detect this insertion.
    tag = OxmlElement("w:t")
    tag.set(qn("xml:space"), "preserve")
    tag.text = "\u200b"
    r.append(tag)
    p.append(r)
    tx.append(p)


def _txbx_tail_equals(tx, translations: List[str]) -> bool:
    """Check if textbox already ends with the expected translations, in order."""
    paras = tx.xpath("./*[local-name()='p']")
    if len(paras) < len(translations):
        return False
    tail = paras[-len(translations):]
    for q, expect in zip(tail, translations):
        parts = []
        for node in q.xpath(".//*[local-name()='t' or local-name()='br']"):
            tag = node.tag.split("}", 1)[-1]
            parts.append("\n" if tag == "br" else (node.text or ""))
        if _normalize_text("".join(parts).strip()) != _normalize_text(expect):
            return False
    return True


# ---------- Main extraction logic ----------
def _get_paragraph_key(p: Paragraph) -> str:
    """Generate a stable unique key for paragraph deduplication.

    Keys use hash() and are therefore only stable within one process run,
    which is sufficient for the in-memory seen-set below.
    """
    try:
        # Use XML content hash + text content for stable deduplication.
        xml_content = p._p.xml if hasattr(p._p, 'xml') else str(p._p)
        text_content = _p_text_with_breaks(p)
        combined = f"{hash(xml_content)}_{len(text_content)}_{text_content[:50]}"
        return combined
    except Exception:
        # Fallback to simple text-based key.
        text_content = _p_text_with_breaks(p)
        return f"fallback_{hash(text_content)}_{len(text_content)}"


def _collect_docx_segments(doc: docx.Document) -> List[Segment]:
    """
    Enhanced segment collector with improved stability.
    Handles paragraphs, tables, textboxes, and SDT Content Controls.
    """
    segs: List[Segment] = []
    seen_par_keys = set()

    def _add_paragraph(p: Paragraph, ctx: str):
        try:
            p_key = _get_paragraph_key(p)
            if p_key in seen_par_keys:
                return
            txt = _p_text_with_breaks(p)
            if txt.strip() and not _is_our_insert_block(p):
                segs.append(Segment("para", p, ctx, txt))
                seen_par_keys.add(p_key)
        except Exception as e:
            # Log error but continue processing.
            logger.warning(f"段落處理錯誤: {e}, 跳過此段落")

    def _process_container_content(container, ctx: str):
        """
        Recursively processes content within a container (body, cell, or SDT
        content). Identifies and handles paragraphs, tables, and SDT elements.
        """
        if container._element is None:
            return

        for child_element in container._element:
            qname = child_element.tag
            if qname.endswith('}p'):  # Paragraph
                p = Paragraph(child_element, container)
                _add_paragraph(p, ctx)
            elif qname.endswith('}tbl'):  # Table
                table = Table(child_element, container)
                for r_idx, row in enumerate(table.rows, 1):
                    for c_idx, cell in enumerate(row.cells, 1):
                        cell_ctx = f"{ctx} > Tbl(r{r_idx},c{c_idx})"
                        _process_container_content(cell, cell_ctx)
            elif qname.endswith('}sdt'):  # Structured Document Tag (SDT)
                sdt_ctx = f"{ctx} > SDT"

                # 1. Extract SDT metadata text (placeholder, dropdown items).
                ns = {'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}

                # Placeholder text.
                placeholder_texts = []
                for t in child_element.xpath('.//w:placeholder//w:t', namespaces=ns):
                    if t.text:
                        placeholder_texts.append(t.text)
                if placeholder_texts:
                    full_placeholder = "".join(placeholder_texts).strip()
                    if full_placeholder:
                        segs.append(Segment("para", child_element,
                                            f"{sdt_ctx}-Placeholder", full_placeholder))

                # Dropdown list items.
                list_items = []
                for item in child_element.xpath('.//w:dropDownList/w:listItem', namespaces=ns):
                    display_text = item.get(qn('w:displayText'))
                    if display_text:
                        list_items.append(display_text)
                if list_items:
                    items_as_text = "\n".join(list_items)
                    segs.append(Segment("para", child_element,
                                        f"{sdt_ctx}-Dropdown", items_as_text))

                # 2. Recurse into the SDT's actual content (sdtContent).
                sdt_content_element = child_element.find(qn('w:sdtContent'))
                if sdt_content_element is not None:
                    # Minimal duck-typed container so recursion works.
                    class SdtContentWrapper:
                        def __init__(self, element, parent):
                            self._element = element
                            self._parent = parent

                    sdt_content_wrapper = SdtContentWrapper(sdt_content_element, container)
                    _process_container_content(sdt_content_wrapper, sdt_ctx)

    # --- Main execution starts here ---
    # 1. Process the main document body.
    _process_container_content(doc._body, "Body")

    # 2. Process textboxes.
    for tx, s in _txbx_iter_texts(doc):
        if s.strip() and (_has_cjk(s) or should_translate(s, 'auto')):
            segs.append(Segment("txbx", tx, "TextBox", s))

    return segs


def _insert_docx_translations(doc: docx.Document, segs: List[Segment],
                              tmap: Dict[Tuple[str, str], str],
                              targets: List[str],
                              log=lambda s: None) -> Tuple[int, int]:
    """
    Insert translations into DOCX document segments.

    CRITICAL: This function contains the fix for the major translation
    insertion bug. The key fix is in the segment filtering logic - we now
    correctly check if any target language has translation available using
    the proper key format (target_lang, text).

    Args:
        doc: The DOCX document object
        segs: List of segments to translate
        tmap: Translation map with keys as (target_language, source_text)
        targets: List of target languages in order
        log: Logging function

    Returns:
        Tuple of (successful_insertions, skipped_insertions)

    Key Bug Fix:
        OLD (INCORRECT): if (seg.kind, seg.text) not in tmap and (targets[0], seg.text) not in tmap
        NEW (CORRECT):   has_any_translation = any((tgt, seg.text) in tmap for tgt in targets)
    """
    ok_cnt = skip_cnt = 0

    # Helper to add a formatted (italic, sized, marker-tagged) run block.
    def _add_formatted_run(p: Paragraph, text: str, italic: bool, font_size_pt: int):
        lines = text.split("\n")
        for i, line in enumerate(lines):
            run = p.add_run(line)
            if italic:
                run.italic = True
            if font_size_pt:
                run.font.size = Pt(font_size_pt)
            if i < len(lines) - 1:
                run.add_break()
        # Add our zero-width space marker.
        tag_run = p.add_run("\u200b")
        if italic:
            tag_run.italic = True
        if font_size_pt:
            tag_run.font.size = Pt(font_size_pt)

    for seg in segs:
        # Check if any target language has translation for this segment.
        has_any_translation = any((tgt, seg.text) in tmap for tgt in targets)
        if not has_any_translation:
            log(f"[SKIP] 無翻譯結果: {seg.ctx} | {seg.text[:50]}...")
            skip_cnt += 1
            continue

        # Get translations for all targets, with fallback for missing ones.
        translations = []
        for tgt in targets:
            if (tgt, seg.text) in tmap:
                translations.append(tmap[(tgt, seg.text)])
            else:
                log(f"[WARNING] 缺少 {tgt} 翻譯: {seg.text[:30]}...")
                translations.append(f"【翻譯查詢失敗|{tgt}】{seg.text[:50]}...")

        log(f"[INSERT] 準備插入 {len(translations)} 個翻譯到 {seg.ctx}: {seg.text[:30]}...")

        if seg.kind == "para":
            # Check if this is an SDT segment (ref is an XML element, not a Paragraph).
            if hasattr(seg.ref, 'tag') and seg.ref.tag.endswith('}sdt'):
                # Handle SDT segments - insert translation into sdtContent.
                sdt_element = seg.ref
                ns = {'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}
                sdt_content = sdt_element.find(qn('w:sdtContent'))

                if sdt_content is not None:
                    # Collect existing inserted translations in the SDT.
                    existing_paras = sdt_content.xpath('.//w:p', namespaces=ns)
                    existing_texts = []
                    for ep in existing_paras:
                        p_obj = Paragraph(ep, None)
                        if _is_our_insert_block(p_obj):
                            existing_texts.append(_p_text_with_breaks(p_obj))

                    # Skip if all translations already exist.
                    if len(existing_texts) >= len(translations):
                        if all(_normalize_text(e) == _normalize_text(t)
                               for e, t in zip(existing_texts[:len(translations)], translations)):
                            skip_cnt += 1
                            log(f"[SKIP] SDT 已存在翻譯: {seg.text[:30]}...")
                            continue

                    # Add translations to SDT content.
                    for t in translations:
                        if not any(_normalize_text(t) == _normalize_text(e) for e in existing_texts):
                            # Create new paragraph in SDT content.
                            new_p_element = OxmlElement("w:p")
                            sdt_content.append(new_p_element)
                            new_p = Paragraph(new_p_element, None)
                            _add_formatted_run(new_p, t, italic=True,
                                               font_size_pt=INSERT_FONT_SIZE_PT)

                    ok_cnt += 1
                    log(f"[SUCCESS] SDT 插入翻譯(交錯格式)")
                continue

            p: Paragraph = seg.ref

            # --- CONTEXT-AWARE INSERTION LOGIC (from successful version) ---
            # Check if the paragraph's parent is a table cell.
            if isinstance(p._parent, _Cell):
                cell = p._parent
                try:
                    # Find the current paragraph's position in the cell.
                    cell_paragraphs = list(cell.paragraphs)
                    p_index = -1
                    for idx, cell_p in enumerate(cell_paragraphs):
                        if cell_p._element == p._element:
                            p_index = idx
                            break

                    if p_index == -1:
                        log(f"[WARNING] 無法找到段落在單元格中的位置,使用原始方法")
                        # Fallback to original method.
                        for block in translations:
                            new_p = cell.add_paragraph()
                            _add_formatted_run(new_p, block, italic=True,
                                               font_size_pt=INSERT_FONT_SIZE_PT)
                        ok_cnt += 1
                        continue

                    # Check if translations already exist right after this paragraph.
                    existing_texts = []
                    check_limit = min(p_index + 1 + len(translations), len(cell_paragraphs))
                    for idx in range(p_index + 1, check_limit):
                        if _is_our_insert_block(cell_paragraphs[idx]):
                            existing_texts.append(_p_text_with_breaks(cell_paragraphs[idx]))

                    # Skip if all translations already exist in order.
                    if len(existing_texts) >= len(translations):
                        if all(_normalize_text(e) == _normalize_text(t)
                               for e, t in zip(existing_texts[:len(translations)], translations)):
                            skip_cnt += 1
                            log(f"[SKIP] 表格單元格已存在翻譯: {seg.text[:30]}...")
                            continue

                    # Determine which translations need to be added.
                    to_add = []
                    for t in translations:
                        if not any(_normalize_text(t) == _normalize_text(e) for e in existing_texts):
                            to_add.append(t)

                    if not to_add:
                        skip_cnt += 1
                        log(f"[SKIP] 表格單元格所有翻譯已存在: {seg.text[:30]}...")
                        continue

                    # Insert new paragraphs right after the current paragraph.
                    insert_after = p
                    for block in to_add:
                        try:
                            # Create new paragraph and insert it after the current position.
                            new_p_element = OxmlElement("w:p")
                            insert_after._element.addnext(new_p_element)
                            new_p = Paragraph(new_p_element, cell)
                            _add_formatted_run(new_p, block, italic=True,
                                               font_size_pt=INSERT_FONT_SIZE_PT)
                            insert_after = new_p  # Update position for next insertion
                        except Exception as e:
                            log(f"[ERROR] 表格插入失敗: {e}, 嘗試fallback方法")
                            # Fallback: add at the end of cell.
                            try:
                                new_p = cell.add_paragraph()
                                _add_formatted_run(new_p, block, italic=True,
                                                   font_size_pt=INSERT_FONT_SIZE_PT)
                                log(f"[SUCCESS] Fallback插入成功")
                            except Exception as e2:
                                log(f"[FATAL] Fallback也失敗: {e2}")
                                continue

                    ok_cnt += 1
                    log(f"[SUCCESS] 表格單元格插入 {len(to_add)} 個翻譯(緊接原文後)")

                except Exception as e:
                    log(f"[ERROR] 表格處理全面失敗: {e}, 跳過此段落")
                    continue
            else:
                # Normal paragraph (not in table cell) - enhanced logic.
                try:
                    # Check existing translations using the enhanced method.
                    last = _find_last_inserted_after(p, limit=max(len(translations), 4))

                    # Walk direct siblings to collect already-inserted texts.
                    existing_texts = []
                    current_check = p
                    for _ in range(len(translations)):
                        try:
                            # Get the next sibling paragraph.
                            next_sibling = current_check._element.getnext()
                            if next_sibling is not None and next_sibling.tag.endswith('}p'):
                                next_p = Paragraph(next_sibling, p._parent)
                                if _is_our_insert_block(next_p):
                                    existing_texts.append(_p_text_with_breaks(next_p))
                                    current_check = next_p
                                else:
                                    break
                            else:
                                break
                        except Exception:
                            break

                    # Skip if all translations already exist in order.
                    if len(existing_texts) >= len(translations):
                        if all(_normalize_text(e) == _normalize_text(t)
                               for e, t in zip(existing_texts[:len(translations)], translations)):
                            skip_cnt += 1
                            log(f"[SKIP] 段落已存在翻譯: {seg.text[:30]}...")
                            continue

                    # Determine which translations need to be added.
                    to_add = []
                    for t in translations:
                        if not any(_normalize_text(t) == _normalize_text(e) for e in existing_texts):
                            to_add.append(t)

                    if not to_add:
                        skip_cnt += 1
                        log(f"[SKIP] 段落所有翻譯已存在: {seg.text[:30]}...")
                        continue

                    # Use enhanced insertion with proper positioning.
                    anchor = last if last else p
                    for block in to_add:
                        try:
                            anchor = _append_after(anchor, block, italic=True,
                                                   font_size_pt=INSERT_FONT_SIZE_PT)
                        except Exception as e:
                            log(f"[ERROR] 段落插入失敗: {e}, 嘗試簡化插入")
                            try:
                                # Fallback: simple append.
                                if hasattr(p._parent, 'add_paragraph'):
                                    new_p = p._parent.add_paragraph()
                                    _add_formatted_run(new_p, block, italic=True,
                                                       font_size_pt=INSERT_FONT_SIZE_PT)
                                    log(f"[SUCCESS] Fallback段落插入成功")
                                else:
                                    log(f"[ERROR] 無法進行fallback插入")
                            except Exception as e2:
                                log(f"[FATAL] Fallback也失敗: {e2}")
                                continue

                    ok_cnt += 1
                    log(f"[SUCCESS] 段落插入 {len(to_add)} 個翻譯(交錯格式)")

                except Exception as e:
                    log(f"[ERROR] 段落處理失敗: {e}, 跳過此段落")
                    continue

        elif seg.kind == "txbx":
            tx = seg.ref
            # Check if textbox already has our translations at the end.
            if _txbx_tail_equals(tx, translations):
                skip_cnt += 1
                log(f"[SKIP] 文字框已存在翻譯: {seg.text[:30]}...")
                continue

            # Append translations to textbox.
            for t in translations:
                _txbx_append_paragraph(tx, t, italic=True, font_size_pt=INSERT_FONT_SIZE_PT)
            ok_cnt += 1
            log(f"[SUCCESS] 文字框插入 {len(translations)} 個翻譯")

    return ok_cnt, skip_cnt


# ---------- Main DocumentProcessor class ----------
class DocumentProcessor:
    """Enhanced document processor with complete DOCX handling capabilities."""

    def __init__(self):
        self.logger = logger

    def extract_docx_segments(self, file_path: str) -> List[Segment]:
        """Extract all translatable segments from DOCX file."""
        try:
            doc = docx.Document(file_path)
            segments = _collect_docx_segments(doc)
            self.logger.info(f"Extracted {len(segments)} segments from {file_path}")
            for seg in segments[:5]:  # Log first 5 segments for debugging
                self.logger.debug(f"Segment: {seg.kind} | {seg.ctx} | {seg.text[:50]}...")
            return segments
        except Exception as e:
            self.logger.error(f"Failed to extract DOCX segments from {file_path}: {str(e)}")
            raise FileProcessingError(f"DOCX 文件分析失敗: {str(e)}")

    def insert_docx_translations(self, file_path: str, segments: List[Segment],
                                 translation_map: Dict[Tuple[str, str], str],
                                 target_languages: List[str],
                                 output_path: str) -> Tuple[int, int]:
        """Insert translations into DOCX file and save to output path."""
        try:
            doc = docx.Document(file_path)

            def log_func(msg: str):
                self.logger.debug(msg)

            ok_count, skip_count = _insert_docx_translations(
                doc, segments, translation_map, target_languages, log_func
            )

            # Save the modified document.
            doc.save(output_path)
            self.logger.info(
                f"Inserted {ok_count} translations, skipped {skip_count}. Saved to: {output_path}"
            )
            return ok_count, skip_count
        except Exception as e:
            self.logger.error(f"Failed to insert DOCX translations: {str(e)}")
            raise FileProcessingError(f"DOCX 翻譯插入失敗: {str(e)}")

    def split_text_into_sentences(self, text: str, language: str = 'auto') -> List[str]:
        """Split text into sentences using the best available method."""
        return _split_sentences(text, language)

    def should_translate_text(self, text: str, source_language: str) -> bool:
        """Determine if text should be translated."""
        return should_translate(text, source_language)